smaug/ingestion/edgar_poller.py
Claude 7e9221a914 feat: add PLAN.md and insider copytrade POC implementation
- PLAN.md: full implementation plan from issue
- config.py: configurable thresholds, API keys via .env
- ingestion/: EDGAR RSS poller + Form 4 XML parser
- db/: SQLite schema + interface (WAL mode)
- signals/: filter engine (buy/10b5-1/value/role) + cluster detector
- alerts/: Slack webhook alert with score gating
- broker/: Alpaca paper/live trade execution
- backtest/: historical signal backtesting with yfinance
- main.py: CLI entrypoint (run | fetch-once | backtest)
2026-05-04 16:15:22 +00:00

139 lines
4.4 KiB
Python

import time
import os
import logging
from datetime import datetime
from typing import Optional
import requests
from lxml import etree
import config
from ingestion.form4_parser import parse_form4
from db.db import insert_filing, accession_exists
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Request headers that _fetch attaches to every HTTP call it makes.
HEADERS = {
    "User-Agent": "insider-copytrade-poc contact@example.com",
    "Accept-Encoding": "gzip, deflate",
}

# Atom feed of the latest Form 4 filings (type=4, count=40, output=atom).
EDGAR_FULL_INDEX = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&output=atom"
def _fetch(url: str, timeout: int = 30) -> requests.Response:
    """GET *url* with the module's HEADERS and return the response.

    Args:
        url: Address to request.
        timeout: Passed straight through to ``requests.get``.

    Raises:
        Whatever ``requests.get`` raises, plus the error raised by
        ``raise_for_status`` when the reply carries an error status.
    """
    response = requests.get(url, timeout=timeout, headers=HEADERS)
    response.raise_for_status()
    return response
def _get_filing_urls() -> list[tuple[str, str, str]]:
    """Download the Form 4 Atom feed and list the filings it mentions.

    Returns:
        One ``(link_href, accession, updated)`` tuple per feed entry that has
        a link element. ``accession`` is the last path segment of the link
        with ``-index.htm`` and all dashes removed; when that leaves exactly
        18 characters it is re-dashed as 10-2-6. ``updated`` is the first ten
        characters of the entry's ``<updated>`` text ("" when missing).

    Raises:
        Anything raised by ``_fetch`` or by the XML parser.
    """
    atom_ns = {"atom": "http://www.w3.org/2005/Atom"}
    feed = etree.fromstring(_fetch(EDGAR_FULL_INDEX).content)

    filings = []
    for entry in feed.findall("atom:entry", atom_ns):
        link = entry.find("atom:link", atom_ns)
        if link is not None:
            href = link.get("href", "")
            stamp = entry.findtext("atom:updated", namespaces=atom_ns) or ""
            tail = href.rstrip("/").split("/")[-1]
            compact = tail.replace("-index.htm", "").replace("-", "")
            if len(compact) == 18:
                # Restore the dashed 10-2-6 layout of the accession number.
                accession = "-".join((compact[:10], compact[10:12], compact[12:]))
            else:
                accession = compact
            filings.append((href, accession, stamp[:10]))
    return filings
def _get_xml_url_from_index(index_url: str) -> Optional[str]:
    """Fetch an XML index document and return the first Form 4 XML link in it.

    Two lookups are tried in order:

    1. direct child ``atom:link`` elements whose ``href`` ends in ``.xml``
       and contains ``form4`` (case-insensitive);
    2. any ``filing-href`` element whose text ends in ``.xml``.

    Args:
        index_url: URL of the index document to inspect.

    Returns:
        The matching URL, or ``None`` when the document cannot be fetched,
        cannot be parsed, or contains no matching link.
    """
    try:
        resp = _fetch(index_url)
    except Exception:
        return None

    try:
        root = etree.fromstring(resp.content)
    except etree.XMLSyntaxError as exc:
        # A malformed document is just another "no link found" outcome; it
        # must not escape when fetch failures already return None.
        logger.debug("Unparseable index %s: %s", index_url, exc)
        return None

    ns = {"atom": "http://www.w3.org/2005/Atom"}
    for link in root.findall("atom:link", ns):
        href = link.get("href", "")
        if href.endswith(".xml") and "form4" in href.lower():
            return href

    for node in root.findall(".//filing-href"):
        # Strip before testing the suffix so padded text still matches.
        text = (node.text or "").strip()
        if text.endswith(".xml"):
            return text
    return None
def _save_raw_xml(accession: str, xml_bytes: bytes):
    """Archive *xml_bytes* as ``<accession>.xml`` under ``config.DATA_DIR``.

    The directory is created if needed. A file that already exists for the
    accession is left untouched.
    """
    data_dir = config.DATA_DIR
    os.makedirs(data_dir, exist_ok=True)
    target = os.path.join(data_dir, f"{accession}.xml")
    if os.path.exists(target):
        return
    with open(target, "wb") as out:
        out.write(xml_bytes)
def fetch_and_store_new_filings() -> list[dict]:
    """Run one polling pass: download, archive, parse and store new Form 4s.

    For every feed entry whose accession is not already in the database, the
    filing's XML is located, downloaded, written to disk, parsed, and each
    parsed row is handed to ``insert_filing``.

    A failure that concerns a single filing (no XML found, download error,
    disk write error, parse error) is logged and that filing is skipped; the
    rest of the pass continues.

    Returns:
        The parsed filings for which ``insert_filing`` returned a truthy
        value. Empty when the feed itself could not be fetched.
    """
    new_filings = []
    try:
        entries = _get_filing_urls()
    except Exception as e:
        logger.error("Failed to fetch EDGAR index: %s", e)
        return new_filings

    # Accessions already downloaded and parsed in this pass. The feed may
    # repeat an accession (presumably once per filer role - verify against a
    # live feed); without this, a filing that yields no stored rows would be
    # downloaded again for every repeat.
    processed = set()

    for index_url, accession, filed_date in entries:
        if accession in processed or accession_exists(accession):
            continue

        xml_url = _resolve_xml_url(index_url, accession)
        if not xml_url:
            logger.warning("No XML found for %s", accession)
            continue

        try:
            xml_bytes = _fetch(xml_url).content
        except Exception as e:
            logger.error("Failed to fetch XML for %s: %s", accession, e)
            continue

        try:
            _save_raw_xml(accession, xml_bytes)
            parsed = parse_form4(xml_bytes, accession, filed_date)
        except Exception:
            # One bad document must not abort the remaining entries.
            logger.exception("Failed to process filing %s", accession)
            continue
        processed.add(accession)

        for filing in parsed:
            if insert_filing(filing):
                new_filings.append(filing)
    return new_filings
def _resolve_xml_url(index_url: str, accession: str) -> Optional[str]:
    """Locate the Form 4 XML document of a filing via its index page.

    The index page supplied by the feed (*index_url*) is tried first. If it
    yields nothing, an index URL rebuilt from the accession number is tried
    as a fallback.

    Args:
        index_url: Filing index page URL taken from the feed entry.
        accession: Dashed accession number of the filing.

    Returns:
        Absolute URL of the first listed document whose name ends in ``.xml``
        and contains ``4`` or ``form``, or ``None`` if no page could be
        loaded or none lists such a document.
    """
    accession_path = accession.replace("-", "")
    # The leading 10 digits of an accession number identify the submitter,
    # which can be a filing agent rather than a directory the filing is
    # published under - hence this rebuilt location is only a fallback.
    cik = accession_path[:10].lstrip("0")
    rebuilt_base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/"
    rebuilt_index = f"{rebuilt_base}{accession}-index.htm"

    # (page to download, directory its document names are relative to)
    pages = []
    if index_url:
        pages.append((index_url, index_url.rsplit("/", 1)[0] + "/"))
    if rebuilt_index != index_url:
        pages.append((rebuilt_index, rebuilt_base))

    for page_url, base in pages:
        try:
            resp = _fetch(page_url)
            # Index pages are HTML; the lenient HTML parser accepts markup
            # that the strict XML parser would reject.
            root = etree.HTML(resp.content)
        except Exception as exc:
            logger.debug("Could not load filing index %s: %s", page_url, exc)
            continue
        if root is None:
            continue
        for node in root.iter():
            text = (node.text or "").strip()
            if text.endswith(".xml") and ("4" in text or "form" in text.lower()):
                return base + text
    return None
def run_poller(on_new_filing=None):
    """Poll EDGAR in an endless loop, pausing between passes.

    Each pass calls ``fetch_and_store_new_filings`` and then sleeps for
    ``config.EDGAR_POLL_INTERVAL`` seconds. Neither a failed pass nor a
    failing callback stops the loop; both are logged with a traceback.

    Args:
        on_new_filing: Optional callable invoked once with each newly stored
            filing.

    This function does not return.
    """
    logger.info("EDGAR poller started")
    while True:
        logger.info("Polling EDGAR for new Form 4 filings...")
        try:
            new = fetch_and_store_new_filings()
        except Exception:
            # Keep the poller alive; the next pass runs after the usual sleep.
            logger.exception("EDGAR polling pass failed")
        else:
            logger.info("Found %d new filings", len(new))
            if on_new_filing:
                for filing in new:
                    try:
                        on_new_filing(filing)
                    except Exception as e:
                        logger.exception("Error in on_new_filing callback: %s", e)
        time.sleep(config.EDGAR_POLL_INTERVAL)