From 2e2be3e9c761edf65d62ebd32761740c14be9f38 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 4 May 2026 16:32:00 +0000 Subject: [PATCH] fix: address sanity-check issues + rebrand to Smaug Co-authored-by: dodox --- README.md | 13 +++++-- backtest/backtest.py | 77 +++++++++++++++++++++++-------------- broker/alpaca_client.py | 51 +++++++++++++++--------- db/db.py | 27 ++++++++++++- db/schema.sql | 3 ++ ingestion/edgar_poller.py | 81 ++++++++++++++++----------------------- main.py | 18 +++++++-- 7 files changed, 167 insertions(+), 103 deletions(-) diff --git a/README.md b/README.md index f0db139..a957e7b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Cleopatra — Insider Copytrade POC +# Smaug — Insider Copytrade Monitor Monitors SEC EDGAR Form 4 filings in near real-time, detects insider buy clusters, sends Slack alerts, and optionally executes trades via Alpaca. @@ -22,6 +22,7 @@ signals/cluster_detector.py ← counts unique insiders per ticker in rolling 30- │ ├──► alerts/slack_alert.py ← POST to Slack webhook when score ≥ threshold └──► broker/alpaca_client.py ← paper/live order: 2% position size, 10% per-ticker cap + positions auto-closed after holding period expires ``` ## Setup @@ -64,7 +65,7 @@ python main.py backtest | `MIN_TRANSACTION_VALUE` | $50,000 | Ignore buys below this | | `MIN_CLUSTER_SIZE` | 1 | Minimum unique insiders before a signal fires | | `CLUSTER_WINDOW_DAYS` | 30 | Rolling window for cluster counting | -| `HOLDING_PERIOD_DAYS` | 90 | Days held per position (backtest + close trigger) | +| `HOLDING_PERIOD_DAYS` | 90 | Days held per position (backtest + auto-close trigger) | | `POSITION_SIZE_PCT` | 2% | Fraction of portfolio per trade | | `MAX_POSITIONS` | 20 | Hard position limit | | `SCORE_ALERT_THRESHOLD` | 5.0 | Minimum score to trigger Slack alert | @@ -79,10 +80,14 @@ Role weights: CEO 3.0 · CFO/President 2.5 · COO 2.0 · Director 1.5 · VP 1.2 ## Backtesting -The backtest loads signals from the SQLite DB and fetches OHLC data via 
`yfinance` on demand (no local price cache). Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number to avoid re-downloading. +The backtest loads signals from the SQLite DB and fetches OHLC data via `yfinance` on demand (no local price cache). Entry price is the closing price on the first trading day on or after the signal date; exit price is the closing price on the last trading day before or on the exit date. Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number. Metrics reported: win rate, average return, average alpha vs SPY, Sharpe ratio. +## Position lifecycle + +Positions are tracked in the `signals` table. When a trade is executed, `executed_at` is recorded. Each time the poller processes a new filing, it then checks for executed, still-open positions whose `executed_at` is older than `HOLDING_PERIOD_DAYS` and calls Alpaca to close them, marking `closed=1` in the DB. + ## Modules | Path | Purpose | @@ -95,7 +100,7 @@ Metrics reported: win rate, average return, average alpha vs SPY, Sharpe ratio. 
| `signals/filter_engine.py` | Filing → signal pipeline | | `signals/cluster_detector.py` | Cluster detection from DB | | `alerts/slack_alert.py` | Slack webhook alert | -| `broker/alpaca_client.py` | Alpaca order execution | +| `broker/alpaca_client.py` | Alpaca order execution + position exit | | `backtest/backtest.py` | Historical backtest runner | | `main.py` | CLI entry point | diff --git a/backtest/backtest.py b/backtest/backtest.py index 6c8acc8..5f8474d 100644 --- a/backtest/backtest.py +++ b/backtest/backtest.py @@ -1,25 +1,49 @@ import logging +import math from datetime import datetime, timedelta -from typing import Optional -import sqlite3 import config logger = logging.getLogger(__name__) -def _load_signals_from_db(db_path: str) -> list[dict]: +def _load_signals_from_db(db_path: str, min_score: float, min_cluster_size: int) -> list[dict]: + import sqlite3 conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row rows = conn.execute( - "SELECT s.*, f.role FROM signals s " - "LEFT JOIN filings f ON f.ticker = s.ticker AND f.transaction_date = s.trigger_date " - "WHERE s.cluster_size >= 1" + """ + SELECT s.*, f.role FROM signals s + LEFT JOIN filings f ON f.ticker = s.ticker AND f.transaction_date = s.trigger_date + WHERE s.score >= ? AND s.cluster_size >= ? 
+ """, + (min_score, min_cluster_size), ).fetchall() conn.close() return [dict(r) for r in rows] +def _first_close_on_or_after(price_data, target_date: datetime) -> float: + """Return the closing price on the first trading day on or after target_date.""" + for ts, row in price_data["Close"].items(): + ts_date = ts.to_pydatetime().replace(tzinfo=None) + if ts_date.date() >= target_date.date(): + return float(row) + raise ValueError(f"No price data on or after {target_date.date()}") + + +def _first_close_before(price_data, target_date: datetime) -> float: + """Return the closing price on the last trading day before or on target_date.""" + result = None + for ts, row in price_data["Close"].items(): + ts_date = ts.to_pydatetime().replace(tzinfo=None) + if ts_date.date() <= target_date.date(): + result = float(row) + if result is None: + raise ValueError(f"No price data on or before {target_date.date()}") + return result + + def run_backtest( db_path: str = None, holding_days: int = None, @@ -34,15 +58,14 @@ def run_backtest( db_path = db_path or config.DB_PATH holding_days = holding_days or config.HOLDING_PERIOD_DAYS - signals = _load_signals_from_db(db_path) - signals = [s for s in signals if s["score"] >= min_score and s["cluster_size"] >= min_cluster_size] + signals = _load_signals_from_db(db_path, min_score, min_cluster_size) if not signals: logger.warning("No signals found matching criteria") return {} results = [] - spy_returns = {} + spy_cache: dict[tuple, float] = {} for signal in signals: ticker = signal["ticker"] @@ -58,16 +81,17 @@ def run_backtest( try: stock_data = yf.download( ticker, - start=(entry_date - timedelta(days=5)).strftime("%Y-%m-%d"), + start=entry_date.strftime("%Y-%m-%d"), end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"), progress=False, auto_adjust=True, ) if stock_data.empty: + logger.debug(f"No price data for {ticker}") continue - entry_price = float(stock_data["Close"].iloc[0]) - exit_price = float(stock_data["Close"].iloc[-1]) + 
entry_price = _first_close_on_or_after(stock_data, entry_date) + exit_price = _first_close_before(stock_data, exit_date) stock_return = (exit_price - entry_price) / entry_price except Exception as e: @@ -75,25 +99,25 @@ def run_backtest( continue period_key = (entry_date_str, holding_days) - if period_key not in spy_returns: + if period_key not in spy_cache: try: spy_data = yf.download( "SPY", - start=(entry_date - timedelta(days=5)).strftime("%Y-%m-%d"), + start=entry_date.strftime("%Y-%m-%d"), end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"), progress=False, auto_adjust=True, ) if not spy_data.empty: - spy_entry = float(spy_data["Close"].iloc[0]) - spy_exit = float(spy_data["Close"].iloc[-1]) - spy_returns[period_key] = (spy_exit - spy_entry) / spy_entry + spy_entry = _first_close_on_or_after(spy_data, entry_date) + spy_exit = _first_close_before(spy_data, exit_date) + spy_cache[period_key] = (spy_exit - spy_entry) / spy_entry else: - spy_returns[period_key] = 0.0 + spy_cache[period_key] = 0.0 except Exception: - spy_returns[period_key] = 0.0 + spy_cache[period_key] = 0.0 - spy_return = spy_returns.get(period_key, 0.0) + spy_return = spy_cache[period_key] alpha = stock_return - spy_return results.append({ @@ -114,12 +138,10 @@ def run_backtest( win_rate = sum(1 for r in returns if r > 0) / len(returns) avg_return = sum(returns) / len(returns) avg_alpha = sum(alphas) / len(alphas) - - import math std_dev = math.sqrt(sum((r - avg_return) ** 2 for r in returns) / len(returns)) sharpe = (avg_return / std_dev * math.sqrt(252 / holding_days)) if std_dev > 0 else 0.0 - summary = { + return { "total_signals": len(results), "win_rate": round(win_rate, 4), "avg_return": round(avg_return, 4), @@ -129,19 +151,18 @@ def run_backtest( "results": results, } - return summary - def print_summary(summary: dict): if "error" in summary: print(f"Error: {summary['error']}") return - print(f"\n{'='*40}") + width = 40 + print(f"\n{'=' * width}") print(f"Backtest Results 
({summary['holding_days']}-day hold)") - print(f"{'='*40}") + print(f"{'=' * width}") print(f"Total signals: {summary['total_signals']}") print(f"Win rate: {summary['win_rate']:.1%}") print(f"Avg return: {summary['avg_return']:.2%}") print(f"Avg alpha vs SPY: {summary['avg_alpha_vs_spy']:.2%}") print(f"Sharpe ratio: {summary['sharpe_ratio']:.2f}") - print(f"{'='*40}\n") + print(f"{'=' * width}\n") diff --git a/broker/alpaca_client.py b/broker/alpaca_client.py index 67b9223..2b52dc2 100644 --- a/broker/alpaca_client.py +++ b/broker/alpaca_client.py @@ -1,8 +1,8 @@ import logging -from typing import Optional +from datetime import datetime, timedelta import config -from db.db import mark_signal_executed +from db.db import mark_signal_executed, mark_signal_closed, get_executed_unclosed_signals logger = logging.getLogger(__name__) @@ -21,14 +21,11 @@ def _get_api(): def get_portfolio_value() -> float: - api = _get_api() - account = api.get_account() - return float(account.portfolio_value) + return float(_get_api().get_account().portfolio_value) def get_open_positions_count() -> int: - api = _get_api() - return len(api.list_positions()) + return len(_get_api().list_positions()) def execute_signal(signal: dict) -> bool: @@ -40,16 +37,15 @@ def execute_signal(signal: dict) -> bool: try: api = _get_api() - positions_count = get_open_positions_count() - if positions_count >= config.MAX_POSITIONS: + + if get_open_positions_count() >= config.MAX_POSITIONS: logger.warning(f"Max positions ({config.MAX_POSITIONS}) reached, skipping {ticker}") return False portfolio_value = get_portfolio_value() allocation = portfolio_value * config.POSITION_SIZE_PCT - latest_trade = api.get_latest_trade(ticker) - price = float(latest_trade.price) + price = float(api.get_latest_trade(ticker).price) if price <= 0: logger.error(f"Invalid price for {ticker}: {price}") return False @@ -61,8 +57,8 @@ def execute_signal(signal: dict) -> bool: existing_positions = {p.symbol: p for p in 
api.list_positions()} if ticker in existing_positions: - existing_value = float(existing_positions[ticker].market_value) - if existing_value / portfolio_value >= 0.10: + existing_pct = float(existing_positions[ticker].market_value) / portfolio_value + if existing_pct >= 0.10: logger.warning(f"Already at 10% cap for {ticker}, skipping") return False @@ -82,13 +78,32 @@ def execute_signal(signal: dict) -> bool: return False -def close_position_after_days(ticker: str, holding_days: Optional[int] = None): - days = holding_days or config.HOLDING_PERIOD_DAYS - api = _get_api() +def close_position(ticker: str, signal_id: int) -> bool: try: - api.close_position(ticker) - logger.info(f"Closed position: {ticker} after {days} days") + _get_api().close_position(ticker) + mark_signal_closed(signal_id) + logger.info(f"Closed position: {ticker} (signal {signal_id})") return True except Exception as e: logger.error(f"Failed to close position {ticker}: {e}") return False + + +def close_expired_positions(): + if not config.ALPACA_KEY or not config.ALPACA_SECRET: + return + + cutoff = datetime.utcnow() - timedelta(days=config.HOLDING_PERIOD_DAYS) + signals = get_executed_unclosed_signals() + + for signal in signals: + executed_at_str = signal.get("executed_at") + if not executed_at_str: + continue + try: + executed_at = datetime.strptime(executed_at_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + continue + + if executed_at <= cutoff: + close_position(signal["ticker"], signal["id"]) diff --git a/db/db.py b/db/db.py index d1f1172..1055af8 100644 --- a/db/db.py +++ b/db/db.py @@ -76,13 +76,25 @@ def mark_signal_alerted(signal_id: int): def mark_signal_executed(signal_id: int): conn = get_connection() try: - conn.execute("UPDATE signals SET executed=1 WHERE id=?", (signal_id,)) + conn.execute( + "UPDATE signals SET executed=1, executed_at=? 
WHERE id=?", + (datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), signal_id), + ) conn.commit() finally: conn.close() -def get_unalerted_signals(): +def mark_signal_closed(signal_id: int): + conn = get_connection() + try: + conn.execute("UPDATE signals SET closed=1 WHERE id=?", (signal_id,)) + conn.commit() + finally: + conn.close() + + +def get_unalerted_signals() -> list[dict]: conn = get_connection() try: rows = conn.execute( @@ -93,6 +105,17 @@ def get_unalerted_signals(): conn.close() +def get_executed_unclosed_signals() -> list[dict]: + conn = get_connection() + try: + rows = conn.execute( + "SELECT * FROM signals WHERE executed=1 AND closed=0 AND executed_at IS NOT NULL" + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + def get_recent_buys_for_ticker(ticker: str, window_days: int) -> list: conn = get_connection() try: diff --git a/db/schema.sql b/db/schema.sql index 099d94d..bed3c6b 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -25,6 +25,8 @@ CREATE TABLE IF NOT EXISTS signals ( score REAL, alerted INTEGER DEFAULT 0, executed INTEGER DEFAULT 0, + executed_at TEXT, + closed INTEGER DEFAULT 0, created_at TEXT DEFAULT (datetime('now')) ); @@ -32,3 +34,4 @@ CREATE INDEX IF NOT EXISTS idx_filings_ticker ON filings(ticker); CREATE INDEX IF NOT EXISTS idx_filings_transaction_date ON filings(transaction_date); CREATE INDEX IF NOT EXISTS idx_signals_ticker ON signals(ticker); CREATE INDEX IF NOT EXISTS idx_signals_alerted ON signals(alerted); +CREATE INDEX IF NOT EXISTS idx_signals_executed ON signals(executed); diff --git a/ingestion/edgar_poller.py b/ingestion/edgar_poller.py index 08bd4e6..727dc47 100644 --- a/ingestion/edgar_poller.py +++ b/ingestion/edgar_poller.py @@ -1,10 +1,9 @@ import time import os import logging -from datetime import datetime from typing import Optional import requests -from lxml import etree +from lxml import etree, html import config from ingestion.form4_parser import parse_form4 @@ -13,11 +12,14 @@ from 
db.db import insert_filing, accession_exists logger = logging.getLogger(__name__) HEADERS = { - "User-Agent": "insider-copytrade-poc contact@example.com", + "User-Agent": "smaug-insider-monitor contact@example.com", "Accept-Encoding": "gzip, deflate", } -EDGAR_FULL_INDEX = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&output=atom" +EDGAR_ATOM_URL = ( + "https://www.sec.gov/cgi-bin/browse-edgar" + "?action=getcurrent&type=4&dateb=&owner=include&count=40&output=atom" +) def _fetch(url: str, timeout: int = 30) -> requests.Response: @@ -27,39 +29,40 @@ def _fetch(url: str, timeout: int = 30) -> requests.Response: def _get_filing_urls() -> list[tuple[str, str, str]]: - resp = _fetch(EDGAR_FULL_INDEX) + resp = _fetch(EDGAR_ATOM_URL) root = etree.fromstring(resp.content) ns = {"atom": "http://www.w3.org/2005/Atom"} - entries = root.findall("atom:entry", ns) results = [] - for entry in entries: - filing_href = entry.find("atom:link", ns) - if filing_href is None: + for entry in root.findall("atom:entry", ns): + link = entry.find("atom:link", ns) + if link is None: continue - url = filing_href.get("href", "") + url = link.get("href", "") updated = (entry.findtext("atom:updated", namespaces=ns) or "")[:10] - accession = url.rstrip("/").split("/")[-1].replace("-index.htm", "") - accession = accession.replace("-", "") - if len(accession) == 18: - accession = f"{accession[:10]}-{accession[10:12]}-{accession[12:]}" + raw = url.rstrip("/").split("/")[-1].replace("-index.htm", "") + raw = raw.replace("-", "") + if len(raw) == 18: + accession = f"{raw[:10]}-{raw[10:12]}-{raw[12:]}" + else: + accession = raw results.append((url, accession, updated)) return results -def _get_xml_url_from_index(index_url: str) -> Optional[str]: +def _resolve_xml_url(accession: str) -> Optional[str]: + accession_path = accession.replace("-", "") + cik = accession_path[:10].lstrip("0") + base = 
f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/" + index_url = f"{base}{accession}-index.htm" try: resp = _fetch(index_url) - except Exception: - return None - root = etree.fromstring(resp.content) - ns = {"atom": "http://www.w3.org/2005/Atom"} - for link in root.findall("atom:link", ns): - href = link.get("href", "") - if href.endswith(".xml") and "form4" in href.lower(): - return href - for link in root.findall(".//filing-href"): - if link.text and link.text.endswith(".xml"): - return link.text.strip() + doc = html.fromstring(resp.content) + for link in doc.cssselect("table.tableFile a[href]"): + href = link.get("href", "") + if href.lower().endswith(".xml") and not href.lower().endswith("-index.htm"): + return config.EDGAR_BASE_URL + href if href.startswith("/") else base + href + except Exception as e: + logger.debug(f"Could not resolve XML URL for {accession}: {e}") return None @@ -79,11 +82,11 @@ def fetch_and_store_new_filings() -> list[dict]: logger.error(f"Failed to fetch EDGAR index: {e}") return new_filings - for index_url, accession, filed_date in entries: + for _index_url, accession, filed_date in entries: if accession_exists(accession): continue - xml_url = _resolve_xml_url(index_url, accession) + xml_url = _resolve_xml_url(accession) if not xml_url: logger.warning(f"No XML found for {accession}") continue @@ -99,30 +102,12 @@ def fetch_and_store_new_filings() -> list[dict]: parsed = parse_form4(xml_bytes, accession, filed_date) for filing in parsed: - inserted = insert_filing(filing) - if inserted: + if insert_filing(filing): new_filings.append(filing) return new_filings -def _resolve_xml_url(index_url: str, accession: str) -> Optional[str]: - accession_path = accession.replace("-", "") - cik = accession_path[:10].lstrip("0") - base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/" - candidate = f"{base}{accession}-index.htm" - try: - resp = _fetch(candidate) - root = etree.fromstring(resp.content) - for 
node in root.iter(): - text = (node.text or "").strip() - if text.endswith(".xml") and ("4" in text or "form" in text.lower()): - return base + text - except Exception: - pass - return None - - def run_poller(on_new_filing=None): logger.info("EDGAR poller started") while True: @@ -134,5 +119,5 @@ def run_poller(on_new_filing=None): try: on_new_filing(filing) except Exception as e: - logger.error(f"Error in on_new_filing callback: {e}") + logger.error(f"Error processing filing {filing.get('accession_number')}: {e}") time.sleep(config.EDGAR_POLL_INTERVAL) diff --git a/main.py b/main.py index 2bd6f41..7f947a2 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ logging.basicConfig( logger = logging.getLogger(__name__) -def _on_new_filing(filing: dict): +def _process_filing(filing: dict): from signals.filter_engine import process_filing from alerts.slack_alert import send_slack_alert import config @@ -29,13 +29,25 @@ def _on_new_filing(filing: dict): execute_signal(signal) +def _close_expired_positions(): + import config + if config.ALPACA_KEY and config.ALPACA_SECRET: + from broker.alpaca_client import close_expired_positions + close_expired_positions() + + def cmd_run(): from db.db import init_db from ingestion.edgar_poller import run_poller init_db() logger.info("Database initialized") - run_poller(on_new_filing=_on_new_filing) + + def on_new_filing(filing: dict): + _process_filing(filing) + _close_expired_positions() + + run_poller(on_new_filing=on_new_filing) def cmd_backtest(): @@ -61,7 +73,7 @@ def cmd_fetch_once(): logger.info(f"Fetched and stored {len(filings)} new filings") for filing in filings: - signal = _on_new_filing(filing) + _process_filing(filing) COMMANDS = {