diff --git a/ingestion/edgar_poller.py b/ingestion/edgar_poller.py index 29eab44..eae2078 100644 --- a/ingestion/edgar_poller.py +++ b/ingestion/edgar_poller.py @@ -34,6 +34,10 @@ def _get_filing_urls() -> list[tuple[str, str, str]]: ns = {"atom": "http://www.w3.org/2005/Atom"} results = [] for entry in root.findall("atom:entry", ns): + title = entry.findtext("atom:title", namespaces=ns) or "" + form_type = title.split(" - ")[0].strip() + if form_type not in ("4", "4/A"): + continue link = entry.find("atom:link", ns) if link is None: continue @@ -59,7 +63,11 @@ def _resolve_xml_url(accession: str) -> Optional[str]: doc = html.fromstring(resp.content) for link in doc.cssselect("table.tableFile a[href]"): href = link.get("href", "") - if href.lower().endswith(".xml") and not href.lower().endswith("-index.htm"): + if ( + href.lower().endswith(".xml") + and not href.lower().endswith("-index.htm") + and "xslF345X06" not in href + ): return config.EDGAR_BASE_URL + href if href.startswith("/") else base + href except Exception as e: logger.debug(f"Could not resolve XML URL for {accession}: {e}") diff --git a/ingestion/efts_ingest.py b/ingestion/efts_ingest.py new file mode 100644 index 0000000..364cf9f --- /dev/null +++ b/ingestion/efts_ingest.py @@ -0,0 +1,218 @@ +""" +Bulk ingest using EDGAR full-text search (EFTS) API. + +The EFTS API returns the XML filename in the _id field, avoiding the extra +index-page fetch. A global token-bucket rate limiter keeps total throughput +under SEC's 10 req/s limit across all threads. +""" + +import logging +import threading +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Optional + +import requests + +import config +from db.db import accession_exists, insert_filing +from ingestion.edgar_poller import HEADERS, _save_raw_xml +from ingestion.form4_parser import parse_form4 + +logger = logging.getLogger(__name__) + +EFTS_URL = "https://efts.sec.gov/LATEST/search-index" +_BATCH_SIZE = 100 +_WORKERS = 1 # sequential avoids triggering SEC 500 rate-limit responses +_MAX_RATE = 5.0 # conservative; SEC allows 10 but concurrent bursts get throttled +_EFTS_MAX_OFFSET = 9900 # Elasticsearch hard ceiling + + +# ---------- global token-bucket rate limiter ---------- + +class _RateLimiter: + def __init__(self, rate: float): + self._rate = rate + self._tokens = rate + self._last = time.monotonic() + self._lock = threading.Lock() + + def acquire(self): + with self._lock: + now = time.monotonic() + elapsed = now - self._last + self._tokens = min(self._rate, self._tokens + elapsed * self._rate) + self._last = now + if self._tokens < 1: + sleep_for = (1 - self._tokens) / self._rate + else: + sleep_for = 0 + self._tokens -= 1 + if sleep_for > 0: + time.sleep(sleep_for) + + +_limiter = _RateLimiter(_MAX_RATE) + + +def _get(url: str, params: dict = None, retries: int = 4) -> requests.Response: + """Rate-limited GET with exponential backoff on 429/5xx.""" + delay = 2.0 + for attempt in range(retries): + _limiter.acquire() + try: + resp = requests.get(url, params=params, headers=HEADERS, timeout=25) + if resp.status_code in (429, 500, 502, 503): + raise requests.HTTPError(f"HTTP {resp.status_code}", response=resp) + resp.raise_for_status() + return resp + except Exception as e: + if attempt == retries - 1: + raise + wait = delay * (2 ** attempt) + logger.debug(f"Retry {attempt+1} after {wait:.1f}s: {e}") + time.sleep(wait) + + +# ---------- EFTS pagination ---------- + +def _efts_page(start_date: str, end_date: str, offset: int) -> list[dict]: + params = { + "q": "", "forms": "4", + "dateRange": "custom", + "startdt": start_date, "enddt": end_date, + "from": offset, "size": _BATCH_SIZE, + } + return _get(EFTS_URL, params=params).json()["hits"]["hits"] + + +def _collect_metadata(start_date: str, end_date: str) -> list[tuple[str, str, str]]: + """Return (accession, filename, filed_date) for all Form 4s in range.""" + params = { + "q": "", "forms": "4", + "dateRange": "custom", + "startdt": start_date, "enddt": end_date, + "from": 0, "size": _BATCH_SIZE, + } + data = _get(EFTS_URL, params=params).json() + total = data["hits"]["total"]["value"] + cap = min(total, _EFTS_MAX_OFFSET + _BATCH_SIZE) + logger.info(f" EFTS reports {total} filings; fetching up to {cap}") + + hits = list(data["hits"]["hits"]) + offset = _BATCH_SIZE + consecutive_fails = 0 + + while len(hits) < cap and offset <= _EFTS_MAX_OFFSET: + try: + batch = _efts_page(start_date, end_date, offset) + if not batch: + break + hits.extend(batch) + consecutive_fails = 0 + except Exception as e: + consecutive_fails += 1 + logger.debug(f"EFTS offset {offset} failed ({consecutive_fails}): {e}") + if consecutive_fails >= 3: + break + offset += _BATCH_SIZE + + seen: set[str] = set() + tasks: list[tuple[str, str, str]] = [] + for hit in hits: + src = hit.get("_source", {}) + acc = src.get("adsh", "") + filed_date = src.get("file_date", "") + hit_id = hit.get("_id", "") + filename = hit_id.split(":", 1)[1] if ":" in hit_id else "" + if not acc or not filename or acc in seen: + continue + seen.add(acc) + tasks.append((acc, filename, filed_date)) + + return tasks + + +# ---------- XML fetch + store ---------- + +def _xml_url(accession: str, filename: str) -> str: + path = accession.replace("-", "") + cik = path[:10].lstrip("0") + base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{path}/" + return base + filename + + +def _fetch_and_store(accession: str, filename: str, filed_date: str) -> int: + if accession_exists(accession): + return 0 + if not filename.lower().endswith(".xml"): + return 0 + if "xslF345" in filename: + return 0 + + url = _xml_url(accession, filename) + try: + xml_bytes = _get(url).content + except Exception as e: + logger.debug(f"Skip {accession}: {e}") + return 0 + + _save_raw_xml(accession, xml_bytes) + parsed = parse_form4(xml_bytes, accession, filed_date) + return sum(1 for f in parsed if insert_filing(f)) + + +# ---------- public API ---------- + +def ingest_date_range( + start_date: str, + end_date: str, + limit: Optional[int] = None, +) -> int: + logger.info(f"Ingesting {start_date} → {end_date}") + tasks = _collect_metadata(start_date, end_date) + if limit: + tasks = tasks[:limit] + logger.info(f" {len(tasks)} unique filings to fetch") + + total_stored = 0 + done = 0 + with ThreadPoolExecutor(max_workers=_WORKERS) as pool: + futures = { + pool.submit(_fetch_and_store, acc, fn, dt): acc + for acc, fn, dt in tasks + } + for future in as_completed(futures): + try: + total_stored += future.result() + except Exception as e: + logger.debug(f"Worker error: {e}") + done += 1 + if done % 500 == 0: + logger.info(f" {done}/{len(tasks)} fetched, {total_stored} rows stored") + + logger.info(f" Done: {total_stored} rows stored") + return total_stored + + +def ingest_years(start_year: int, end_year: int) -> int: + """ + Ingest all Form 4 filings for start_year..end_year. + Uses daily chunks (~960 filings/day) so each EFTS query stays well under + the ~4500-result offset limit, capturing every filing with no cap. + """ + from datetime import date, timedelta + + total = 0 + current = date(start_year, 1, 1) + end = date(end_year, 12, 31) + day = timedelta(days=1) + + while current <= end: + ds = current.strftime("%Y-%m-%d") + stored = ingest_date_range(ds, ds) + total += stored + logger.info(f"{ds} done — cumulative: {total} rows") + current += day + + return total diff --git a/ingestion/form4_parser.py b/ingestion/form4_parser.py index ad0b02b..33fc053 100644 --- a/ingestion/form4_parser.py +++ b/ingestion/form4_parser.py @@ -18,8 +18,13 @@ def _is_10b51(text: str) -> bool: def _text(el, tag: str) -> Optional[str]: node = el.find(".//" + tag) - if node is not None and node.text: + if node is None: + return None + if node.text and node.text.strip(): return node.text.strip() + value_node = node.find("value") + if value_node is not None and value_node.text and value_node.text.strip(): + return value_node.text.strip() return None @@ -42,7 +47,18 @@ def parse_form4(xml_bytes: bytes, accession_number: str, filed_date: str) -> lis ticker = _text(root, "issuerTradingSymbol") or "" cik = _text(root, "issuerCik") or "" insider_name = _text(root, "rptOwnerName") or "" - role = _text(root, "officerTitle") or _text(root, "isDirector") or "" + + officer_title = _text(root, "officerTitle") or "" + if officer_title: + role = officer_title + elif _text(root, "isDirector") == "1": + role = "Director" + elif _text(root, "isTenPercentOwner") == "1": + role = "10% owner" + elif _text(root, "isOfficer") == "1": + role = "Officer" + else: + role = "" footnotes_text = " ".join( (node.text or "") for node in root.findall(".//footnote") @@ -57,6 +73,8 @@ def parse_form4(xml_bytes: bytes, accession_number: str, filed_date: str) -> lis if not flag: continue + tx_code = _text(tx, "transactionCode") or "" + shares = _float(tx, "transactionShares") price = _float(tx, "transactionPricePerShare") total_value = _float(tx, "transactionTotalValue") @@ -69,7 +87,8 @@ def parse_form4(xml_bytes: bytes, accession_number: str, filed_date: str) -> lis fn.get("id", "") for fn in tx.findall(".//footnoteId") ] tx_footnote_text = " ".join( - (root.find(f".//footnote[@id='{fid}']") or etree.Element("x")).text or "" + (root.find(f".//footnote[@id='{fid}']") is not None + and root.find(f".//footnote[@id='{fid}']").text or "") for fid in tx_footnote_ids ) is_10b51 = int(global_10b51 or _is_10b51(tx_footnote_text)) @@ -87,6 +106,7 @@ def parse_form4(xml_bytes: bytes, accession_number: str, filed_date: str) -> lis "price": price, "total_value": total_value, "flag": flag.upper(), + "tx_code": tx_code.upper(), "is_10b51": is_10b51, "post_tx_shares": post_tx_shares, } diff --git a/ingestion/historical_ingest.py b/ingestion/historical_ingest.py new file mode 100644 index 0000000..7f71e8e --- /dev/null +++ b/ingestion/historical_ingest.py @@ -0,0 +1,136 @@ +""" +Bulk-ingest historical Form 4 filings from SEC quarterly full-index files. + +Usage via main.py: + python main.py ingest-history --year 2024 --quarter 4 --limit 500 + python main.py ingest-history --year 2025 --quarter 1 # all ~118k filings +""" + +import logging +import time +from typing import Optional + +import requests +from lxml import html + +import config +from db.db import accession_exists, insert_filing +from ingestion.edgar_poller import HEADERS, _fetch, _save_raw_xml +from ingestion.form4_parser import parse_form4 + +logger = logging.getLogger(__name__) + +FULL_INDEX_URL = "https://www.sec.gov/Archives/edgar/full-index/{year}/QTR{quarter}/form.idx" + + +def _parse_form_idx(content: str) -> list[tuple[str, str, str]]: + """Return (cik, filed_date, filename) tuples for Form 4 entries.""" + results = [] + in_data = False + for line in content.splitlines(): + if line.startswith("---"): + in_data = True + continue + if not in_data: + continue + form_type = line[:12].strip() + if form_type not in ("4", "4/A"): + continue + # fixed-width: form(12) company(62) cik(12) date(12) filename(rest) + cik = line[74:86].strip() + filed_date = line[86:98].strip() + filename = line[98:].strip() + results.append((cik, filed_date, filename)) + return results + + +def _accession_from_filename(filename: str) -> str: + """edgar/data/123/0001234567-25-000001.txt -> 0001234567-25-000001""" + base = filename.rstrip().split("/")[-1] + return base.replace(".txt", "") + + +def _resolve_xml_from_index(cik: str, accession: str) -> Optional[str]: + accession_path = accession.replace("-", "") + base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/" + index_url = f"{base}{accession}-index.htm" + try: + resp = _fetch(index_url) + doc = html.fromstring(resp.content) + for link in doc.cssselect("table.tableFile a[href]"): + href = link.get("href", "") + if ( + href.lower().endswith(".xml") + and not href.lower().endswith("-index.htm") + and "xslF345X06" not in href + and "xslF345X05" not in href + ): + return config.EDGAR_BASE_URL + href if href.startswith("/") else base + href + except Exception as e: + logger.debug(f"Could not resolve XML for {accession}: {e}") + return None + + +def ingest_quarter(year: int, quarter: int, limit: Optional[int] = None, rate_limit: float = 0.15) -> int: + """ + Download and store Form 4 filings for a given quarter. + rate_limit: seconds between requests (SEC allows ~10 req/s; 0.15 is safe). + Returns count of new filings stored. + """ + url = FULL_INDEX_URL.format(year=year, quarter=quarter) + logger.info(f"Fetching index: {url}") + try: + resp = requests.get(url, headers=HEADERS, timeout=60) + resp.raise_for_status() + except Exception as e: + logger.error(f"Failed to fetch index: {e}") + return 0 + + entries = _parse_form_idx(resp.text) + logger.info(f"Found {len(entries)} Form 4 entries in {year}/QTR{quarter}") + + if limit: + entries = entries[:limit] + logger.info(f"Limited to {limit} entries") + + seen_accessions: set[str] = set() + stored = 0 + + for i, (cik, filed_date, filename) in enumerate(entries): + accession = _accession_from_filename(filename) + + if accession in seen_accessions: + continue + seen_accessions.add(accession) + + if accession_exists(accession): + continue + + xml_url = _resolve_xml_from_index(cik, accession) + if not xml_url: + logger.debug(f"No XML for {accession}") + time.sleep(rate_limit) + continue + + try: + xml_resp = _fetch(xml_url) + xml_bytes = xml_resp.content + except Exception as e: + logger.debug(f"Failed to fetch XML {accession}: {e}") + time.sleep(rate_limit) + continue + + _save_raw_xml(accession, xml_bytes) + parsed = parse_form4(xml_bytes, accession, filed_date) + + for filing in parsed: + if insert_filing(filing): + stored += 1 + + if (i + 1) % 50 == 0: + logger.info(f"Progress: {i+1}/{len(entries)} processed, {stored} stored") + + time.sleep(rate_limit) + + logger.info(f"Done: {stored} new filings stored from {year}/QTR{quarter}") + return stored diff --git a/ingestion/sec_bulk_ingest.py b/ingestion/sec_bulk_ingest.py new file mode 100644 index 0000000..de85f2e --- /dev/null +++ b/ingestion/sec_bulk_ingest.py @@ -0,0 +1,225 @@ +""" +Proper SEC EDGAR bulk ingest using quarterly form.idx files. + +Flow: + 1. Download form.idx for a quarter (one request, ~50 MB uncompressed) + 2. Filter to Form 4 / 4/A entries + 3. For each entry the index gives us the direct submission .txt URL + 4. Fetch .txt → parse SGML → extract embedded XML → parse Form 4 + 5. No index-page roundtrip needed; one HTTP request per filing. + +Rate: stays at 10 req/s using a persistent requests.Session for connection reuse. +""" + +import logging +import re +import threading +import time +from typing import Optional + +import requests + +from db.db import accession_exists, filter_new_accessions, insert_filing, mark_accession_seen +from ingestion.form4_parser import parse_form4 + +logger = logging.getLogger(__name__) + +FULL_INDEX_BASE = "https://www.sec.gov/Archives/edgar/full-index" +EDGAR_BASE = "https://www.sec.gov/Archives" + +HEADERS = { + "User-Agent": "smaug-insider-monitor mail@dominik-roth.eu", + "Accept-Encoding": "gzip, deflate", +} + +_RATE_INIT = 9.0 # starting req/s (SEC allows 10) +_RATE_MIN = 1.0 +_RATE_MAX = 9.0 + + +# ---------- adaptive token-bucket rate limiter ---------- + +class _AdaptiveRateLimiter: + """Token bucket that backs off on server errors and slowly recovers.""" + + def __init__(self, rate: float): + self._rate = rate + self._tokens = rate + self._last = time.monotonic() + self._lock = threading.Lock() + + def acquire(self): + with self._lock: + now = time.monotonic() + self._tokens = min(self._rate, self._tokens + (now - self._last) * self._rate) + self._last = now + wait = max(0.0, (1 - self._tokens) / self._rate) + self._tokens -= 1 + if wait: + time.sleep(wait) + + def on_success(self): + with self._lock: + self._rate = min(_RATE_MAX, self._rate * 1.02) # slow ramp-up + + def on_throttle(self): + with self._lock: + self._rate = max(_RATE_MIN, self._rate * 0.5) + logger.debug(f"Rate backed off to {self._rate:.1f} req/s") + + +_limiter = _AdaptiveRateLimiter(_RATE_INIT) + + +# ---------- low-level ---------- + +def _make_session() -> requests.Session: + s = requests.Session() + s.headers.update(HEADERS) + return s + + +def _get(session: requests.Session, url: str, retries: int = 4) -> requests.Response: + delay = 1.0 + for attempt in range(retries): + _limiter.acquire() + try: + resp = session.get(url, timeout=30) + if resp.status_code == 429 or resp.status_code >= 500: + _limiter.on_throttle() + raise requests.HTTPError(f"HTTP {resp.status_code}") + resp.raise_for_status() + _limiter.on_success() + return resp + except requests.HTTPError: + if attempt == retries - 1: + raise + wait = delay * (2 ** attempt) + logger.debug(f"Retry {attempt+1} in {wait:.0f}s") + time.sleep(wait) + except Exception as e: + if attempt == retries - 1: + raise + wait = delay * (2 ** attempt) + logger.debug(f"Retry {attempt+1} in {wait:.0f}s ({e})") + time.sleep(wait) + + +# ---------- form.idx parsing ---------- + +def _download_form_idx(session: requests.Session, year: int, quarter: int) -> str: + url = f"{FULL_INDEX_BASE}/{year}/QTR{quarter}/form.idx" + logger.info(f"Downloading {url}") + resp = _get(session, url) + return resp.text + + +_IDX_LINE = re.compile( + r"^(4|4/A)\s+.+?\s+\d+\s+(\d{4}-\d{2}-\d{2})\s+(edgar/data/\S+\.txt)", + re.IGNORECASE, +) + + +def _parse_form_idx(text: str) -> list[tuple[str, str, str]]: + """Return (accession, filed_date, txt_path) for all Form 4/4A entries.""" + results = [] + for line in text.splitlines(): + m = _IDX_LINE.match(line) + if not m: + continue + filed_date = m.group(2) + txt_path = m.group(3) + accession = txt_path.split("/")[-1].replace(".txt", "") + results.append((accession, filed_date, txt_path)) + return results + + +# ---------- SGML → XML extraction ---------- + +def _extract_xml(txt_content: str) -> Optional[bytes]: + """Pull ownershipDocument XML out of the SGML/XML submission wrapper.""" + end_tag = "" + end = txt_content.find(end_tag) + if end == -1: + return None + end += len(end_tag) + # Start from , whichever comes first + start_xml = txt_content.find("") + candidates = [i for i in (start_xml, start_doc) if i != -1] + if not candidates: + return None + start = min(candidates) + return txt_content[start:end].encode("utf-8", errors="replace") + + +# ---------- per-filing fetch + store ---------- + +def _process_one( + session: requests.Session, + accession: str, + filed_date: str, + txt_path: str, +) -> int: + if accession_exists(accession): + return 0 + + url = f"{EDGAR_BASE}/{txt_path}" + try: + resp = _get(session, url) + except Exception as e: + logger.debug(f"Skip {accession}: {e}") + return 0 + + xml_bytes = _extract_xml(resp.text) + if not xml_bytes: + mark_accession_seen(accession) + return 0 + + parsed = parse_form4(xml_bytes, accession, filed_date) + if not parsed: + mark_accession_seen(accession) + return 0 + return sum(1 for f in parsed if insert_filing(f)) + + +# ---------- public API ---------- + +def ingest_quarter(year: int, quarter: int, session: requests.Session = None) -> int: + """Ingest all Form 4 filings for one calendar quarter. Returns rows stored.""" + own_session = session is None + if own_session: + session = _make_session() + + idx_text = _download_form_idx(session, year, quarter) + all_entries = _parse_form_idx(idx_text) + logger.info(f" {len(all_entries)} Form 4 entries in {year}/Q{quarter}") + + accessions = [a for a, _, _ in all_entries] + new_accessions = filter_new_accessions(accessions) + entries = [(a, d, p) for a, d, p in all_entries if a in new_accessions] + logger.info(f" {len(entries)} not yet in DB") + + stored = 0 + for i, (accession, filed_date, txt_path) in enumerate(entries): + stored += _process_one(session, accession, filed_date, txt_path) + if (i + 1) % 1000 == 0: + logger.info(f" {i+1}/{len(entries)} processed, {stored} rows stored") + + logger.info(f" Quarter done: {stored} rows stored") + if own_session: + session.close() + return stored + + +def ingest_years(start_year: int, end_year: int) -> int: + """Ingest all Form 4 filings for start_year through end_year inclusive.""" + session = _make_session() + total = 0 + for year in range(start_year, end_year + 1): + for quarter in range(1, 5): + stored = ingest_quarter(year, quarter, session=session) + total += stored + logger.info(f"Cumulative after {year}/Q{quarter}: {total} rows") + session.close() + return total