- sec_bulk_ingest.py: new module — downloads quarterly form.idx from SEC EDGAR, filters Form 4/4A, fetches each filing's SGML/XML, parses and stores. Adaptive token-bucket rate limiter (backs off on 429/5xx, ramps on success). Uses filter_new_accessions for fast quarter-level dedup before any HTTP. Marks derivative-only filings as seen so they're skipped on resume. - form4_parser: extract tx_code (transactionCode) from each transaction row; fix role extraction (Director/10%owner/Officer fallback); fix _text() to handle <value> sub-elements; fix footnote text extraction - edgar_poller: filter feed entries to Form 4/4A only; skip XSLT stylesheet URLs when resolving XML filing links Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
137 lines
4.4 KiB
Python
137 lines
4.4 KiB
Python
"""
|
|
Bulk-ingest historical Form 4 filings from SEC quarterly full-index files.
|
|
|
|
Usage via main.py:
|
|
python main.py ingest-history --year 2024 --quarter 4 --limit 500
|
|
python main.py ingest-history --year 2025 --quarter 1 # all ~118k filings
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from typing import Optional
|
|
|
|
import requests
|
|
from lxml import html
|
|
|
|
import config
|
|
from db.db import accession_exists, insert_filing
|
|
from ingestion.edgar_poller import HEADERS, _fetch, _save_raw_xml
|
|
from ingestion.form4_parser import parse_form4
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
FULL_INDEX_URL = "https://www.sec.gov/Archives/edgar/full-index/{year}/QTR{quarter}/form.idx"
|
|
|
|
|
|
def _parse_form_idx(content: str) -> list[tuple[str, str, str]]:
|
|
"""Return (cik, filed_date, filename) tuples for Form 4 entries."""
|
|
results = []
|
|
in_data = False
|
|
for line in content.splitlines():
|
|
if line.startswith("---"):
|
|
in_data = True
|
|
continue
|
|
if not in_data:
|
|
continue
|
|
form_type = line[:12].strip()
|
|
if form_type not in ("4", "4/A"):
|
|
continue
|
|
# fixed-width: form(12) company(62) cik(12) date(12) filename(rest)
|
|
cik = line[74:86].strip()
|
|
filed_date = line[86:98].strip()
|
|
filename = line[98:].strip()
|
|
results.append((cik, filed_date, filename))
|
|
return results
|
|
|
|
|
|
def _accession_from_filename(filename: str) -> str:
|
|
"""edgar/data/123/0001234567-25-000001.txt -> 0001234567-25-000001"""
|
|
base = filename.rstrip().split("/")[-1]
|
|
return base.replace(".txt", "")
|
|
|
|
|
|
def _resolve_xml_from_index(cik: str, accession: str) -> Optional[str]:
    """Find the raw XML document URL for a filing via its index page.

    Fetches ``<accession>-index.htm`` from the filing's archive directory and
    returns the first link inside ``table.tableFile`` that ends in ``.xml``
    and is not served from an XSLT stylesheet directory.

    Args:
        cik: Filer CIK as it appears in the archive path.
        accession: Dashed accession number (dashes are removed to build the
            directory name).

    Returns:
        An absolute URL, or ``None`` when no suitable link exists or the index
        page could not be fetched or parsed (the error is logged at DEBUG).
    """
    folder = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession.replace('-', '')}/"
    index_url = f"{folder}{accession}-index.htm"

    try:
        resp = _fetch(index_url)
        doc = html.fromstring(resp.content)
        for link in doc.cssselect("table.tableFile a[href]"):
            href = link.get("href", "")
            lowered = href.lower()
            if not lowered.endswith(".xml"):
                continue
            # Links routed through a stylesheet directory (xslF345X05,
            # xslF345X06, and any other version) point at the rendered view,
            # not the raw XML. Match on any "xsl*" directory segment rather
            # than a fixed list of versions.
            if any(part.startswith("xsl") for part in lowered.split("/")[:-1]):
                continue
            # Site-absolute hrefs hang off the EDGAR host; anything else is
            # taken as relative to the filing's own directory.
            if href.startswith("/"):
                return config.EDGAR_BASE_URL + href
            return folder + href
    except Exception as e:
        # Best-effort lookup: the caller treats None as "no XML available".
        logger.debug(f"Could not resolve XML for {accession}: {e}")

    return None
|
|
|
|
|
|
def _ingest_entry(cik: str, accession: str, filed_date: str) -> int:
    """Fetch, archive, parse and store one filing; return the rows newly stored."""
    xml_url = _resolve_xml_from_index(cik, accession)
    if not xml_url:
        logger.debug(f"No XML for {accession}")
        return 0

    try:
        xml_bytes = _fetch(xml_url).content
    except Exception as e:
        logger.debug(f"Failed to fetch XML {accession}: {e}")
        return 0

    _save_raw_xml(accession, xml_bytes)

    try:
        parsed = parse_form4(xml_bytes, accession, filed_date)
    except Exception as e:
        # One malformed document must not abort a whole-quarter run.
        logger.warning(f"Failed to parse {accession}: {e}")
        return 0

    return sum(1 for filing in parsed if insert_filing(filing))


def ingest_quarter(year: int, quarter: int, limit: Optional[int] = None, rate_limit: float = 0.15) -> int:
    """
    Download and store Form 4 filings for a given quarter.

    Args:
        year: Index year, substituted into FULL_INDEX_URL.
        quarter: Index quarter, substituted into FULL_INDEX_URL.
        limit: If truthy, only the first ``limit`` index entries are
            considered (applied before de-duplication). ``None`` or 0 means
            no limit.
        rate_limit: Seconds to sleep after each entry that required HTTP
            requests. Entries skipped because the accession was already seen
            in this run, or already exists in the database, incur no sleep.

    Returns:
        Count of new filings stored (truthy results from ``insert_filing``).
        Returns 0 if the quarterly index itself cannot be fetched.
    """
    url = FULL_INDEX_URL.format(year=year, quarter=quarter)
    logger.info(f"Fetching index: {url}")
    try:
        resp = requests.get(url, headers=HEADERS, timeout=60)
        resp.raise_for_status()
    except Exception as e:
        logger.error(f"Failed to fetch index: {e}")
        return 0

    entries = _parse_form_idx(resp.text)
    logger.info(f"Found {len(entries)} Form 4 entries in {year}/QTR{quarter}")

    if limit:
        entries = entries[:limit]
        logger.info(f"Limited to {limit} entries")

    total = len(entries)
    seen_accessions: set[str] = set()
    stored = 0

    for position, (cik, filed_date, filename) in enumerate(entries, start=1):
        accession = _accession_from_filename(filename)

        # form.idx can list the same accession more than once; handle each
        # accession a single time per run.
        if accession not in seen_accessions:
            seen_accessions.add(accession)
            if not accession_exists(accession):
                stored += _ingest_entry(cik, accession, filed_date)
                # Throttle only when network requests were actually attempted.
                time.sleep(rate_limit)

        # Report on every 50th entry regardless of whether it was skipped, so
        # resume runs (mostly skips) still show progress.
        if position % 50 == 0:
            logger.info(f"Progress: {position}/{total} processed, {stored} stored")

    logger.info(f"Done: {stored} new filings stored from {year}/QTR{quarter}")
    return stored
|