smaug/ingestion/edgar_poller.py
Dominik Roth b5268f063e feat(ingestion): bulk historical ingest, form4 tx_code, parser fixes
- sec_bulk_ingest.py: new module — downloads quarterly form.idx from SEC EDGAR,
  filters Form 4/4A, fetches each filing's SGML/XML, parses and stores.
  Adaptive token-bucket rate limiter (backs off on 429/5xx, ramps on success).
  Uses filter_new_accessions for fast quarter-level dedup before any HTTP.
  Marks derivative-only filings as seen so they're skipped on resume.
- form4_parser: extract tx_code (transactionCode) from each transaction row;
  fix role extraction (Director/10% owner/Officer fallback); fix _text() to
  handle <value> sub-elements; fix footnote text extraction
- edgar_poller: filter feed entries to Form 4/4A only; skip XSLT stylesheet URLs
  when resolving XML filing links

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 17:48:51 +02:00

138 lines
4.4 KiB
Python

import time
import os
import logging
from typing import Optional
import requests
from lxml import etree, html
import config
from ingestion.form4_parser import parse_form4
from db.db import accession_exists, get_latest_filed_date, insert_filing
logger = logging.getLogger(__name__)

# HTTP headers attached to every EDGAR request made through _fetch.
# NOTE(review): "contact@example.com" looks like a placeholder; SEC's
# fair-access guidance asks automated clients to declare a real contact
# in the User-Agent — confirm before running against sec.gov.
HEADERS = {
    "User-Agent": "smaug-insider-monitor contact@example.com",
    "Accept-Encoding": "gzip, deflate",
}

# EDGAR "current filings" listing as an Atom feed: type=4, owner=include,
# count=40 entries per request.
EDGAR_ATOM_URL = "https://www.sec.gov/cgi-bin/browse-edgar?" + "&".join(
    (
        "action=getcurrent",
        "type=4",
        "dateb=",
        "owner=include",
        "count=40",
        "output=atom",
    )
)
def _fetch(url: str, timeout: int = 30) -> requests.Response:
    """GET *url* using the shared EDGAR ``HEADERS``.

    Args:
        url: Absolute URL to request.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        The response object, only if its status is not an HTTP error.

    Raises:
        requests.HTTPError: for 4xx/5xx statuses (via ``raise_for_status``).
        requests.RequestException: for connection failures and timeouts.
    """
    response = requests.get(url, timeout=timeout, headers=HEADERS)
    response.raise_for_status()
    return response
def _get_filing_urls() -> list[tuple[str, str, str]]:
    """Return Form 4 / 4/A entries from EDGAR's current-filings Atom feed.

    Fetches ``EDGAR_ATOM_URL`` and keeps only entries whose title starts with
    form type ``4`` or ``4/A`` and that carry an ``<atom:link>``.

    Returns:
        ``(index_url, accession, updated_date)`` tuples in feed order, at most
        one per accession number:

        - ``index_url`` is the entry link's ``href``.
        - ``accession`` is formatted ``NNNNNNNNNN-NN-NNNNNN`` when the link's
          last path segment yields 18 characters after removing
          ``-index.htm`` and dashes; otherwise that raw value is kept.
        - ``updated_date`` is the first 10 characters of ``<atom:updated>``,
          or ``""`` if absent.

    Raises:
        requests.RequestException: if the feed cannot be fetched.
        lxml.etree.XMLSyntaxError: if the feed is not well-formed XML.
    """
    resp = _fetch(EDGAR_ATOM_URL)
    root = etree.fromstring(resp.content)
    ns = {"atom": "http://www.w3.org/2005/Atom"}
    results: list[tuple[str, str, str]] = []
    seen: set[str] = set()
    for entry in root.findall("atom:entry", ns):
        title = entry.findtext("atom:title", namespaces=ns) or ""
        # Title is "<form type> - <rest>"; keep only Form 4 and its amendment.
        form_type = title.split(" - ")[0].strip()
        if form_type not in ("4", "4/A"):
            continue
        link = entry.find("atom:link", ns)
        if link is None:
            continue
        url = link.get("href", "")
        updated = (entry.findtext("atom:updated", namespaces=ns) or "")[:10]
        # The link points at "<accession>-index.htm": strip that suffix and
        # all dashes, then re-insert dashes in the canonical 10-2-6 layout.
        raw = url.rstrip("/").split("/")[-1].replace("-index.htm", "")
        raw = raw.replace("-", "")
        if len(raw) == 18:
            accession = f"{raw[:10]}-{raw[10:12]}-{raw[12:]}"
        else:
            accession = raw
        # The current-filings feed can list the same filing more than once
        # (e.g. once per associated filer). Callers key on the accession, so
        # later duplicates would only trigger redundant HTTP requests.
        if accession in seen:
            continue
        seen.add(accession)
        results.append((url, accession, updated))
    return results
def _resolve_xml_url(accession: str) -> Optional[str]:
    """Find the raw ownership XML document listed on a filing's EDGAR index page.

    The index URL is built as
    ``{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession without
    dashes}/{accession}-index.htm``, where ``cik`` is the accession's first
    10 digits with leading zeros stripped.

    Args:
        accession: Accession number in ``NNNNNNNNNN-NN-NNNNNN`` form.

    Returns:
        Absolute URL of the first ``.xml`` link inside a ``table.tableFile``
        table that does not live under an ``xsl*`` directory. Returns None if
        there is no such link, or if fetching or parsing the index page fails
        (the error is logged at DEBUG level).
    """
    accession_path = accession.replace("-", "")
    # NOTE(review): the first 10 digits of an accession number identify the
    # submitting entity, which may be a filing agent rather than the issuer
    # or reporting owner — confirm EDGAR serves the filing under that path.
    cik = accession_path[:10].lstrip("0")
    base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/"
    index_url = f"{base}{accession}-index.htm"
    try:
        resp = _fetch(index_url)
        doc = html.fromstring(resp.content)
        for link in doc.cssselect("table.tableFile a[href]"):
            href = link.get("href", "")
            if not href.lower().endswith(".xml"):
                continue
            # The index also links XSLT-rendered views of the document under
            # a directory like "xslF345X06/". Skip every xsl* directory rather
            # than a single hard-coded schema version, so other versions
            # (older filings, future schemas) still resolve to the raw XML.
            directories = href.split("/")[:-1]
            if any(part.lower().startswith("xsl") for part in directories):
                continue
            # Root-relative hrefs hang off the EDGAR host; others off the filing folder.
            return config.EDGAR_BASE_URL + href if href.startswith("/") else base + href
    except Exception as e:
        # Best effort: the caller treats None as "no XML" and logs a warning.
        logger.debug(f"Could not resolve XML URL for {accession}: {e}")
    return None
def _save_raw_xml(accession: str, xml_bytes: bytes):
os.makedirs(config.DATA_DIR, exist_ok=True)
path = os.path.join(config.DATA_DIR, f"{accession}.xml")
if not os.path.exists(path):
with open(path, "wb") as f:
f.write(xml_bytes)
def fetch_and_store_new_filings() -> list[dict]:
    """Ingest Form 4 / 4/A filings from the current EDGAR feed that are not yet stored.

    Each feed entry is skipped if its date is older than the latest filed date
    in the DB, or if its accession already exists. Otherwise the raw XML is
    resolved and downloaded, archived with ``_save_raw_xml``, parsed with
    ``parse_form4``, and every parsed row is passed to ``insert_filing``.

    Accessions are processed independently. An unexpected error while
    handling one filing (DB lookup, archiving, parsing, inserting) is logged
    with its traceback and the remaining entries are still processed. Rows
    inserted before such an error are still returned.

    Returns:
        The parsed filing dicts for which ``insert_filing`` returned a truthy
        value. Empty if the feed itself could not be fetched.
    """
    new_filings: list[dict] = []
    try:
        entries = _get_filing_urls()
    except Exception as e:
        logger.error(f"Failed to fetch EDGAR index: {e}")
        return new_filings
    latest_in_db = get_latest_filed_date()
    for _index_url, accession, filed_date in entries:
        # NOTE(review): compared with "<"; assumes both are ISO "YYYY-MM-DD"
        # strings — confirm get_latest_filed_date's return type.
        if latest_in_db and filed_date and filed_date < latest_in_db:
            logger.debug(f"Skipping {accession}: filed_date {filed_date} older than latest in DB {latest_in_db}")
            continue
        try:
            if accession_exists(accession):
                continue
            xml_url = _resolve_xml_url(accession)
            if not xml_url:
                logger.warning(f"No XML found for {accession}")
                continue
            try:
                xml_bytes = _fetch(xml_url).content
            except Exception as e:
                logger.error(f"Failed to fetch XML for {accession}: {e}")
                continue
            _save_raw_xml(accession, xml_bytes)
            for filing in parse_form4(xml_bytes, accession, filed_date):
                if insert_filing(filing):
                    new_filings.append(filing)
        except Exception:
            # One malformed filing or transient DB/disk error must not abort
            # the rest of the batch (and, via run_poller, stop polling).
            logger.exception(f"Failed to process filing {accession}")
    return new_filings
def run_poller(on_new_filing=None):
    """Poll EDGAR for new Form 4 filings in an endless loop.

    Each cycle calls ``fetch_and_store_new_filings``, invokes
    ``on_new_filing`` (if given) once per newly stored filing, then sleeps
    ``config.EDGAR_POLL_INTERVAL`` seconds.

    An ``Exception`` escaping a poll cycle is logged with its traceback and
    polling resumes after the normal sleep. This prevents a single bad
    filing or a transient DB/network error from stopping the poller for
    good. Exceptions raised by ``on_new_filing`` are logged per filing.
    Only non-``Exception`` errors (e.g. ``KeyboardInterrupt``) end the loop.

    Args:
        on_new_filing: Optional callable receiving each new filing dict.
    """
    logger.info("EDGAR poller started")
    while True:
        logger.info("Polling EDGAR for new Form 4 filings...")
        try:
            new = fetch_and_store_new_filings()
        except Exception:
            logger.exception("EDGAR poll cycle failed; retrying after the poll interval")
            new = []
        else:
            logger.info(f"Found {len(new)} new filings")
        if on_new_filing:
            for filing in new:
                try:
                    on_new_filing(filing)
                except Exception:
                    logger.exception(f"Error processing filing {filing.get('accession_number')}")
        time.sleep(config.EDGAR_POLL_INTERVAL)