""" Proper SEC EDGAR bulk ingest using quarterly form.idx files. Flow: 1. Download form.idx for a quarter (one request, ~50 MB uncompressed) 2. Filter to Form 4 / 4/A entries 3. For each entry the index gives us the direct submission .txt URL 4. Fetch .txt → parse SGML → extract embedded XML → parse Form 4 5. No index-page roundtrip needed; one HTTP request per filing. Rate: stays at 10 req/s using a persistent requests.Session for connection reuse. """ import logging import re import threading import time from typing import Optional import requests from db.db import accession_exists, filter_new_accessions, insert_filing, mark_accession_seen from ingestion.form4_parser import parse_form4 logger = logging.getLogger(__name__) FULL_INDEX_BASE = "https://www.sec.gov/Archives/edgar/full-index" EDGAR_BASE = "https://www.sec.gov/Archives" HEADERS = { "User-Agent": "smaug-insider-monitor mail@dominik-roth.eu", "Accept-Encoding": "gzip, deflate", } _RATE_INIT = 9.0 # starting req/s (SEC allows 10) _RATE_MIN = 1.0 _RATE_MAX = 9.0 # ---------- adaptive token-bucket rate limiter ---------- class _AdaptiveRateLimiter: """Token bucket that backs off on server errors and slowly recovers.""" def __init__(self, rate: float): self._rate = rate self._tokens = rate self._last = time.monotonic() self._lock = threading.Lock() def acquire(self): with self._lock: now = time.monotonic() self._tokens = min(self._rate, self._tokens + (now - self._last) * self._rate) self._last = now wait = max(0.0, (1 - self._tokens) / self._rate) self._tokens -= 1 if wait: time.sleep(wait) def on_success(self): with self._lock: self._rate = min(_RATE_MAX, self._rate * 1.02) # slow ramp-up def on_throttle(self): with self._lock: self._rate = max(_RATE_MIN, self._rate * 0.5) logger.debug(f"Rate backed off to {self._rate:.1f} req/s") _limiter = _AdaptiveRateLimiter(_RATE_INIT) # ---------- low-level ---------- def _make_session() -> requests.Session: s = requests.Session() s.headers.update(HEADERS) return s 
def _get(session: "requests.Session", url: str, retries: int = 4) -> "requests.Response":
    """GET ``url`` through the shared rate limiter, retrying with backoff.

    HTTP 429 and 5xx responses additionally tell the limiter to back off.
    Any failure (HTTP error status or transport exception) is retried after
    1 s, 2 s, 4 s, ... until ``retries`` attempts have been made.

    Args:
        session: Session used to issue the request.
        url: Absolute URL to fetch.
        retries: Total number of attempts; must be at least 1.

    Returns:
        The first response with a successful status.

    Raises:
        ValueError: If ``retries`` is less than 1 (previously this fell
            through the loop and returned ``None``).
        requests.HTTPError: If the final attempt got an error status.
        Exception: Whatever the final attempt raised otherwise.
    """
    if retries < 1:
        raise ValueError(f"retries must be at least 1, got {retries!r}")

    delay = 1.0
    for attempt in range(retries):
        _limiter.acquire()
        try:
            resp = session.get(url, timeout=30)
            if resp.status_code == 429 or resp.status_code >= 500:
                _limiter.on_throttle()
                raise requests.HTTPError(f"HTTP {resp.status_code}")
            resp.raise_for_status()
            _limiter.on_success()
            return resp
        except Exception as exc:
            # Last attempt: let the caller see the real error.
            if attempt == retries - 1:
                raise
            wait = delay * (2 ** attempt)
            logger.debug("Retry %d in %.0fs (%s)", attempt + 1, wait, exc)
            time.sleep(wait)


# ---------- form.idx parsing ----------

def _download_form_idx(session: "requests.Session", year: int, quarter: int) -> str:
    """Download the quarterly ``form.idx`` and return its decoded text."""
    url = f"{FULL_INDEX_BASE}/{year}/QTR{quarter}/form.idx"
    logger.info("Downloading %s", url)
    resp = _get(session, url)
    return resp.text


# One index row: form type, company name, CIK, filing date, submission path.
# Only form types "4" and "4/A" match; the captured groups are the form type,
# the filing date and the edgar/data/... .txt path.
_IDX_LINE = re.compile(
    r"^(4|4/A)\s+.+?\s+\d+\s+(\d{4}-\d{2}-\d{2})\s+(edgar/data/\S+\.txt)",
    re.IGNORECASE,
)


def _parse_form_idx(text: str) -> list[tuple[str, str, str]]:
    """Return (accession, filed_date, txt_path) for all Form 4/4A entries.

    Lines that do not match ``_IDX_LINE`` (headers, other form types) are
    skipped. The accession is the file name of ``txt_path`` without ".txt".
    """
    results = []
    for line in text.splitlines():
        m = _IDX_LINE.match(line)
        if m is None:
            continue
        filed_date, txt_path = m.group(2), m.group(3)
        accession = txt_path.split("/")[-1].replace(".txt", "")
        results.append((accession, filed_date, txt_path))
    return results


# ---------- SGML → XML extraction ----------

def _extract_xml(txt_content: str) -> Optional[bytes]:
    """Pull ownershipDocument XML out of the SGML/XML submission wrapper.

    Args:
        txt_content: Full text of the submission .txt file.

    Returns:
        UTF-8 bytes spanning from the XML declaration (or, if absent, the
        opening ``<ownershipDocument`` tag) through the first closing
        ``</ownershipDocument>`` tag, or ``None`` if no such span exists.
    """
    end_tag = "</ownershipDocument>"
    end = txt_content.find(end_tag)
    if end == -1:
        return None
    end += len(end_tag)

    # Start from <?xml or <ownershipDocument, whichever comes first. The
    # search is bounded by `end` so a start marker located after the closing
    # tag can never produce an empty or inverted slice.
    start_xml = txt_content.find("<?xml", 0, end)
    start_doc = txt_content.find("<ownershipDocument", 0, end)
    candidates = [i for i in (start_xml, start_doc) if i != -1]
    if not candidates:
        return None
    start = min(candidates)
    return txt_content[start:end].encode("utf-8", errors="replace")


# ---------- per-filing fetch
# + store ----------

def _process_one(
    session: requests.Session,
    accession: str,
    filed_date: str,
    txt_path: str,
) -> int:
    """Fetch, parse and store a single filing.

    Args:
        session: Session used for the HTTP request.
        accession: Accession identifier of the filing.
        filed_date: Filing date string taken from the index.
        txt_path: Path of the submission .txt relative to ``EDGAR_BASE``.

    Returns:
        Number of parsed records for which ``insert_filing`` returned a
        truthy value; 0 if the filing was already known, could not be
        fetched, or yielded nothing to store.
    """
    if accession_exists(accession):
        return 0

    url = f"{EDGAR_BASE}/{txt_path}"
    try:
        resp = _get(session, url)
    except Exception as e:
        # Best effort: a failed download is not marked as seen, so a later
        # run will attempt this filing again.
        logger.debug("Skip %s: %s", accession, e)
        return 0

    xml_bytes = _extract_xml(resp.text)
    if not xml_bytes:
        # Nothing extractable; remember the accession so it is not refetched.
        mark_accession_seen(accession)
        return 0

    parsed = parse_form4(xml_bytes, accession, filed_date)
    if not parsed:
        mark_accession_seen(accession)
        return 0

    return sum(1 for filing in parsed if insert_filing(filing))


# ---------- public API ----------

def ingest_quarter(year: int, quarter: int, session: Optional[requests.Session] = None) -> int:
    """Ingest all Form 4 filings for one calendar quarter. Returns rows stored.

    Args:
        year: Calendar year of the index to ingest.
        quarter: Quarter number used in the index URL (``QTR{quarter}``).
        session: Optional session to reuse. When omitted, a session is
            created for this call and closed before returning, even if an
            error occurs.

    Raises:
        Exception: Whatever ``_download_form_idx`` raises if the index cannot
            be downloaded; per-filing fetch errors are skipped instead.
    """
    own_session = session is None
    if own_session:
        session = _make_session()

    try:
        idx_text = _download_form_idx(session, year, quarter)
        all_entries = _parse_form_idx(idx_text)
        logger.info("  %d Form 4 entries in %s/Q%s", len(all_entries), year, quarter)

        accessions = [a for a, _, _ in all_entries]
        # The container type returned by filter_new_accessions is not visible
        # here; wrapping it in a set keeps the membership test below O(1).
        new_accessions = set(filter_new_accessions(accessions))
        entries = [(a, d, p) for a, d, p in all_entries if a in new_accessions]
        logger.info("  %d not yet in DB", len(entries))

        stored = 0
        for done, (accession, filed_date, txt_path) in enumerate(entries, start=1):
            stored += _process_one(session, accession, filed_date, txt_path)
            if done % 1000 == 0:
                logger.info("  %d/%d processed, %d rows stored", done, len(entries), stored)

        logger.info("  Quarter done: %d rows stored", stored)
        return stored
    finally:
        # Only close a session this call created; a caller-supplied session
        # stays open for reuse.
        if own_session:
            session.close()


def ingest_years(start_year: int, end_year: int) -> int:
    """Ingest all Form 4 filings for start_year through end_year inclusive.

    One session is shared across every quarter and is closed on exit, even if
    a quarter fails.

    Returns:
        Total rows stored across all quarters processed.
    """
    session = _make_session()
    total = 0
    try:
        for year in range(start_year, end_year + 1):
            for quarter in range(1, 5):
                stored = ingest_quarter(year, quarter, session=session)
                total += stored
                logger.info("Cumulative after %s/Q%s: %d rows", year, quarter, total)
    finally:
        session.close()
    return total