smaug/ingestion/efts_ingest.py
Dominik Roth b5268f063e feat(ingestion): bulk historical ingest, form4 tx_code, parser fixes
- sec_bulk_ingest.py: new module — downloads quarterly form.idx from SEC EDGAR,
  filters Form 4/4A, fetches each filing's SGML/XML, parses and stores.
  Adaptive token-bucket rate limiter (backs off on 429/5xx, ramps on success).
  Uses filter_new_accessions for fast quarter-level dedup before any HTTP.
  Marks derivative-only filings as seen so they're skipped on resume.
- form4_parser: extract tx_code (transactionCode) from each transaction row;
  fix role extraction (Director/10%owner/Officer fallback); fix _text() to
  handle <value> sub-elements; fix footnote text extraction
- edgar_poller: filter feed entries to Form 4/4A only; skip XSLT stylesheet URLs
  when resolving XML filing links

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 17:48:51 +02:00

219 lines
6.7 KiB
Python

"""
Bulk ingest using EDGAR full-text search (EFTS) API.
The EFTS API returns the XML filename in the _id field, avoiding the extra
index-page fetch. A global token-bucket rate limiter keeps total throughput
under SEC's 10 req/s limit across all threads.
"""
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional
import requests
import config
from db.db import accession_exists, insert_filing
from ingestion.edgar_poller import HEADERS, _save_raw_xml
from ingestion.form4_parser import parse_form4
logger = logging.getLogger(__name__)
# Endpoint queried for Form 4 search hits by the EFTS pagination helpers.
EFTS_URL = "https://efts.sec.gov/LATEST/search-index"
# Page size sent as the "size" query parameter; also the step between offsets.
_BATCH_SIZE = 100
# Size of the thread pool used for the XML fetch stage.
_WORKERS = 1 # sequential avoids triggering SEC 500 rate-limit responses
# Requests per second handed to the module-wide rate limiter.
_MAX_RATE = 5.0 # conservative; SEC allows 10 but concurrent bursts get throttled
# Largest "from" offset the pagination loop will request.
_EFTS_MAX_OFFSET = 9900 # Elasticsearch hard ceiling
# ---------- global token-bucket rate limiter ----------
class _RateLimiter:
    """Token bucket that paces callers to roughly ``rate`` acquisitions per second.

    The bucket starts full (``rate`` tokens) and refills continuously at
    ``rate`` tokens per second, never exceeding ``rate``. Bookkeeping happens
    under a lock; any required sleep happens after the lock is released.
    """

    def __init__(self, rate: float):
        self._lock = threading.Lock()
        self._rate = rate
        self._tokens = rate
        self._last = time.monotonic()

    def acquire(self):
        """Consume one token, sleeping first when the bucket holds less than one."""
        with self._lock:
            stamp = time.monotonic()
            refill = (stamp - self._last) * self._rate
            self._last = stamp
            available = min(self._rate, self._tokens + refill)
            # When short, wait exactly long enough for the missing fraction
            # of a token to be refilled.
            pause = (1 - available) / self._rate if available < 1 else 0
            # The balance may go negative: that is the reservation which
            # makes later callers wait proportionally longer.
            self._tokens = available - 1
        if pause > 0:
            time.sleep(pause)
# Module-wide limiter instance; _get acquires from it before every request.
_limiter = _RateLimiter(_MAX_RATE)
def _get(url: str, params: Optional[dict] = None, retries: int = 4) -> requests.Response:
    """Rate-limited GET with exponential backoff on 403/429/5xx and transport errors.

    Every attempt first takes a token from the module-wide limiter. Failed
    attempts are retried after 2s, 4s, 8s, ... Permanent client errors
    (any other 4xx, e.g. 404) are raised immediately instead of being
    retried, so a missing document does not stall the ingest.

    Args:
        url: Absolute URL to fetch.
        params: Optional query-string parameters.
        retries: Maximum number of attempts; must be at least 1.

    Returns:
        The successful response.

    Raises:
        ValueError: If ``retries`` is less than 1.
        requests.RequestException: On a permanent HTTP error, or when the
            final attempt fails.
    """
    if retries < 1:
        raise ValueError("retries must be >= 1")
    delay = 2.0
    for attempt in range(retries):
        _limiter.acquire()
        try:
            resp = requests.get(url, params=params, headers=HEADERS, timeout=25)
            resp.raise_for_status()
            return resp
        except requests.RequestException as e:
            failed = getattr(e, "response", None)
            status = failed.status_code if failed is not None else None
            # 403 stays retryable because it may signal throttling rather
            # than a permanent denial.
            # NOTE(review): confirm against SEC's documented behaviour.
            permanent = (
                status is not None
                and 400 <= status < 500
                and status not in (403, 429)
            )
            if permanent or attempt == retries - 1:
                raise
            wait = delay * (2 ** attempt)
            logger.debug(f"Retry {attempt+1} after {wait:.1f}s: {e}")
            time.sleep(wait)
# ---------- EFTS pagination ----------
def _efts_page(start_date: str, end_date: str, offset: int) -> list[dict]:
    """Fetch one page of EFTS Form 4 hits for a date range.

    Args:
        start_date: Value sent as ``startdt``.
        end_date: Value sent as ``enddt``.
        offset: Value sent as ``from`` (index of the first hit on the page).

    Returns:
        The list found under ``hits.hits`` in the JSON response.
    """
    query = {
        "q": "",
        "forms": "4",
        "dateRange": "custom",
        "startdt": start_date,
        "enddt": end_date,
        "from": offset,
        "size": _BATCH_SIZE,
    }
    response = _get(EFTS_URL, params=query)
    payload = response.json()
    return payload["hits"]["hits"]
def _is_form4_xml(filename: str) -> bool:
    """Return True for a raw ``.xml`` document that is not an XSLT rendering."""
    # Mirrors the filename filter applied when the document is fetched.
    return filename.lower().endswith(".xml") and "xslF345" not in filename


def _hits_to_tasks(hits: list[dict]) -> list[tuple[str, str, str]]:
    """Reduce raw EFTS hits to one (accession, filename, filed_date) per accession.

    Accessions keep the order of their first appearance. If the first hit for
    an accession is not a usable XML document and a later hit is, the later
    hit replaces it, so an exhibit listed first cannot hide the filing's XML.
    """
    chosen: dict[str, tuple[str, str]] = {}
    for hit in hits:
        src = hit.get("_source", {})
        acc = src.get("adsh", "")
        # _id has the form "<accession>:<filename>"; no colon means no filename.
        filename = hit.get("_id", "").partition(":")[2]
        if not acc or not filename:
            continue
        previous = chosen.get(acc)
        if previous is None or (
            not _is_form4_xml(previous[0]) and _is_form4_xml(filename)
        ):
            # Re-assigning an existing key keeps its original position.
            chosen[acc] = (filename, src.get("file_date", ""))
    return [(acc, fn, dt) for acc, (fn, dt) in chosen.items()]


def _collect_metadata(start_date: str, end_date: str) -> list[tuple[str, str, str]]:
    """Return (accession, filename, filed_date) for all Form 4s in range.

    Pages through EFTS in steps of ``_BATCH_SIZE`` up to ``_EFTS_MAX_OFFSET``.
    A failed page is skipped; three consecutive failures stop pagination.
    Truncation and skipped pages are reported with a warning so an incomplete
    result is not mistaken for a complete one.

    Args:
        start_date: First filing date to include (sent as ``startdt``).
        end_date: Last filing date to include (sent as ``enddt``).

    Returns:
        One tuple per accession, in order of first appearance.

    Raises:
        Exception: Whatever ``_get`` raises if the first page cannot be
            fetched.
    """
    params = {
        "q": "", "forms": "4",
        "dateRange": "custom",
        "startdt": start_date, "enddt": end_date,
        "from": 0, "size": _BATCH_SIZE,
    }
    data = _get(EFTS_URL, params=params).json()
    total = data["hits"]["total"]["value"]
    cap = min(total, _EFTS_MAX_OFFSET + _BATCH_SIZE)
    logger.info(f" EFTS reports {total} filings; fetching up to {cap}")
    if total > cap:
        logger.warning(
            f"EFTS reports {total} hits for {start_date}..{end_date} but only "
            f"{cap} are reachable; use a narrower date range"
        )
    hits = list(data["hits"]["hits"])
    offset = _BATCH_SIZE
    consecutive_fails = 0
    failed_offsets: list[int] = []
    while len(hits) < cap and offset <= _EFTS_MAX_OFFSET:
        try:
            batch = _efts_page(start_date, end_date, offset)
            if not batch:
                break
            hits.extend(batch)
            consecutive_fails = 0
        except Exception as e:
            consecutive_fails += 1
            failed_offsets.append(offset)
            logger.debug(f"EFTS offset {offset} failed ({consecutive_fails}): {e}")
            if consecutive_fails >= 3:
                break
        offset += _BATCH_SIZE
    if failed_offsets:
        logger.warning(
            f"EFTS pages at offsets {failed_offsets} could not be fetched for "
            f"{start_date}..{end_date}; results are incomplete"
        )
    return _hits_to_tasks(hits)
# ---------- XML fetch + store ----------
def _xml_url(accession: str, filename: str) -> str:
    """Build the archive URL of ``filename`` inside the filing ``accession``.

    The directory name is the accession with dashes removed; the CIK path
    segment is its first ten characters with leading zeros stripped.
    """
    folder = accession.replace("-", "")
    # NOTE(review): this assumes the accession prefix is a CIK under which
    # the filing is served — confirm for filings submitted by a filing agent.
    cik = folder[:10].lstrip("0")
    return f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{folder}/{filename}"
def _fetch_and_store(accession: str, filename: str, filed_date: str) -> int:
    """Download, archive, parse and store one filing; return the rows inserted.

    Returns 0 without fetching when the accession is already stored or the
    filename is not a raw ``.xml`` document (XSLT-rendered ``xslF345`` paths
    are rejected too). A failed download is logged at debug level and also
    yields 0.
    """
    if accession_exists(accession):
        return 0
    is_xml = filename.lower().endswith(".xml")
    if not is_xml or "xslF345" in filename:
        return 0

    target = _xml_url(accession, filename)
    try:
        payload = _get(target).content
    except Exception as exc:
        logger.debug(f"Skip {accession}: {exc}")
        return 0

    # The raw bytes are archived before parsing.
    _save_raw_xml(accession, payload)
    stored = 0
    for row in parse_form4(payload, accession, filed_date):
        if insert_filing(row):
            stored += 1
    return stored
# ---------- public API ----------
def ingest_date_range(
    start_date: str,
    end_date: str,
    limit: Optional[int] = None,
) -> int:
    """Ingest every Form 4 filing found by EFTS between two dates.

    Args:
        start_date: First filing date to include, passed through to EFTS.
        end_date: Last filing date to include, passed through to EFTS.
        limit: When truthy, only the first ``limit`` filings are fetched.
            ``None`` and ``0`` both mean no cap.

    Returns:
        Total number of rows stored across all fetched filings.
    """
    logger.info(f"Ingesting {start_date}..{end_date}")
    tasks = _collect_metadata(start_date, end_date)
    if limit:
        tasks = tasks[:limit]
    logger.info(f" {len(tasks)} unique filings to fetch")
    total_stored = 0
    with ThreadPoolExecutor(max_workers=_WORKERS) as pool:
        # Map each future to its accession so failures can be attributed.
        futures = {
            pool.submit(_fetch_and_store, acc, fn, dt): acc
            for acc, fn, dt in tasks
        }
        for done, future in enumerate(as_completed(futures), start=1):
            try:
                total_stored += future.result()
            except Exception as e:
                # Download errors are handled inside the worker, so anything
                # arriving here is a parse or storage failure. Best-effort:
                # report it and carry on.
                logger.warning(f"Worker error for {futures[future]}: {e}")
            if done % 500 == 0:
                logger.info(f" {done}/{len(tasks)} fetched, {total_stored} rows stored")
    logger.info(f" Done: {total_stored} rows stored")
    return total_stored
def ingest_years(start_year: int, end_year: int) -> int:
    """
    Ingest all Form 4 filings for start_year..end_year, one calendar day at a time.

    Each day from January 1 of ``start_year`` through December 31 of
    ``end_year`` is passed to ``ingest_date_range`` as a single-day range.
    Presumably this keeps every EFTS query below its paging ceiling; that
    depends on daily filing volume — TODO confirm.

    Returns the cumulative number of rows stored.
    """
    from datetime import date, timedelta

    first_day = date(start_year, 1, 1)
    last_day = date(end_year, 12, 31)
    running_total = 0
    # An inverted year range gives a non-positive count, so nothing runs.
    for n in range((last_day - first_day).days + 1):
        day_str = (first_day + timedelta(days=n)).strftime("%Y-%m-%d")
        running_total += ingest_date_range(day_str, day_str)
        logger.info(f"{day_str} done — cumulative: {running_total} rows")
    return running_total