smaug/ingestion/historical_ingest.py
Dominik Roth b5268f063e feat(ingestion): bulk historical ingest, form4 tx_code, parser fixes
- sec_bulk_ingest.py: new module — downloads quarterly form.idx from SEC EDGAR,
  filters Form 4/4A, fetches each filing's SGML/XML, parses and stores.
  Adaptive token-bucket rate limiter (backs off on 429/5xx, ramps on success).
  Uses filter_new_accessions for fast quarter-level dedup before any HTTP.
  Marks derivative-only filings as seen so they're skipped on resume.
- form4_parser: extract tx_code (transactionCode) from each transaction row;
  fix role extraction (Director/10%owner/Officer fallback); fix _text() to
  handle <value> sub-elements; fix footnote text extraction
- edgar_poller: filter feed entries to Form 4/4A only; skip XSLT stylesheet URLs
  when resolving XML filing links

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 17:48:51 +02:00

137 lines
4.4 KiB
Python

"""
Bulk-ingest historical Form 4 filings from SEC quarterly full-index files.
Usage via main.py:
python main.py ingest-history --year 2024 --quarter 4 --limit 500
python main.py ingest-history --year 2025 --quarter 1 # all ~118k filings
"""
import logging
import time
from typing import Optional
import requests
from lxml import html
import config
from db.db import accession_exists, insert_filing
from ingestion.edgar_poller import HEADERS, _fetch, _save_raw_xml
from ingestion.form4_parser import parse_form4
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# URL template for a quarter's SEC EDGAR form index (form.idx);
# filled in with `year` and `quarter` via str.format.
FULL_INDEX_URL = "https://www.sec.gov/Archives/edgar/full-index/{year}/QTR{quarter}/form.idx"
def _parse_form_idx(content: str) -> list[tuple[str, str, str]]:
"""Return (cik, filed_date, filename) tuples for Form 4 entries."""
results = []
in_data = False
for line in content.splitlines():
if line.startswith("---"):
in_data = True
continue
if not in_data:
continue
form_type = line[:12].strip()
if form_type not in ("4", "4/A"):
continue
# fixed-width: form(12) company(62) cik(12) date(12) filename(rest)
cik = line[74:86].strip()
filed_date = line[86:98].strip()
filename = line[98:].strip()
results.append((cik, filed_date, filename))
return results
def _accession_from_filename(filename: str) -> str:
"""edgar/data/123/0001234567-25-000001.txt -> 0001234567-25-000001"""
base = filename.rstrip().split("/")[-1]
return base.replace(".txt", "")
def _resolve_xml_from_index(cik: str, accession: str) -> Optional[str]:
    """Find the raw XML document URL for a filing via its index page.

    Fetches the filing's ``<accession>-index.htm`` page and returns the first
    link inside ``table.tableFile`` whose href ends in ``.xml`` and does not
    point at an ``xslF345X<NN>`` stylesheet path.

    Args:
        cik: Filer CIK as used in the EDGAR archive path.
        accession: Dashed accession number, e.g. ``0001234567-25-000001``.

    Returns:
        The absolute URL of the XML document, or ``None`` when no suitable
        link is listed or the index page could not be fetched or parsed
        (the failure is logged at debug level, not raised).
    """
    folder = accession.replace("-", "")
    base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{folder}/"
    index_url = f"{base}{accession}-index.htm"
    try:
        resp = _fetch(index_url)
        doc = html.fromstring(resp.content)
        for link in doc.cssselect("table.tableFile a[href]"):
            href = link.get("href", "")
            if not href.lower().endswith(".xml"):
                continue
            # Skip stylesheet-rendered views of the ownership document. Match
            # the whole xslF345X<NN> family rather than only X05/X06, because
            # filings from earlier years are listed under other stylesheet
            # versions and would otherwise be picked instead of the raw XML.
            if "xslF345X" in href:
                continue
            # Site-absolute hrefs are joined to the host; anything else is
            # treated as relative to the filing folder.
            return config.EDGAR_BASE_URL + href if href.startswith("/") else base + href
    except Exception as e:
        # Best-effort lookup: the caller treats None as "no XML available".
        logger.debug(f"Could not resolve XML for {accession}: {e}")
    return None
def ingest_quarter(year: int, quarter: int, limit: Optional[int] = None, rate_limit: float = 0.15) -> int:
    """
    Download and store Form 4 filings for a given quarter.

    Fetches the quarter's form.idx, keeps the Form 4 / 4/A entries, and for
    each accession not already in the database resolves the filing's XML
    document, saves the raw XML, parses it and inserts the parsed filings.

    Args:
        year: Index year, substituted into FULL_INDEX_URL.
        quarter: Index quarter, substituted into FULL_INDEX_URL.
        limit: If truthy, only the first `limit` index entries are considered
            (applied before de-duplication). None or 0 means no limit.
        rate_limit: seconds to pause after every HTTP request made per filing
            (SEC allows ~10 req/s; 0.15 is safe).

    Returns:
        Count of new filings stored, i.e. the number of parsed filings for
        which insert_filing returned a truthy value. Returns 0 if the index
        itself could not be fetched.
    """
    url = FULL_INDEX_URL.format(year=year, quarter=quarter)
    logger.info(f"Fetching index: {url}")
    try:
        resp = requests.get(url, headers=HEADERS, timeout=60)
        resp.raise_for_status()
    except Exception as e:
        logger.error(f"Failed to fetch index: {e}")
        return 0

    entries = _parse_form_idx(resp.text)
    logger.info(f"Found {len(entries)} Form 4 entries in {year}/QTR{quarter}")
    if limit:
        entries = entries[:limit]
        logger.info(f"Limited to {limit} entries")

    total = len(entries)
    seen_accessions: set[str] = set()
    stored = 0
    for done, (cik, filed_date, filename) in enumerate(entries):
        # Report progress at the top of the loop so it is emitted every 50
        # entries even when entries are skipped or fail further down.
        if done and done % 50 == 0:
            logger.info(f"Progress: {done}/{total} processed, {stored} stored")

        accession = _accession_from_filename(filename)
        # The same accession can appear on more than one index row.
        if accession in seen_accessions:
            continue
        seen_accessions.add(accession)
        # Already ingested: skip without issuing any HTTP request.
        if accession_exists(accession):
            continue

        xml_url = _resolve_xml_from_index(cik, accession)
        if not xml_url:
            logger.debug(f"No XML for {accession}")
            time.sleep(rate_limit)
            continue
        # Resolving the XML URL already cost one request to the index page;
        # pause before the next request so the two are not sent back-to-back.
        time.sleep(rate_limit)

        try:
            xml_resp = _fetch(xml_url)
            xml_bytes = xml_resp.content
        except Exception as e:
            logger.debug(f"Failed to fetch XML {accession}: {e}")
            time.sleep(rate_limit)
            continue

        _save_raw_xml(accession, xml_bytes)
        parsed = parse_form4(xml_bytes, accession, filed_date)
        for filing in parsed:
            if insert_filing(filing):
                stored += 1
        time.sleep(rate_limit)

    logger.info(f"Done: {stored} new filings stored from {year}/QTR{quarter}")
    return stored