"""Poll the EDGAR "current filings" Atom feed for Form 4 filings and store them."""

import time
import os
import logging
from datetime import datetime
from typing import Callable, Iterator, Optional

import requests
from lxml import etree

import config
from ingestion.form4_parser import parse_form4
from db.db import insert_filing, accession_exists

logger = logging.getLogger(__name__)

# Headers sent with every request issued through _fetch().
HEADERS = {
    "User-Agent": "insider-copytrade-poc contact@example.com",
    "Accept-Encoding": "gzip, deflate",
}

# Atom feed of the 40 most recent type-4 filings.
EDGAR_FULL_INDEX = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&output=atom"

# Namespace map used for every Atom lookup in this module.
_ATOM_NS = {"atom": "http://www.w3.org/2005/Atom"}


def _fetch(url: str, timeout: int = 30) -> requests.Response:
    """GET *url* with the module headers and return the response.

    Raises:
        requests.RequestException: on connection problems, timeouts, or a
            non-2xx status (via ``raise_for_status``).
    """
    resp = requests.get(url, headers=HEADERS, timeout=timeout)
    resp.raise_for_status()
    return resp


def _accession_from_index_url(url: str) -> str:
    """Derive the accession identifier from a filing index URL.

    The last path segment has ``-index.htm`` and all dashes removed. When the
    remainder is exactly 18 characters it is re-dashed as 10-2-6; otherwise
    the dash-less remainder is returned unchanged.
    """
    tail = url.rstrip("/").split("/")[-1].replace("-index.htm", "")
    compact = tail.replace("-", "")
    if len(compact) == 18:
        return f"{compact[:10]}-{compact[10:12]}-{compact[12:]}"
    return compact


def _parse_markup(content: bytes):
    """Parse *content* as XML, falling back to lxml's lenient HTML parser.

    Index pages are served as ``.htm``; a page that is not well-formed XML
    would otherwise make strict parsing fail outright.
    May return ``None`` if the HTML parser produces no document.
    """
    try:
        return etree.fromstring(content)
    except etree.XMLSyntaxError:
        return etree.HTML(content)


def _get_filing_urls() -> list[tuple[str, str, str]]:
    """Read the Atom feed and return ``(index_url, accession, date)`` tuples.

    ``date`` is the first 10 characters of the entry's ``<updated>`` element
    (empty string when the element is missing). Entries without a link, or
    whose link has no ``href``, are skipped.

    Raises:
        requests.RequestException: if the feed cannot be fetched.
        lxml.etree.XMLSyntaxError: if the feed is not well-formed XML.
    """
    resp = _fetch(EDGAR_FULL_INDEX)
    root = etree.fromstring(resp.content)

    results = []
    for entry in root.findall("atom:entry", _ATOM_NS):
        link = entry.find("atom:link", _ATOM_NS)
        if link is None:
            continue
        url = link.get("href", "")
        if not url:
            # Without an href there is neither a URL nor an accession to use.
            continue
        updated = (entry.findtext("atom:updated", namespaces=_ATOM_NS) or "")[:10]
        results.append((url, _accession_from_index_url(url), updated))
    return results


def _get_xml_url_from_index(index_url: str) -> Optional[str]:
    """Best-effort lookup of a Form 4 XML link inside an XML index document.

    Returns the first direct-child ``atom:link`` href ending in ``.xml`` and
    containing ``form4``; failing that, the first ``filing-href`` element whose
    text ends in ``.xml``. Returns ``None`` when the document cannot be fetched
    or parsed, or when nothing matches.

    NOTE(review): nothing in the visible module calls this helper.
    """
    try:
        resp = _fetch(index_url)
    except Exception:
        return None

    try:
        root = etree.fromstring(resp.content)
    except etree.XMLSyntaxError:
        # Keep the Optional contract: an unparsable document means "not found".
        return None

    for link in root.findall("atom:link", _ATOM_NS):
        href = link.get("href", "")
        if href.endswith(".xml") and "form4" in href.lower():
            return href

    for link in root.findall(".//filing-href"):
        if link.text and link.text.endswith(".xml"):
            return link.text.strip()
    return None


def _save_raw_xml(accession: str, xml_bytes: bytes) -> None:
    """Write *xml_bytes* to ``<config.DATA_DIR>/<accession>.xml`` once.

    An existing file is left untouched.
    """
    os.makedirs(config.DATA_DIR, exist_ok=True)
    path = os.path.join(config.DATA_DIR, f"{accession}.xml")
    try:
        # "x" creates the file exclusively, so there is no gap between the
        # existence check and the write.
        with open(path, "xb") as f:
            f.write(xml_bytes)
    except FileExistsError:
        pass


def _ingest_entry(index_url: str, accession: str, filed_date: str) -> Iterator[dict]:
    """Fetch, archive, parse and insert one filing.

    Yields each parsed filing for which ``insert_filing`` returned a truthy
    value. Yields nothing when the accession is already stored, no XML URL
    can be resolved, or the XML download fails.
    """
    if accession_exists(accession):
        return

    xml_url = _resolve_xml_url(index_url, accession)
    if not xml_url:
        logger.warning("No XML found for %s", accession)
        return

    try:
        xml_bytes = _fetch(xml_url).content
    except Exception as e:
        logger.error("Failed to fetch XML for %s: %s", accession, e)
        return

    _save_raw_xml(accession, xml_bytes)
    for filing in parse_form4(xml_bytes, accession, filed_date):
        if insert_filing(filing):
            yield filing


def fetch_and_store_new_filings() -> list[dict]:
    """Run one polling pass and return the filings that were newly inserted.

    A failure to read the feed is logged and yields an empty list. A failure
    while handling one filing is logged and that filing is skipped, so one
    bad document cannot abort the rest of the batch or the calling loop.
    """
    new_filings: list[dict] = []
    try:
        entries = _get_filing_urls()
    except Exception as e:
        logger.error("Failed to fetch EDGAR index: %s", e)
        return new_filings

    # The same accession can appear in more than one feed entry; handle each
    # accession once per pass to avoid repeated downloads.
    seen: set[str] = set()
    for index_url, accession, filed_date in entries:
        if accession in seen:
            continue
        seen.add(accession)
        try:
            # Appending one by one keeps filings inserted before a mid-way error.
            for filing in _ingest_entry(index_url, accession, filed_date):
                new_filings.append(filing)
        except Exception:
            logger.exception("Failed to process filing %s", accession)
    return new_filings


def _resolve_xml_url(index_url: str, accession: str) -> Optional[str]:
    """Locate the Form 4 XML document for *accession* via its index page.

    The index page URL is built from ``config.EDGAR_BASE_URL``, the first ten
    accession digits (leading zeros stripped) and the dash-less accession.
    The first node whose text ends in ``.xml`` and contains ``4`` or ``form``
    is treated as the document filename and appended to the base URL.

    Returns ``None`` if the page cannot be fetched or parsed, or if no node
    matches.

    NOTE(review): *index_url* is not used; the URL is always rebuilt from the
    accession. Confirm whether the feed-provided URL should be preferred.
    """
    accession_path = accession.replace("-", "")
    cik = accession_path[:10].lstrip("0")
    base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/"
    candidate = f"{base}{accession}-index.htm"

    try:
        resp = _fetch(candidate)
        root = _parse_markup(resp.content)
        if root is None:
            return None
        for node in root.iter():
            text = (node.text or "").strip()
            if text.endswith(".xml") and ("4" in text or "form" in text.lower()):
                return base + text
    except Exception as exc:
        # Best effort: record the reason rather than failing silently.
        logger.warning("Could not resolve XML URL from %s: %s", candidate, exc)
    return None


def run_poller(on_new_filing: Optional[Callable[[dict], None]] = None):
    """Poll forever, sleeping ``config.EDGAR_POLL_INTERVAL`` between passes.

    Args:
        on_new_filing: optional callback invoked once per newly inserted
            filing. Exceptions it raises are logged and do not stop the loop.
    """
    logger.info("EDGAR poller started")
    while True:
        logger.info("Polling EDGAR for new Form 4 filings...")
        new = fetch_and_store_new_filings()
        logger.info("Found %s new filings", len(new))

        if on_new_filing:
            for filing in new:
                try:
                    on_new_filing(filing)
                except Exception as e:
                    logger.error("Error in on_new_filing callback: %s", e)

        time.sleep(config.EDGAR_POLL_INTERVAL)