"""
Bulk ingest using EDGAR full-text search (EFTS) API.

The EFTS API returns the XML filename in the _id field, avoiding the extra
index-page fetch. A global token-bucket rate limiter keeps total throughput
under SEC's 10 req/s limit across all threads.

EFTS paging is bounded by Elasticsearch's 10,000-result window, so
ingest_years() issues one query per calendar day.
"""

import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, timedelta
from typing import Optional

import requests

import config
from db.db import accession_exists, insert_filing
from ingestion.edgar_poller import HEADERS, _save_raw_xml
from ingestion.form4_parser import parse_form4

logger = logging.getLogger(__name__)

EFTS_URL = "https://efts.sec.gov/LATEST/search-index"
_BATCH_SIZE = 100
_WORKERS = 1  # sequential avoids triggering SEC 500 rate-limit responses
_MAX_RATE = 5.0  # conservative; SEC allows 10 but concurrent bursts get throttled
# Last usable "from" offset: Elasticsearch rejects pages whose
# from + size exceeds its 10,000-result window (hard ceiling).
_EFTS_MAX_OFFSET = 9900

# HTTP statuses treated as transient, and therefore retried, by _get().
_RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})
# Network-level failures (no usable response received) that _get() retries.
_RETRYABLE_ERRORS = (
    requests.ConnectionError,
    requests.Timeout,
    requests.exceptions.ChunkedEncodingError,
)


# ---------- global token-bucket rate limiter ----------

class _RateLimiter:
    """Thread-safe token bucket allowing on average `rate` acquisitions/second.

    The bucket holds at most `rate` tokens, so up to `rate` back-to-back calls
    pass immediately. When the bucket is empty, a caller still takes its token
    (driving the balance negative) and then sleeps outside the lock until that
    token would have accrued; later callers therefore queue up 1/rate apart.
    """

    def __init__(self, rate: float):
        self._rate = rate
        self._tokens = rate
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self):
        """Block until the caller may issue one request."""
        with self._lock:
            now = time.monotonic()
            elapsed = now - self._last
            # Refill proportionally to elapsed time, capped at bucket size.
            self._tokens = min(self._rate, self._tokens + elapsed * self._rate)
            self._last = now
            if self._tokens < 1:
                # Time until the balance would reach 1 token again.
                sleep_for = (1 - self._tokens) / self._rate
            else:
                sleep_for = 0
            self._tokens -= 1
        # Sleep outside the lock so other threads can reserve their slots.
        if sleep_for > 0:
            time.sleep(sleep_for)


_limiter = _RateLimiter(_MAX_RATE)


def _get(
    url: str, params: Optional[dict] = None, retries: int = 4
) -> requests.Response:
    """Rate-limited GET with exponential backoff on transient failures.

    Only HTTP 429/5xx responses and network-level errors (connection
    failures, timeouts, broken chunked transfers) are retried. Other HTTP
    errors such as 404 are raised immediately, because retrying them only
    burns time and rate-limit budget.

    Args:
        url: Absolute URL to fetch.
        params: Optional query-string parameters.
        retries: Total number of attempts; values below 1 are treated as 1.

    Returns:
        The response, whose status is below 400.

    Raises:
        requests.HTTPError: immediately for a non-retryable error status, or
            for a retryable status that persists through every attempt.
        requests.RequestException: for a network-level failure that persists
            through every attempt.
    """
    attempts = max(1, retries)
    delay = 2.0
    for attempt in range(attempts):
        _limiter.acquire()  # every attempt, retries included, consumes a token
        try:
            resp = requests.get(url, params=params, headers=HEADERS, timeout=25)
        except _RETRYABLE_ERRORS as e:
            error: Exception = e
        else:
            if resp.status_code not in _RETRYABLE_STATUS:
                # Permanent 4xx errors raise here; anything else is success.
                resp.raise_for_status()
                return resp
            error = requests.HTTPError(f"HTTP {resp.status_code}", response=resp)
        if attempt == attempts - 1:
            raise error
        wait = delay * (2 ** attempt)
        logger.debug(f"Retry {attempt+1} after {wait:.1f}s: {error}")
        time.sleep(wait)
    raise AssertionError("unreachable: the final attempt returns or raises")


# ---------- EFTS pagination ----------

def _efts_search(start_date: str, end_date: str, offset: int) -> dict:
    """Fetch one page of Form 4 search results as decoded EFTS JSON.

    The dates are passed through as EFTS ``startdt``/``enddt``; this module
    calls it with ``YYYY-MM-DD`` strings.
    """
    params = {
        "q": "",
        "forms": "4",
        "dateRange": "custom",
        "startdt": start_date,
        "enddt": end_date,
        "from": offset,
        "size": _BATCH_SIZE,
    }
    return _get(EFTS_URL, params=params).json()


def _efts_page(start_date: str, end_date: str, offset: int) -> list[dict]:
    """Return the raw hit list of the results page starting at *offset*."""
    return _efts_search(start_date, end_date, offset)["hits"]["hits"]


def _is_form4_xml(filename: str) -> bool:
    """Return True for an ``.xml`` document that is not under an ``xslF345`` path.

    The ``xslF345`` paths are presumably EDGAR's XSLT-rendered view of the
    form rather than the raw XML; confirm this if the filter needs to change.
    """
    return filename.lower().endswith(".xml") and "xslF345" not in filename


def _collect_metadata(start_date: str, end_date: str) -> list[tuple[str, str, str]]:
    """Return (accession, filename, filed_date) for all Form 4s in range.

    Pages through EFTS results up to the Elasticsearch offset ceiling and
    logs a warning when the range holds more hits than can be paged. If a
    page still fails after _get()'s own retries, it is skipped; three
    consecutive failed pages abort paging.

    Hits are filtered to raw XML documents *before* de-duplicating by
    accession. Otherwise an accession whose first hit is another document
    would lose its XML file.
    """
    first = _efts_search(start_date, end_date, 0)
    total_info = first["hits"]["total"]
    total = total_info["value"]
    cap = min(total, _EFTS_MAX_OFFSET + _BATCH_SIZE)
    logger.info(f" EFTS reports {total} filings; fetching up to {cap}")
    # A "gte" relation means Elasticsearch stopped counting, so the true
    # total is larger than reported.
    if total > cap or total_info.get("relation") == "gte":
        logger.warning(
            f" {start_date} → {end_date}: more results than EFTS can page "
            f"through; collection truncated at {cap}"
        )

    hits = list(first["hits"]["hits"])
    offset = _BATCH_SIZE
    consecutive_fails = 0
    while len(hits) < cap and offset <= _EFTS_MAX_OFFSET:
        try:
            batch = _efts_page(start_date, end_date, offset)
        except Exception as e:  # best-effort: skip the page, keep paging
            consecutive_fails += 1
            logger.debug(f"EFTS offset {offset} failed ({consecutive_fails}): {e}")
            if consecutive_fails >= 3:
                logger.warning(
                    f" EFTS paging aborted at offset {offset} after "
                    f"{consecutive_fails} consecutive failures"
                )
                break
        else:
            if not batch:
                break
            hits.extend(batch)
            consecutive_fails = 0
        offset += _BATCH_SIZE

    seen: set[str] = set()
    tasks: list[tuple[str, str, str]] = []
    for hit in hits:
        src = hit.get("_source", {})
        acc = src.get("adsh", "")
        # _id is "<accession>:<filename>"; no colon yields "".
        filename = hit.get("_id", "").partition(":")[2]
        if not acc or acc in seen or not _is_form4_xml(filename):
            continue
        seen.add(acc)
        tasks.append((acc, filename, src.get("file_date", "")))
    return tasks


# ---------- XML fetch + store ----------

def _xml_url(accession: str, filename: str) -> str:
    """Build the EDGAR Archives URL of *filename* within *accession*'s folder.

    The folder is ``data/{cik}/{accession without dashes}/``, where ``cik`` is
    the first ten digits of the accession number with leading zeros removed.
    """
    # NOTE(review): the accession prefix identifies the submitter, which may
    # be a filing agent rather than the issuer/reporting owner; confirm EDGAR
    # serves the filing under that CIK path.
    path = accession.replace("-", "")
    cik = path[:10].lstrip("0")
    return f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{path}/{filename}"


def _fetch_and_store(accession: str, filename: str, filed_date: str) -> int:
    """Download, archive, parse and insert one Form 4 XML document.

    Returns:
        The number of parsed rows for which insert_filing() returned a truthy
        value. Returns 0 when the file is not a raw Form 4 XML, the accession
        already exists, or the download fails; download failures are logged
        at debug level. Errors from saving, parsing or inserting propagate.
    """
    # Cheap filename check first so skipped files never touch the database.
    if not _is_form4_xml(filename) or accession_exists(accession):
        return 0
    url = _xml_url(accession, filename)
    try:
        xml_bytes = _get(url).content
    except Exception as e:
        logger.debug(f"Skip {accession}: {e}")
        return 0
    _save_raw_xml(accession, xml_bytes)
    parsed = parse_form4(xml_bytes, accession, filed_date)
    return sum(1 for f in parsed if insert_filing(f))


# ---------- public API ----------

def ingest_date_range(
    start_date: str,
    end_date: str,
    limit: Optional[int] = None,
) -> int:
    """Ingest every Form 4 that EFTS reports between *start_date* and *end_date*.

    Args:
        start_date: First filing date, passed to EFTS as ``startdt``.
        end_date: Last filing date, passed to EFTS as ``enddt``.
        limit: If not None, process at most this many filings (0 means none).

    Returns:
        Total number of rows inserted.
    """
    logger.info(f"Ingesting {start_date} → {end_date}")
    tasks = _collect_metadata(start_date, end_date)
    if limit is not None:
        tasks = tasks[:limit]
    logger.info(f" {len(tasks)} unique filings to fetch")

    total_stored = 0
    with ThreadPoolExecutor(max_workers=_WORKERS) as pool:
        futures = {
            pool.submit(_fetch_and_store, acc, fn, dt): acc
            for acc, fn, dt in tasks
        }
        for done, future in enumerate(as_completed(futures), start=1):
            try:
                total_stored += future.result()
            except Exception as e:
                # One bad filing must not abort the batch, but keep it visible.
                logger.warning(f"Worker error for {futures[future]}: {e}")
            if done % 500 == 0:
                logger.info(f" {done}/{len(tasks)} fetched, {total_stored} rows stored")
    logger.info(f" Done: {total_stored} rows stored")
    return total_stored


def ingest_years(start_year: int, end_year: int) -> int:
    """Ingest all Form 4 filings for start_year..end_year (inclusive).

    Issues one EFTS query per calendar day (~960 filings/day) so each query
    stays well under the 10,000-result window EFTS can page through. A day
    exceeding that window would be truncated, and _collect_metadata() logs a
    warning when that happens.

    Returns:
        Total number of rows inserted across all days.
    """
    total = 0
    current = date(start_year, 1, 1)
    end = date(end_year, 12, 31)
    while current <= end:
        ds = current.isoformat()  # YYYY-MM-DD
        total += ingest_date_range(ds, ds)
        logger.info(f"{ds} done — cumulative: {total} rows")
        current += timedelta(days=1)
    return total