- sec_bulk_ingest.py: new module — downloads quarterly form.idx from SEC EDGAR, filters Form 4/4A, fetches each filing's SGML/XML, parses and stores. Adaptive token-bucket rate limiter (backs off on 429/5xx, ramps on success). Uses filter_new_accessions for fast quarter-level dedup before any HTTP. Marks derivative-only filings as seen so they're skipped on resume. - form4_parser: extract tx_code (transactionCode) from each transaction row; fix role extraction (Director/10%owner/Officer fallback); fix _text() to handle <value> sub-elements; fix footnote text extraction - edgar_poller: filter feed entries to Form 4/4A only; skip XSLT stylesheet URLs when resolving XML filing links Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
219 lines
6.7 KiB
Python
219 lines
6.7 KiB
Python
"""
|
|
Bulk ingest using EDGAR full-text search (EFTS) API.
|
|
|
|
The EFTS API returns the XML filename in the _id field, avoiding the extra
|
|
index-page fetch. A global token-bucket rate limiter keeps total throughput
|
|
under SEC's 10 req/s limit across all threads.
|
|
"""
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
import config
|
|
from db.db import accession_exists, insert_filing
|
|
from ingestion.edgar_poller import HEADERS, _save_raw_xml
|
|
from ingestion.form4_parser import parse_form4
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# EDGAR full-text search (EFTS) endpoint queried for filing metadata.
EFTS_URL = "https://efts.sec.gov/LATEST/search-index"

# Number of hits requested per EFTS page (sent as the "size" parameter).
_BATCH_SIZE = 100

# Thread-pool size used when fetching filings; sequential avoids triggering
# SEC 500 rate-limit responses.
_WORKERS = 1

# Requests per second handed to the rate limiter. Conservative: SEC allows 10,
# but concurrent bursts get throttled.
_MAX_RATE = 5.0

# Largest "from" offset ever requested from EFTS (Elasticsearch hard ceiling).
_EFTS_MAX_OFFSET = 9900
|
|
|
|
|
|
# ---------- global token-bucket rate limiter ----------
|
|
|
|
class _RateLimiter:
    """Lock-protected token bucket pacing callers to ``rate`` acquisitions/second.

    The bucket holds at most ``rate`` tokens and refills continuously at
    ``rate`` tokens per second. The balance is allowed to go negative: a
    caller that finds less than one token still books its token and then
    sleeps for the time needed to mint it, so callers arriving together are
    spaced out one after another instead of all waking at the same instant.
    """

    def __init__(self, rate: float):
        """Create a limiter whose bucket starts full.

        Args:
            rate: Sustained acquisitions per second; also the burst capacity.

        Raises:
            ValueError: If ``rate`` is not strictly positive. A zero rate
                would divide by zero in ``acquire`` and a negative one would
                produce negative sleeps, i.e. no limiting at all.
        """
        # "not rate > 0" (rather than "rate <= 0") also rejects NaN.
        if not rate > 0:
            raise ValueError(f"rate must be positive, got {rate!r}")
        self._rate = rate
        self._tokens = rate
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self) -> None:
        """Take one token, sleeping first if the bucket cannot cover it."""
        with self._lock:
            now = time.monotonic()
            # Credit the tokens minted since the previous call, capped at the
            # bucket capacity.
            refill = (now - self._last) * self._rate
            self._tokens = min(self._rate, self._tokens + refill)
            self._last = now
            # With less than one token available, wait for the shortfall to
            # be minted; otherwise the expression is <= 0 and clamps to zero.
            sleep_for = max(0.0, (1 - self._tokens) / self._rate)
            # Book the token now (possibly going negative) so the next caller
            # computes its wait behind this one.
            self._tokens -= 1
        # Sleep outside the lock so other threads can book their own slots.
        if sleep_for > 0:
            time.sleep(sleep_for)
|
|
|
|
|
|
# Single module-wide limiter; _get() acquires from it before every HTTP request.
_limiter = _RateLimiter(_MAX_RATE)
|
|
|
|
|
|
def _get(url: str, params: Optional[dict] = None, retries: int = 4) -> requests.Response:
    """Rate-limited GET with exponential backoff on 429/5xx and network errors.

    Every attempt first takes a token from the module-wide rate limiter.
    Transient failures (HTTP 429, any 5xx, or a request that produced no
    response at all, such as a timeout or connection error) are retried after
    2s, 4s, 8s, ... Permanent client errors (other 4xx, e.g. 404) are raised
    immediately: retrying them cannot succeed and only burns request budget.

    Args:
        url: Absolute URL to fetch.
        params: Optional query-string parameters.
        retries: Maximum number of attempts; values below 1 are treated as 1
            so the function never falls through without a response.

    Returns:
        The successful ``requests.Response``.

    Raises:
        requests.RequestException: The last error once attempts are exhausted,
            or the first non-retryable HTTP error.
    """
    attempts = max(1, retries)
    delay = 2.0
    for attempt in range(attempts):
        _limiter.acquire()
        try:
            resp = requests.get(url, params=params, headers=HEADERS, timeout=25)
            resp.raise_for_status()
            return resp
        except requests.RequestException as e:
            # NOTE: error responses are falsy in requests, so the response is
            # probed with getattr/None checks rather than truthiness.
            status = getattr(getattr(e, "response", None), "status_code", None)
            retryable = status is None or status == 429 or status >= 500
            if not retryable or attempt == attempts - 1:
                raise
            wait = delay * (2 ** attempt)
            logger.debug(f"Retry {attempt+1} after {wait:.1f}s: {e}")
            time.sleep(wait)
    # Unreachable: the final iteration always returns or raises.
    raise RuntimeError("_get exhausted attempts without a result")
|
|
|
|
|
|
# ---------- EFTS pagination ----------
|
|
|
|
def _efts_page(start_date: str, end_date: str, offset: int) -> list[dict]:
    """Fetch one page of EFTS search hits for form type "4".

    Args:
        start_date: Lower bound of the custom date range (sent as ``startdt``).
        end_date: Upper bound of the custom date range (sent as ``enddt``).
        offset: Index of the first hit to return (sent as ``from``).

    Returns:
        The ``hits.hits`` list of the JSON response, up to ``_BATCH_SIZE``
        entries requested.

    Raises:
        Whatever ``_get`` raises on request failure, or ``KeyError`` when the
        response JSON lacks the ``hits.hits`` path.
    """
    query = {
        "q": "",
        "forms": "4",
        "dateRange": "custom",
        "startdt": start_date,
        "enddt": end_date,
        "from": offset,
        "size": _BATCH_SIZE,
    }
    response = _get(EFTS_URL, params=query)
    payload = response.json()
    return payload["hits"]["hits"]
|
|
|
|
|
|
def _collect_metadata(start_date: str, end_date: str) -> list[tuple[str, str, str]]:
    """Return (accession, filename, filed_date) for all Form 4s in range.

    The first EFTS page is fetched directly to learn the reported total; the
    remaining pages are fetched through ``_efts_page`` until the total, the
    pagination ceiling (``_EFTS_MAX_OFFSET + _BATCH_SIZE`` hits), or an empty
    page is reached. Paging is best-effort: a page that still fails after
    ``_get``'s own retries is skipped, and three consecutive failures abort
    paging. Skipped pages and truncation at the ceiling are logged as
    warnings so incomplete results are visible.

    Args:
        start_date: Lower bound of the query range (sent as ``startdt``).
        end_date: Upper bound of the query range (sent as ``enddt``).

    Returns:
        One tuple per unique accession, in hit order. ``filename`` is the part
        of the hit ``_id`` after the first ``:``; hits lacking an accession or
        a filename are dropped.

    Raises:
        Whatever ``_get`` raises if the first page cannot be fetched, or
        ``KeyError`` if its JSON lacks the expected ``hits`` structure.
    """
    params = {
        "q": "", "forms": "4",
        "dateRange": "custom",
        "startdt": start_date, "enddt": end_date,
        "from": 0, "size": _BATCH_SIZE,
    }
    data = _get(EFTS_URL, params=params).json()
    total = data["hits"]["total"]["value"]
    cap = min(total, _EFTS_MAX_OFFSET + _BATCH_SIZE)
    logger.info(f" EFTS reports {total} filings; fetching up to {cap}")
    if total > cap:
        logger.warning(
            f"EFTS reports {total} filings for {start_date}..{end_date} but only "
            f"the first {cap} can be paged; narrow the date range to get the rest"
        )

    hits = list(data["hits"]["hits"])
    offset = _BATCH_SIZE
    consecutive_fails = 0
    skipped_offsets: list[int] = []

    while len(hits) < cap and offset <= _EFTS_MAX_OFFSET:
        try:
            batch = _efts_page(start_date, end_date, offset)
        except Exception as e:  # best-effort paging: skip the page, keep going
            consecutive_fails += 1
            skipped_offsets.append(offset)
            logger.warning(f"EFTS offset {offset} failed ({consecutive_fails}): {e}")
            if consecutive_fails >= 3:
                break
        else:
            if not batch:
                break
            hits.extend(batch)
            consecutive_fails = 0
        offset += _BATCH_SIZE

    if skipped_offsets:
        logger.warning(
            f"{len(skipped_offsets)} EFTS page(s) skipped for "
            f"{start_date}..{end_date} (offsets {skipped_offsets}); "
            f"filings on those pages are missing from this run"
        )

    seen: set[str] = set()
    tasks: list[tuple[str, str, str]] = []
    for hit in hits:
        src = hit.get("_source", {})
        acc = src.get("adsh", "")
        filed_date = src.get("file_date", "")
        hit_id = hit.get("_id", "")
        # The hit id has the form "<prefix>:<filename>"; keep the filename.
        filename = hit_id.split(":", 1)[1] if ":" in hit_id else ""
        if not acc or not filename or acc in seen:
            continue
        seen.add(acc)
        tasks.append((acc, filename, filed_date))

    return tasks
|
|
|
|
|
|
# ---------- XML fetch + store ----------
|
|
|
|
def _xml_url(accession: str, filename: str) -> str:
    """Build the EDGAR archive URL of one document inside a filing.

    Args:
        accession: Accession number; dashes are removed to form the folder name.
        filename: Document name appended to the filing folder.

    Returns:
        ``{EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession-no-dashes}/{filename}``
        where ``cik`` is the first ten characters of the dash-less accession
        with leading zeros stripped.
    """
    # NOTE(review): the CIK is taken from the accession prefix; confirm EDGAR
    # serves every filing under that CIK's directory.
    folder = "".join(accession.split("-"))
    cik = folder[:10].lstrip("0")
    directory = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{folder}/"
    return directory + filename
|
|
|
|
|
|
def _fetch_and_store(accession: str, filename: str, filed_date: str) -> int:
    """Download one filing's XML, archive it, parse it and insert its rows.

    Nothing is fetched (and 0 is returned) when the accession already exists
    in the database, when ``filename`` does not end in ``.xml``
    (case-insensitive), or when it contains ``xslF345``. A failed download is
    logged at debug level and also yields 0.

    Args:
        accession: Accession number of the filing.
        filename: Document name within the filing folder.
        filed_date: Filing date passed through to the parser.

    Returns:
        Number of parsed rows for which ``insert_filing`` returned a truthy
        value.
    """
    if accession_exists(accession):
        return 0

    is_xml = filename.lower().endswith(".xml")
    if not is_xml or "xslF345" in filename:
        return 0

    url = _xml_url(accession, filename)
    try:
        response = _get(url)
        xml_bytes = response.content
    except Exception as err:
        logger.debug(f"Skip {accession}: {err}")
        return 0

    # Keep the raw document before parsing it.
    _save_raw_xml(accession, xml_bytes)

    stored = 0
    for row in parse_form4(xml_bytes, accession, filed_date):
        if insert_filing(row):
            stored += 1
    return stored
|
|
|
|
|
|
# ---------- public API ----------
|
|
|
|
def ingest_date_range(
    start_date: str,
    end_date: str,
    limit: Optional[int] = None,
) -> int:
    """Ingest every Form 4 filing EFTS lists between two dates.

    Collects filing metadata for the range, then fetches and stores each
    filing on a thread pool of ``_WORKERS`` threads. A filing whose worker
    raises is logged with its accession number and skipped; it does not
    abort the run.

    Args:
        start_date: First date of the range, passed to the EFTS query.
        end_date: Last date of the range, passed to the EFTS query.
        limit: Keep only the first ``limit`` filings. ``None`` or ``0`` means
            no limit (kept for backward compatibility).

    Returns:
        Total number of rows stored across all filings.

    Raises:
        Whatever ``_collect_metadata`` raises if the initial metadata query
        fails.
    """
    logger.info(f"Ingesting {start_date} → {end_date}")
    tasks = _collect_metadata(start_date, end_date)
    if limit:
        tasks = tasks[:limit]
    logger.info(f" {len(tasks)} unique filings to fetch")

    total_stored = 0
    with ThreadPoolExecutor(max_workers=_WORKERS) as pool:
        # Map each future back to its accession so failures can be attributed.
        futures = {
            pool.submit(_fetch_and_store, acc, fn, dt): acc
            for acc, fn, dt in tasks
        }
        for done, future in enumerate(as_completed(futures), start=1):
            try:
                total_stored += future.result()
            except Exception as e:  # one bad filing must not stop the batch
                logger.warning(f"Worker error for {futures[future]}: {e}")
            if done % 500 == 0:
                logger.info(f" {done}/{len(tasks)} fetched, {total_stored} rows stored")

    logger.info(f" Done: {total_stored} rows stored")
    return total_stored
|
|
|
|
|
|
def ingest_years(start_year: int, end_year: int) -> int:
    """Ingest all Form 4 filings from Jan 1 of start_year to Dec 31 of end_year.

    Calls ``ingest_date_range`` once per calendar day, using the same date as
    both start and end of the range, so each EFTS query covers a single day.
    A cumulative row count is logged after every day.

    Args:
        start_year: First calendar year to ingest (inclusive).
        end_year: Last calendar year to ingest (inclusive).

    Returns:
        Total rows stored over the whole span; 0 when ``start_year`` is after
        ``end_year``.
    """
    from datetime import date, timedelta

    first_day = date(start_year, 1, 1)
    last_day = date(end_year, 12, 31)
    span_days = (last_day - first_day).days + 1

    grand_total = 0
    # range() is empty for a non-positive span, so an inverted year range
    # performs no work.
    for step in range(span_days):
        ds = (first_day + timedelta(days=step)).strftime("%Y-%m-%d")
        grand_total += ingest_date_range(ds, ds)
        logger.info(f"{ds} done — cumulative: {grand_total} rows")

    return grand_total
|