From 7e9221a9145bbd2a3dd82066f588fdb1f58ce815 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 4 May 2026 16:15:22 +0000 Subject: [PATCH 1/4] feat: add PLAN.md and insider copytrade POC implementation - PLAN.md: full implementation plan from issue - config.py: configurable thresholds, API keys via .env - ingestion/: EDGAR RSS poller + Form 4 XML parser - db/: SQLite schema + interface (WAL mode) - signals/: filter engine (buy/10b5-1/value/role) + cluster detector - alerts/: Slack webhook alert with score gating - broker/: Alpaca paper/live trade execution - backtest/: historical signal backtesting with yfinance - main.py: CLI entrypoint (run | fetch-once | backtest) --- .env.example | 6 + PLAN.md | 340 ++++++++++++++++++++++++++++++++++++ alerts/__init__.py | 0 alerts/slack_alert.py | 64 +++++++ backtest/__init__.py | 0 backtest/backtest.py | 147 ++++++++++++++++ broker/__init__.py | 0 broker/alpaca_client.py | 94 ++++++++++ config.py | 40 +++++ db/__init__.py | 0 db/db.py | 123 +++++++++++++ db/schema.sql | 34 ++++ ingestion/__init__.py | 0 ingestion/edgar_poller.py | 138 +++++++++++++++ ingestion/form4_parser.py | 95 ++++++++++ main.py | 78 +++++++++ requirements.txt | 5 + signals/__init__.py | 0 signals/cluster_detector.py | 13 ++ signals/filter_engine.py | 67 +++++++ 20 files changed, 1244 insertions(+) create mode 100644 .env.example create mode 100644 PLAN.md create mode 100644 alerts/__init__.py create mode 100644 alerts/slack_alert.py create mode 100644 backtest/__init__.py create mode 100644 backtest/backtest.py create mode 100644 broker/__init__.py create mode 100644 broker/alpaca_client.py create mode 100644 config.py create mode 100644 db/__init__.py create mode 100644 db/db.py create mode 100644 db/schema.sql create mode 100644 ingestion/__init__.py create mode 100644 ingestion/edgar_poller.py create mode 100644 ingestion/form4_parser.py create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 signals/__init__.py create mode 
100644 signals/cluster_detector.py create mode 100644 signals/filter_engine.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e78a987 --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ +SLACK_WEBHOOK_URL= +ALPACA_KEY= +ALPACA_SECRET= +ALPACA_BASE_URL=https://paper-api.alpaca.markets +DB_PATH=insider.db +DATA_DIR=data/filings diff --git a/PLAN.md b/PLAN.md new file mode 100644 index 0000000..9e0f1b5 --- /dev/null +++ b/PLAN.md @@ -0,0 +1,340 @@ +# Insider Copytrade System -- Implementation Plan + +## Description + +A personal system that monitors SEC EDGAR Form 4 filings in real-time, filters for high-quality insider buying signals, alerts via Slack, and optionally executes trades automatically through Alpaca's paper or live trading API. + +The system is fully self-hosted, uses only free/public data sources, and requires no third-party data subscriptions. + +--- + +## Background + +Company insiders (executives, directors, >10% shareholders) must file SEC Form 4 within 2 business days of any trade. This is public data via SEC EDGAR. The signal value of insider *buying* is academically documented -- executives buying their own stock with personal capital is a meaningful vote of confidence, particularly when: + +- Multiple insiders buy simultaneously (cluster signal) +- The trade is unplanned (not a 10b5-1 scheduled plan) +- The company is small/mid-cap (less institutional arbitrage) + +The edge vs. political trade copying: 2-day disclosure lag vs. 45 days, and the signal is company-specific rather than sector-level. + +**Key risk:** This signal is publicly known and tracked. The edge is in filtering quality and execution speed, not data exclusivity. Large-cap Form 4 signals are arbitraged quickly. Focus on small/mid-cap, clustered, unplanned buys. 
+ +--- + +## System Outline + +``` +SEC EDGAR RSS Feed (poll every 10 min) + | + [Ingestion Layer] + | + Parse Form 4 XML + | + [Filter Engine] + - Buy only (flag = A) + - Exclude 10b5-1 plans + - Min transaction size + - Role weighting + - Cluster detection + | + SQLite Database + | + ┌────────────┬──────────────┐ + | | | +[Backtester] [Slack Alert] [Alpaca API] + (manual) (paper/live) +``` + +--- + +## Actionables + +### Phase 1 -- Data Ingestion + +**Goal:** Reliably pull and parse Form 4 filings as they appear. + +**Tasks:** + +1. Set up project structure +``` +insider-copytrade/ + ingestion/ + edgar_poller.py # polls EDGAR RSS + form4_parser.py # parses XML -> structured dict + db/ + schema.sql + db.py # SQLite interface + signals/ + filter_engine.py # applies signal filters + cluster_detector.py + alerts/ + slack_alert.py + broker/ + alpaca_client.py + backtest/ + backtest.py + config.py + main.py +``` + +2. Poll EDGAR RSS for Form 4 filings every 10 minutes: +``` +https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&search_text=&action=getcurrent +``` +SEC also provides a structured latest filings feed: +``` +https://efts.sec.gov/LATEST/search-index?q=&forms=4 +``` + +3. For each new filing, fetch and parse the XML document. Key fields to extract: + - `issuerTradingSymbol` (ticker) + - `rptOwnerName`, `officerTitle` (insider name + role) + - `transactionDate` + - `transactionAcquiredDisposedCode` (A = buy, D = sell) + - `transactionShares`, `transactionPricePerShare` + - `transactionTotalValue` (compute if not present) + - `footnotes` (check for "10b5-1" mention) + - `sharesOwnedFollowingTransaction` + +4. Store raw filing XML + parsed fields. Track `accessionNumber` as dedup key. 
+ +**SQLite schema:** +```sql +CREATE TABLE filings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + accession_number TEXT UNIQUE, + ticker TEXT, + cik TEXT, + insider_name TEXT, + role TEXT, + transaction_date TEXT, + filed_date TEXT, + shares REAL, + price REAL, + total_value REAL, + flag TEXT, -- A or D + is_10b51 INTEGER, -- 0 or 1 + post_tx_shares REAL, + created_at TEXT +); + +CREATE TABLE signals ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ticker TEXT, + trigger_date TEXT, + cluster_size INTEGER, + total_cluster_value REAL, + score REAL, + alerted INTEGER DEFAULT 0, + executed INTEGER DEFAULT 0, + created_at TEXT +); +``` + +--- + +### Phase 2 -- Filter Engine + +**Goal:** Reduce noise to actionable signals only. + +**Filters to apply (in order):** + +| Filter | Logic | +|---|---| +| Buy only | `flag == 'A'` | +| Exclude 10b5-1 | Scan footnotes for "10b5-1", "Rule 10b5", "adopted a plan" | +| Min transaction value | `total_value >= 50000` (configurable) | +| Exclude derivative transactions | Options exercises are weaker signal than open market purchases | +| Role weighting | CEO/CFO/President = high; Director = medium; 10% owner = context-dependent | +| Cluster detection | 2+ insiders buying same ticker within 30 days = elevated signal | + +**Scoring formula (simple v1):** +```python +score = base_role_weight * log(total_value) * cluster_multiplier +# cluster_multiplier = 1.0 + (0.5 * (cluster_size - 1)) +``` + +Expose all thresholds in `config.py` for easy tuning during backtesting. + +--- + +### Phase 3 -- SQLite Storage + +SQLite is sufficient for this workload (low write volume, single process). Use WAL mode for concurrent reads during backtesting: + +```python +conn = sqlite3.connect('insider.db') +conn.execute('PRAGMA journal_mode=WAL') +``` + +Keep raw filing XML in a `/data/filings/` directory keyed by accession number. Parse on ingest, re-parse never needed. 
+ +--- + +### Phase 4 -- Slack Alerts + +**Goal:** Get notified immediately when a signal fires, with enough context to decide manually. + +1. Create a Slack app, get a webhook URL (takes 5 minutes) +2. Alert format: + +``` +INSIDER BUY SIGNAL +Ticker: $ACME +Insider: John Smith (CEO) +Date: 2025-05-01 +Shares: 10,000 @ $14.50 = $145,000 +Cluster: 3 insiders in last 14 days +Score: 8.4 +10b5-1: No +EDGAR: https://www.sec.gov/cgi-bin/browse-edgar?... +``` + +3. Alert only on signals above configurable score threshold +4. Mark `alerted = 1` in DB after sending to avoid duplicates on re-poll + +```python +import requests + +def send_slack_alert(webhook_url, signal): + requests.post(webhook_url, json={"text": format_signal(signal)}) +``` + +--- + +### Phase 5 -- Backtesting + +**Goal:** Validate filter parameters on historical data before going live. + +**Data:** +- Historical Form 4 filings: download bulk XML from `https://www.sec.gov/dera/data/form-4-data` +- Price data: `yfinance` (free, sufficient for backtesting) + +**Backtest logic:** +```python +# For each signal in historical data: +# - Entry: next market open after filed_date +# - Exit: N days later (configurable: 30/60/90/180) +# - Calculate return vs SPY over same period +# - Aggregate by role, cluster_size, market_cap bucket +``` + +**Use `vectorbt` for performance:** +```python +import vectorbt as vbt +# Build entry/exit signal matrices aligned to price data +# Run portfolio simulation with configurable position sizing +``` + +**Output metrics:** +- Annualized return vs SPY benchmark +- Win rate +- Avg return by holding period +- Avg return by role / cluster size +- Max drawdown +- Sharpe ratio + +**Critical:** Test on post-2022 data specifically. Pre-2022 results are likely inflated -- the signal became widely tracked after Autopilot/media coverage. 
+ +**Parameter grid to test:** +```python +MIN_VALUE = [25_000, 50_000, 100_000] +HOLDING_DAYS = [30, 60, 90, 180] +CLUSTER_WINDOW = [14, 30] +MIN_CLUSTER_SIZE = [1, 2, 3] +ROLES = ['all', 'c-suite-only'] +``` + +--- + +### Phase 6 -- Alpaca Integration + +**Goal:** Optionally auto-execute signals. Start with paper trading. + +**Paper trading base URL:** `https://paper-api.alpaca.markets` +**Live trading base URL:** `https://api.alpaca.markets` + +Swap via config flag -- never hardcode. + +```python +from alpaca_trade_api import REST + +api = REST( + key_id=config.ALPACA_KEY, + secret_key=config.ALPACA_SECRET, + base_url=config.ALPACA_BASE_URL # paper or live +) + +def execute_signal(ticker, portfolio_value, signal_score): + # Fixed fractional sizing: 2% of portfolio per signal + price = api.get_latest_trade(ticker).price + allocation = portfolio_value * 0.02 + qty = int(allocation / price) + if qty < 1: + return + api.submit_order( + symbol=ticker, + qty=qty, + side='buy', + type='market', + time_in_force='day' + ) +``` + +Position sizing: start at 2% per signal, max 10% in any single ticker. Add a max open positions limit (e.g. 20) to cap exposure. + +Exit logic (v1): time-based only (close after N days). Add trailing stop later. + +--- + +## Build Order + +| Step | Deliverable | Est. Time | +|---|---|---| +| 1 | EDGAR poller + Form 4 XML parser + SQLite storage | 1 day | +| 2 | Filter engine + cluster detector | 0.5 day | +| 3 | Slack alert | 1 hour | +| 4 | Historical data download + backtest | 1-2 days | +| 5 | Alpaca paper trading integration | 0.5 day | +| 6 | Run paper trading 4-8 weeks, monitor | -- | +| 7 | Switch to live with small capital | -- | + +Do not proceed to Step 7 without meaningful paper trading history. + +--- + +## Dependencies + +``` +requests +lxml +sqlite3 (stdlib) +yfinance +vectorbt +alpaca-trade-api +python-dotenv +``` + +All free. No paid APIs required. 
+ +--- + +## Config Template + +```python +# config.py +EDGAR_POLL_INTERVAL = 600 # seconds +MIN_TRANSACTION_VALUE = 50_000 +MIN_CLUSTER_SIZE = 1 # raise to 2 for higher quality +CLUSTER_WINDOW_DAYS = 30 +HOLDING_PERIOD_DAYS = 90 +POSITION_SIZE_PCT = 0.02 # 2% per signal +MAX_POSITIONS = 20 +SCORE_ALERT_THRESHOLD = 5.0 + +SLACK_WEBHOOK_URL = "" +ALPACA_KEY = "" +ALPACA_SECRET = "" +ALPACA_BASE_URL = "https://paper-api.alpaca.markets" # switch for live +``` diff --git a/alerts/__init__.py b/alerts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alerts/slack_alert.py b/alerts/slack_alert.py new file mode 100644 index 0000000..5056453 --- /dev/null +++ b/alerts/slack_alert.py @@ -0,0 +1,64 @@ +import logging +import requests + +import config +from db.db import mark_signal_alerted + +logger = logging.getLogger(__name__) + + +def format_signal(signal: dict) -> str: + filing = signal.get("filing", {}) + ticker = signal["ticker"] + insider = filing.get("insider_name", "Unknown") + role = filing.get("role", "Unknown") + tx_date = filing.get("transaction_date", "") + shares = filing.get("shares") + price = filing.get("price") + total_value = filing.get("total_value") or signal.get("total_cluster_value", 0) + cluster_size = signal["cluster_size"] + score = signal["score"] + is_10b51 = "Yes" if filing.get("is_10b51") else "No" + accession = filing.get("accession_number", "") + + shares_str = f"{shares:,.0f}" if shares else "N/A" + price_str = f"${price:,.2f}" if price else "N/A" + value_str = f"${total_value:,.0f}" if total_value else "N/A" + edgar_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&type=4&dateb=&owner=include&count=10&search_text=&ticker={ticker}" + + return ( + f"*INSIDER BUY SIGNAL*\n" + f"Ticker: ${ticker}\n" + f"Insider: {insider} ({role})\n" + f"Date: {tx_date}\n" + f"Shares: {shares_str} @ {price_str} = {value_str}\n" + f"Cluster: {cluster_size} insider(s) in last {config.CLUSTER_WINDOW_DAYS} days\n" + f"Score: 
{score}\n" + f"10b5-1: {is_10b51}\n" + f"EDGAR: {edgar_url}" + ) + + +def send_slack_alert(signal: dict) -> bool: + if not config.SLACK_WEBHOOK_URL: + logger.warning("SLACK_WEBHOOK_URL not configured") + return False + + if signal.get("score", 0) < config.SCORE_ALERT_THRESHOLD: + logger.debug(f"Signal score {signal['score']} below threshold {config.SCORE_ALERT_THRESHOLD}") + return False + + text = format_signal(signal) + try: + resp = requests.post( + config.SLACK_WEBHOOK_URL, + json={"text": text}, + timeout=10, + ) + resp.raise_for_status() + mark_signal_alerted(signal["id"]) + logger.info(f"Slack alert sent for {signal['ticker']}") + return True + except Exception as e: + logger.error(f"Failed to send Slack alert: {e}") + return False diff --git a/backtest/__init__.py b/backtest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backtest/backtest.py b/backtest/backtest.py new file mode 100644 index 0000000..6c8acc8 --- /dev/null +++ b/backtest/backtest.py @@ -0,0 +1,147 @@ +import logging +from datetime import datetime, timedelta +from typing import Optional +import sqlite3 + +import config + +logger = logging.getLogger(__name__) + + +def _load_signals_from_db(db_path: str) -> list[dict]: + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT s.*, f.role FROM signals s " + "LEFT JOIN filings f ON f.ticker = s.ticker AND f.transaction_date = s.trigger_date " + "WHERE s.cluster_size >= 1" + ).fetchall() + conn.close() + return [dict(r) for r in rows] + + +def run_backtest( + db_path: str = None, + holding_days: int = None, + min_score: float = 0.0, + min_cluster_size: int = 1, +) -> dict: + try: + import yfinance as yf + except ImportError: + raise ImportError("yfinance not installed. 
Run: pip install yfinance") + + db_path = db_path or config.DB_PATH + holding_days = holding_days or config.HOLDING_PERIOD_DAYS + + signals = _load_signals_from_db(db_path) + signals = [s for s in signals if s["score"] >= min_score and s["cluster_size"] >= min_cluster_size] + + if not signals: + logger.warning("No signals found matching criteria") + return {} + + results = [] + spy_returns = {} + + for signal in signals: + ticker = signal["ticker"] + entry_date_str = signal["trigger_date"] + + try: + entry_date = datetime.strptime(entry_date_str, "%Y-%m-%d") + except ValueError: + continue + + exit_date = entry_date + timedelta(days=holding_days) + + try: + stock_data = yf.download( + ticker, + start=(entry_date - timedelta(days=5)).strftime("%Y-%m-%d"), + end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"), + progress=False, + auto_adjust=True, + ) + if stock_data.empty: + continue + + entry_price = float(stock_data["Close"].iloc[0]) + exit_price = float(stock_data["Close"].iloc[-1]) + stock_return = (exit_price - entry_price) / entry_price + + except Exception as e: + logger.debug(f"Failed to get data for {ticker}: {e}") + continue + + period_key = (entry_date_str, holding_days) + if period_key not in spy_returns: + try: + spy_data = yf.download( + "SPY", + start=(entry_date - timedelta(days=5)).strftime("%Y-%m-%d"), + end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"), + progress=False, + auto_adjust=True, + ) + if not spy_data.empty: + spy_entry = float(spy_data["Close"].iloc[0]) + spy_exit = float(spy_data["Close"].iloc[-1]) + spy_returns[period_key] = (spy_exit - spy_entry) / spy_entry + else: + spy_returns[period_key] = 0.0 + except Exception: + spy_returns[period_key] = 0.0 + + spy_return = spy_returns.get(period_key, 0.0) + alpha = stock_return - spy_return + + results.append({ + "ticker": ticker, + "entry_date": entry_date_str, + "stock_return": round(stock_return, 4), + "spy_return": round(spy_return, 4), + "alpha": round(alpha, 4), + 
"cluster_size": signal["cluster_size"], + "score": signal["score"], + }) + + if not results: + return {"error": "No results computed"} + + returns = [r["stock_return"] for r in results] + alphas = [r["alpha"] for r in results] + win_rate = sum(1 for r in returns if r > 0) / len(returns) + avg_return = sum(returns) / len(returns) + avg_alpha = sum(alphas) / len(alphas) + + import math + std_dev = math.sqrt(sum((r - avg_return) ** 2 for r in returns) / len(returns)) + sharpe = (avg_return / std_dev * math.sqrt(252 / holding_days)) if std_dev > 0 else 0.0 + + summary = { + "total_signals": len(results), + "win_rate": round(win_rate, 4), + "avg_return": round(avg_return, 4), + "avg_alpha_vs_spy": round(avg_alpha, 4), + "sharpe_ratio": round(sharpe, 4), + "holding_days": holding_days, + "results": results, + } + + return summary + + +def print_summary(summary: dict): + if "error" in summary: + print(f"Error: {summary['error']}") + return + print(f"\n{'='*40}") + print(f"Backtest Results ({summary['holding_days']}-day hold)") + print(f"{'='*40}") + print(f"Total signals: {summary['total_signals']}") + print(f"Win rate: {summary['win_rate']:.1%}") + print(f"Avg return: {summary['avg_return']:.2%}") + print(f"Avg alpha vs SPY: {summary['avg_alpha_vs_spy']:.2%}") + print(f"Sharpe ratio: {summary['sharpe_ratio']:.2f}") + print(f"{'='*40}\n") diff --git a/broker/__init__.py b/broker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/broker/alpaca_client.py b/broker/alpaca_client.py new file mode 100644 index 0000000..67b9223 --- /dev/null +++ b/broker/alpaca_client.py @@ -0,0 +1,94 @@ +import logging +from typing import Optional + +import config +from db.db import mark_signal_executed + +logger = logging.getLogger(__name__) + + +def _get_api(): + try: + from alpaca_trade_api import REST + except ImportError: + raise ImportError("alpaca-trade-api not installed. 
def execute_signal(signal: dict) -> bool:
    """Place a market buy order for ``signal["ticker"]`` sized from the account.

    The order is sized at ``config.POSITION_SIZE_PCT`` of the portfolio value,
    then reduced if needed so the ticker's total exposure (existing position
    plus this order) stays within the per-ticker cap. The cap is
    ``config.MAX_TICKER_PCT`` when that setting exists, otherwise 10%.

    Args:
        signal: Signal dict; the ``ticker`` and ``id`` keys are read.

    Returns:
        True if an order was submitted; False if the signal was skipped
        (missing credentials, position limit, cap reached, allocation too
        small) or the broker call failed.
    """
    if not config.ALPACA_KEY or not config.ALPACA_SECRET:
        logger.warning("Alpaca credentials not configured")
        return False

    ticker = signal["ticker"]
    max_ticker_pct = getattr(config, "MAX_TICKER_PCT", 0.10)

    try:
        # One client and one positions snapshot serve every check below.
        api = _get_api()
        positions = api.list_positions()
        if len(positions) >= config.MAX_POSITIONS:
            logger.warning(f"Max positions ({config.MAX_POSITIONS}) reached, skipping {ticker}")
            return False

        portfolio_value = float(api.get_account().portfolio_value)
        if portfolio_value <= 0:
            logger.error(f"Invalid portfolio value: {portfolio_value}")
            return False

        # Cap the *resulting* exposure, not just the existing one: otherwise a
        # position just under the cap could still receive a full-size order.
        existing_value = sum(
            float(p.market_value) for p in positions if p.symbol == ticker
        )
        headroom = portfolio_value * max_ticker_pct - existing_value
        if headroom <= 0:
            logger.warning(f"Already at {max_ticker_pct:.0%} cap for {ticker}, skipping")
            return False
        allocation = min(portfolio_value * config.POSITION_SIZE_PCT, headroom)

        price = float(api.get_latest_trade(ticker).price)
        if price <= 0:
            logger.error(f"Invalid price for {ticker}: {price}")
            return False

        qty = int(allocation / price)
        if qty < 1:
            logger.warning(f"Allocation too small for {ticker}: ${allocation:.2f} at ${price:.2f}")
            return False

        order = api.submit_order(
            symbol=ticker,
            qty=qty,
            side="buy",
            type="market",
            time_in_force="day",
        )
        logger.info(f"Order submitted: {ticker} qty={qty} order_id={order.id}")
    except Exception as e:
        logger.error(f"Failed to execute signal for {ticker}: {e}")
        return False

    # The order is already live at this point; a bookkeeping failure must not
    # be reported as "order failed", or callers could retry and double-buy.
    try:
        mark_signal_executed(signal["id"])
    except Exception as e:
        logger.error(f"Order for {ticker} submitted but signal could not be marked executed: {e}")
    return True
import os
from dotenv import load_dotenv

# Pull secrets and paths from a local .env file into the process environment.
load_dotenv()

# Seconds the poller sleeps between two EDGAR polls.
EDGAR_POLL_INTERVAL = 600
# Minimum dollar value of a transaction; presumably applied by the filter
# engine (its code is not visible here) -- confirm.
MIN_TRANSACTION_VALUE = 50_000
# Minimum number of distinct insiders for a signal (used by the backtest).
MIN_CLUSTER_SIZE = 1
# Look-back window, in days, used when counting recent buys for a ticker.
CLUSTER_WINDOW_DAYS = 30
# Default holding period, in days, for backtests and position closing.
HOLDING_PERIOD_DAYS = 90
# Fraction of portfolio value allocated to each executed signal.
POSITION_SIZE_PCT = 0.02
# Upper bound on simultaneously open broker positions.
MAX_POSITIONS = 20
# Signals scoring below this are neither alerted nor backtested.
SCORE_ALERT_THRESHOLD = 5.0

# Role weights are matched by substring against the lower-cased title, first
# match wins. A more specific title must therefore precede any key it
# contains: "vice president" has to come before "president", otherwise every
# Vice President is scored as a President.
ROLE_WEIGHTS = {
    "ceo": 3.0,
    "chief executive officer": 3.0,
    "cfo": 2.5,
    "chief financial officer": 2.5,
    "vice president": 1.2,
    "president": 2.5,
    "coo": 2.0,
    "chief operating officer": 2.0,
    "director": 1.5,
    "vp": 1.2,
    "10% owner": 1.0,
}
DEFAULT_ROLE_WEIGHT = 1.0

SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL", "")
ALPACA_KEY = os.getenv("ALPACA_KEY", "")
ALPACA_SECRET = os.getenv("ALPACA_SECRET", "")
# Paper-trading endpoint by default; override via the environment for live.
ALPACA_BASE_URL = os.getenv("ALPACA_BASE_URL", "https://paper-api.alpaca.markets")

DB_PATH = os.getenv("DB_PATH", "insider.db")
DATA_DIR = os.getenv("DATA_DIR", "data/filings")

# The query string previously carried "action=getcurrent" twice.
EDGAR_RSS_URL = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&search_text="
EDGAR_SEARCH_URL = "https://efts.sec.gov/LATEST/search-index?q=&forms=4"
EDGAR_BASE_URL = "https://www.sec.gov"
def insert_filing(filing: dict) -> bool:
    """Store one parsed Form 4 transaction row unless it is already present.

    Args:
        filing: Mapping that supplies every named SQL parameter below
            (``accession_number`` through ``post_tx_shares``).

    Returns:
        True when a new row was written, False when the insert was ignored
        because of the UNIQUE constraint on ``accession_number``.
    """
    # NOTE(review): the schema declares accession_number UNIQUE while the
    # parser emits one dict per transaction sharing the filing's accession
    # number, so only the first transaction of a multi-transaction filing is
    # kept -- confirm whether that is intended.
    statement = """
        INSERT OR IGNORE INTO filings
            (accession_number, ticker, cik, insider_name, role,
             transaction_date, filed_date, shares, price, total_value,
             flag, is_10b51, post_tx_shares)
        VALUES
            (:accession_number, :ticker, :cik, :insider_name, :role,
             :transaction_date, :filed_date, :shares, :price, :total_value,
             :flag, :is_10b51, :post_tx_shares)
        """
    connection = get_connection()
    try:
        connection.execute(statement, filing)
        # changes() reports rows touched by the statement just executed:
        # 0 means the row was ignored as a duplicate.
        changed_rows = connection.execute("SELECT changes()").fetchone()[0]
        row_was_added = changed_rows > 0
        connection.commit()
    finally:
        connection.close()
    return row_was_added
""" + SELECT * FROM filings + WHERE ticker=? + AND flag='A' + AND is_10b51=0 + AND transaction_date >= date('now', ? || ' days') + ORDER BY transaction_date DESC + """, + (ticker, f"-{window_days}"), + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +def accession_exists(accession_number: str) -> bool: + conn = get_connection() + try: + row = conn.execute( + "SELECT 1 FROM filings WHERE accession_number=?", (accession_number,) + ).fetchone() + return row is not None + finally: + conn.close() diff --git a/db/schema.sql b/db/schema.sql new file mode 100644 index 0000000..099d94d --- /dev/null +++ b/db/schema.sql @@ -0,0 +1,34 @@ +CREATE TABLE IF NOT EXISTS filings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + accession_number TEXT UNIQUE, + ticker TEXT, + cik TEXT, + insider_name TEXT, + role TEXT, + transaction_date TEXT, + filed_date TEXT, + shares REAL, + price REAL, + total_value REAL, + flag TEXT, + is_10b51 INTEGER DEFAULT 0, + post_tx_shares REAL, + created_at TEXT DEFAULT (datetime('now')) +); + +CREATE TABLE IF NOT EXISTS signals ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ticker TEXT, + trigger_date TEXT, + cluster_size INTEGER, + total_cluster_value REAL, + score REAL, + alerted INTEGER DEFAULT 0, + executed INTEGER DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')) +); + +CREATE INDEX IF NOT EXISTS idx_filings_ticker ON filings(ticker); +CREATE INDEX IF NOT EXISTS idx_filings_transaction_date ON filings(transaction_date); +CREATE INDEX IF NOT EXISTS idx_signals_ticker ON signals(ticker); +CREATE INDEX IF NOT EXISTS idx_signals_alerted ON signals(alerted); diff --git a/ingestion/__init__.py b/ingestion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ingestion/edgar_poller.py b/ingestion/edgar_poller.py new file mode 100644 index 0000000..08bd4e6 --- /dev/null +++ b/ingestion/edgar_poller.py @@ -0,0 +1,138 @@ +import time +import os +import logging +from datetime import datetime +from typing import Optional 
def _get_filing_urls() -> list[tuple[str, str, str]]:
    """Fetch the EDGAR "current Form 4" Atom feed and list its filings.

    Returns:
        ``(index_url, accession_number, filed_date)`` tuples, one per distinct
        filing in feed order. ``accession_number`` is always in the dashed
        ``##########-##-######`` form; ``filed_date`` is the first 10
        characters of the entry's ``<updated>`` element (may be empty).

    Raises:
        requests.RequestException: If the feed cannot be downloaded.
        lxml.etree.XMLSyntaxError: If the feed is not well-formed XML.
    """
    resp = _fetch(EDGAR_FULL_INDEX)
    root = etree.fromstring(resp.content)
    ns = {"atom": "http://www.w3.org/2005/Atom"}

    results = []
    seen = set()
    for entry in root.findall("atom:entry", ns):
        link = entry.find("atom:link", ns)
        if link is None:
            continue
        url = link.get("href", "")

        # Last path segment looks like "0001234567-24-000001-index.htm(l)".
        # Splitting on "-index" handles both the .htm and .html suffixes.
        last_segment = url.rstrip("/").split("/")[-1]
        digits = last_segment.split("-index")[0].replace("-", "")
        if len(digits) != 18 or not digits.isdigit():
            # Without a valid accession number there is no dedup key and the
            # archive path cannot be derived, so the entry is unusable.
            logger.debug(f"Skipping feed entry with unrecognised URL: {url!r}")
            continue
        accession = f"{digits[:10]}-{digits[10:12]}-{digits[12:]}"

        # The feed can list one filing more than once; keep the first only.
        if accession in seen:
            continue
        seen.add(accession)

        updated = (entry.findtext("atom:updated", namespaces=ns) or "")[:10]
        results.append((url, accession, updated))
    return results
def _resolve_xml_url(index_url: str, accession: str) -> Optional[str]:
    """Locate the raw ownership XML document of one filing.

    The filing's index page is downloaded and scanned for a link to an
    ``.xml`` document. The ``index_url`` taken from the feed is tried first;
    a URL rebuilt from the accession number is kept only as a fallback.

    Args:
        index_url: URL of the filing's ``-index.htm`` page (may be empty).
        accession: Dashed accession number, e.g. ``0001234567-24-000001``.

    Returns:
        Absolute URL of the XML document, or None if none could be found.
    """
    from urllib.parse import urljoin

    candidates = []
    if index_url:
        candidates.append(index_url)

    # Fallback guess. The leading 10 digits of an accession number identify
    # whoever submitted the filing, which may be a filing agent, so this
    # directory is not guaranteed to exist.
    accession_path = accession.replace("-", "")
    submitter_id = accession_path[:10].lstrip("0")
    if submitter_id:
        guessed = (
            f"{config.EDGAR_BASE_URL}/Archives/edgar/data/"
            f"{submitter_id}/{accession_path}/{accession}-index.htm"
        )
        if guessed not in candidates:
            candidates.append(guessed)

    for page_url in candidates:
        try:
            resp = _fetch(page_url)
            # The index page is HTML; the strict XML parser rejects it, so
            # use lxml's lenient HTML parser.
            doc = etree.HTML(resp.content)
        except Exception as e:
            logger.debug(f"Could not read filing index {page_url}: {e}")
            continue
        if doc is None:
            continue

        for anchor in doc.iter("a"):
            href = (anchor.get("href") or "").strip()
            lowered = href.lower()
            if not lowered.endswith(".xml"):
                continue
            # Links under an "xsl..." directory point at the styled HTML
            # rendering of the form rather than the raw XML -- TODO confirm
            # against a live index page.
            if "/xsl" in lowered:
                continue
            return urljoin(page_url, href)
    return None
import re
from lxml import etree
from typing import Optional


_10B51_PATTERNS = [
    r"10b5-1",
    r"rule 10b5",
    r"adopted a plan",
    r"10b5\(1\)",
]

# Textual spellings of a checked boolean flag in the ownership XML.
_TRUE_FLAGS = {"1", "true"}


def _is_10b51(text: str) -> bool:
    """Return True when *text* mentions a Rule 10b5-1 trading plan."""
    text_lower = text.lower()
    return any(re.search(p, text_lower) for p in _10B51_PATTERNS)


def _text(el, tag: str) -> Optional[str]:
    """Return the stripped text of the first ``tag`` descendant of *el*.

    Transaction fields are wrapped as ``<tag><value>...</value></tag>``; when
    a ``<value>`` child exists its text is used, otherwise the element's own
    text. Returns None when the element is missing or holds only whitespace.
    """
    node = el.find(".//" + tag)
    if node is None:
        return None
    value_node = node.find("value")
    raw = value_node.text if value_node is not None else node.text
    stripped = (raw or "").strip()
    return stripped or None


def _float(el, tag: str) -> Optional[float]:
    """Return the ``tag`` field of *el* as a float, or None if absent/invalid."""
    val = _text(el, tag)
    if val is None:
        return None
    try:
        return float(val.replace(",", ""))
    except ValueError:
        return None


def _relationship_role(root) -> str:
    """Derive a role label from the reporting-owner relationship flags."""
    labels = (
        ("isDirector", "Director"),
        ("isTenPercentOwner", "10% Owner"),
        ("isOfficer", "Officer"),
    )
    for tag, label in labels:
        if (_text(root, tag) or "").lower() in _TRUE_FLAGS:
            return label
    return ""


def parse_form4(xml_bytes: bytes, accession_number: str, filed_date: str) -> list[dict]:
    """Parse a Form 4 ownership XML document into per-transaction dicts.

    Only ``nonDerivativeTransaction`` elements are read; transactions without
    an acquired/disposed code are skipped.

    Args:
        xml_bytes: Raw XML of the filing.
        accession_number: Accession number copied into every result row.
        filed_date: Filing date; also the fallback transaction date.

    Returns:
        One dict per transaction with the keys ``accession_number``,
        ``ticker``, ``cik``, ``insider_name``, ``role``, ``transaction_date``,
        ``filed_date``, ``shares``, ``price``, ``total_value``, ``flag``,
        ``is_10b51`` (0/1) and ``post_tx_shares``. An empty list is returned
        when the XML is not well-formed.
    """
    try:
        root = etree.fromstring(xml_bytes)
    except etree.XMLSyntaxError:
        return []

    ticker = _text(root, "issuerTradingSymbol") or ""
    cik = _text(root, "issuerCik") or ""
    insider_name = _text(root, "rptOwnerName") or ""
    # isDirector & co. hold "1"/"0" flags, not a title, so they are mapped to
    # a label instead of being stored verbatim as the role.
    role = _text(root, "officerTitle") or _relationship_role(root)

    # Index footnotes by id once; itertext() also captures nested markup.
    footnotes = {
        node.get("id", ""): "".join(node.itertext())
        for node in root.findall(".//footnote")
    }
    # NOTE(review): newer filings are believed to carry a document-level
    # <aff10b5One> checkbox; the lookup is harmless when the element is
    # absent -- confirm against the EDGAR ownership XML specification.
    plan_checkbox = (_text(root, "aff10b5One") or "").lower() in _TRUE_FLAGS
    global_10b51 = plan_checkbox or _is_10b51(" ".join(footnotes.values()))

    results = []
    for tx in root.findall(".//nonDerivativeTransaction"):
        flag = _text(tx, "transactionAcquiredDisposedCode")
        if not flag:
            continue

        shares = _float(tx, "transactionShares")
        price = _float(tx, "transactionPricePerShare")
        total_value = _float(tx, "transactionTotalValue")
        if total_value is None and shares is not None and price is not None:
            total_value = shares * price
        post_tx_shares = _float(tx, "sharesOwnedFollowingTransaction")
        # Keep only the YYYY-MM-DD part: some filings append a UTC offset,
        # which breaks the date parsing and comparisons done downstream.
        tx_date = (_text(tx, "transactionDate") or filed_date)[:10]

        # Explicit dict lookup: lxml elements without children are falsy, so
        # the former "find(...) or Element(...)" always picked the dummy
        # element and dropped the footnote text.
        tx_footnote_text = " ".join(
            footnotes.get(fn.get("id", ""), "")
            for fn in tx.findall(".//footnoteId")
        )
        is_10b51 = int(global_10b51 or _is_10b51(tx_footnote_text))

        results.append(
            {
                "accession_number": accession_number,
                "ticker": ticker.upper(),
                "cik": cik,
                "insider_name": insider_name,
                "role": role,
                "transaction_date": tx_date,
                "filed_date": filed_date,
                "shares": shares,
                "price": price,
                "total_value": total_value,
                "flag": flag.upper(),
                "is_10b51": is_10b51,
                "post_tx_shares": post_tx_shares,
            }
        )

    return results
fetch_and_store_new_filings() + logger.info(f"Fetched and stored {len(filings)} new filings") + + for filing in filings: + signal = _on_new_filing(filing) + + +COMMANDS = { + "run": cmd_run, + "backtest": cmd_backtest, + "fetch-once": cmd_fetch_once, +} + +if __name__ == "__main__": + cmd = sys.argv[1] if len(sys.argv) > 1 else "run" + if cmd not in COMMANDS: + print(f"Usage: python main.py [{' | '.join(COMMANDS)}]") + sys.exit(1) + COMMANDS[cmd]() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3111da8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +requests>=2.31.0 +lxml>=5.0.0 +yfinance>=0.2.0 +python-dotenv>=1.0.0 +alpaca-trade-api>=3.0.0 diff --git a/signals/__init__.py b/signals/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/signals/cluster_detector.py b/signals/cluster_detector.py new file mode 100644 index 0000000..7f3a172 --- /dev/null +++ b/signals/cluster_detector.py @@ -0,0 +1,13 @@ +from db.db import get_recent_buys_for_ticker +import config + + +def detect_cluster(ticker: str) -> dict: + buys = get_recent_buys_for_ticker(ticker, config.CLUSTER_WINDOW_DAYS) + unique_insiders = {b["insider_name"] for b in buys} + total_value = sum(b["total_value"] or 0 for b in buys) + return { + "cluster_size": len(unique_insiders), + "total_cluster_value": total_value, + "buys": buys, + } diff --git a/signals/filter_engine.py b/signals/filter_engine.py new file mode 100644 index 0000000..ad020fb --- /dev/null +++ b/signals/filter_engine.py @@ -0,0 +1,67 @@ +import math +import logging +from typing import Optional + +import config +from signals.cluster_detector import detect_cluster +from db.db import insert_signal + +logger = logging.getLogger(__name__) + + +def _role_weight(role: str) -> float: + role_lower = (role or "").lower() + for key, weight in config.ROLE_WEIGHTS.items(): + if key in role_lower: + return weight + return config.DEFAULT_ROLE_WEIGHT + + +def _score(total_value: float, role: str, 
cluster_size: int) -> float: + if not total_value or total_value <= 0: + return 0.0 + base = _role_weight(role) + cluster_mult = 1.0 + 0.5 * (cluster_size - 1) + return base * math.log(total_value) * cluster_mult + + +def process_filing(filing: dict) -> Optional[dict]: + if filing.get("flag") != "A": + return None + + if filing.get("is_10b51"): + logger.debug(f"Skipping 10b5-1 filing: {filing['accession_number']}") + return None + + total_value = filing.get("total_value") or 0 + if total_value < config.MIN_TRANSACTION_VALUE: + logger.debug(f"Below min value: {filing['accession_number']} (${total_value:,.0f})") + return None + + ticker = filing.get("ticker", "") + if not ticker: + return None + + cluster_info = detect_cluster(ticker) + cluster_size = cluster_info["cluster_size"] + total_cluster_value = cluster_info["total_cluster_value"] + + if cluster_size < config.MIN_CLUSTER_SIZE: + return None + + score = _score(total_value, filing.get("role", ""), cluster_size) + + signal = { + "ticker": ticker, + "trigger_date": filing.get("transaction_date", ""), + "cluster_size": cluster_size, + "total_cluster_value": total_cluster_value, + "score": round(score, 2), + "filing": filing, + "cluster_buys": cluster_info["buys"], + } + + signal_id = insert_signal(signal) + signal["id"] = signal_id + logger.info(f"Signal generated: {ticker} score={score:.2f} cluster={cluster_size}") + return signal From 8c0085e5036697f4dbd2988c1ab142bbc23a0bc2 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 4 May 2026 16:24:25 +0000 Subject: [PATCH 2/4] docs: add README Co-authored-by: dodox --- README.md | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..f0db139 --- /dev/null +++ b/README.md @@ -0,0 +1,105 @@ +# Cleopatra — Insider Copytrade POC + +Monitors SEC EDGAR Form 4 filings in near real-time, detects insider buy clusters, sends Slack alerts, and 
optionally executes trades via Alpaca. + +## Architecture + +``` +EDGAR (Form 4 feed) + │ + ▼ +ingestion/edgar_poller.py ← polls every 10 min, dedupes by accession + │ + ▼ +ingestion/form4_parser.py ← parses XML, detects 10b5-1 plans + │ + ▼ +db/schema.sql + db/db.py ← SQLite (WAL mode): filings + signals tables + │ + ▼ +signals/filter_engine.py ← buy-only, exclude 10b5-1, min $50k, role-weighted scoring +signals/cluster_detector.py ← counts unique insiders per ticker in rolling 30-day window + │ + ├──► alerts/slack_alert.py ← POST to Slack webhook when score ≥ threshold + └──► broker/alpaca_client.py ← paper/live order: 2% position size, 10% per-ticker cap +``` + +## Setup + +```bash +cp .env.example .env +# edit .env with your credentials +pip install -r requirements.txt +``` + +### Environment variables (`.env`) + +| Variable | Required | Default | Description | +|---|---|---|---| +| `SLACK_WEBHOOK_URL` | optional | — | Incoming webhook URL for alerts | +| `ALPACA_KEY` | optional | — | Alpaca API key | +| `ALPACA_SECRET` | optional | — | Alpaca API secret | +| `ALPACA_BASE_URL` | optional | `https://paper-api.alpaca.markets` | Use paper or live endpoint | +| `DB_PATH` | optional | `insider.db` | SQLite database file path | +| `DATA_DIR` | optional | `data/filings` | Directory for cached raw XML filings | + +## Usage + +```bash +# Initialize DB and ingest current EDGAR feed (one shot) +python main.py fetch-once + +# Run continuous polling loop (every 10 minutes) +python main.py run + +# Backtest signals already in the DB against historical prices +python main.py backtest +``` + +## Key configuration (`config.py`) + +| Parameter | Default | Description | +|---|---|---| +| `EDGAR_POLL_INTERVAL` | 600 s | Polling cadence | +| `MIN_TRANSACTION_VALUE` | $50,000 | Ignore buys below this | +| `MIN_CLUSTER_SIZE` | 1 | Minimum unique insiders before a signal fires | +| `CLUSTER_WINDOW_DAYS` | 30 | Rolling window for cluster counting | +| `HOLDING_PERIOD_DAYS` | 90 | Days 
held per position (backtest + close trigger) | +| `POSITION_SIZE_PCT` | 2% | Fraction of portfolio per trade | +| `MAX_POSITIONS` | 20 | Hard position limit | +| `SCORE_ALERT_THRESHOLD` | 5.0 | Minimum score to trigger Slack alert | + +## Scoring + +``` +score = role_weight × log(total_value) × (1 + 0.5 × (cluster_size − 1)) +``` + +Role weights: CEO 3.0 · CFO/President 2.5 · COO 2.0 · Director 1.5 · VP 1.2 · 10% owner 1.0 + +## Backtesting + +The backtest loads signals from the SQLite DB and fetches OHLC data via `yfinance` on demand (no local price cache). Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number to avoid re-downloading. + +Metrics reported: win rate, average return, average alpha vs SPY, Sharpe ratio. + +## Modules + +| Path | Purpose | +|---|---| +| `config.py` | All thresholds and env-var loading | +| `ingestion/edgar_poller.py` | EDGAR Atom feed polling and deduplication | +| `ingestion/form4_parser.py` | Form 4 XML → structured dict; 10b5-1 detection | +| `db/schema.sql` | SQLite schema (`filings`, `signals`) | +| `db/db.py` | DB access layer | +| `signals/filter_engine.py` | Filing → signal pipeline | +| `signals/cluster_detector.py` | Cluster detection from DB | +| `alerts/slack_alert.py` | Slack webhook alert | +| `broker/alpaca_client.py` | Alpaca order execution | +| `backtest/backtest.py` | Historical backtest runner | +| `main.py` | CLI entry point | + +## Requirements + +- Python 3.11+ +- See `requirements.txt`: `requests`, `lxml`, `yfinance`, `python-dotenv`, `alpaca-trade-api` From 2e2be3e9c761edf65d62ebd32761740c14be9f38 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 4 May 2026 16:32:00 +0000 Subject: [PATCH 3/4] fix: address sanity-check issues + rebrand to Smaug Co-authored-by: dodox --- README.md | 13 +++++-- backtest/backtest.py | 77 +++++++++++++++++++++++-------------- broker/alpaca_client.py | 51 +++++++++++++++--------- db/db.py | 27 ++++++++++++- db/schema.sql | 3 ++ ingestion/edgar_poller.py | 81 
++++++++++++++++----------------------- main.py | 18 +++++++-- 7 files changed, 167 insertions(+), 103 deletions(-) diff --git a/README.md b/README.md index f0db139..a957e7b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Cleopatra — Insider Copytrade POC +# Smaug — Insider Copytrade Monitor Monitors SEC EDGAR Form 4 filings in near real-time, detects insider buy clusters, sends Slack alerts, and optionally executes trades via Alpaca. @@ -22,6 +22,7 @@ signals/cluster_detector.py ← counts unique insiders per ticker in rolling 30- │ ├──► alerts/slack_alert.py ← POST to Slack webhook when score ≥ threshold └──► broker/alpaca_client.py ← paper/live order: 2% position size, 10% per-ticker cap + positions auto-closed after holding period expires ``` ## Setup @@ -64,7 +65,7 @@ python main.py backtest | `MIN_TRANSACTION_VALUE` | $50,000 | Ignore buys below this | | `MIN_CLUSTER_SIZE` | 1 | Minimum unique insiders before a signal fires | | `CLUSTER_WINDOW_DAYS` | 30 | Rolling window for cluster counting | -| `HOLDING_PERIOD_DAYS` | 90 | Days held per position (backtest + close trigger) | +| `HOLDING_PERIOD_DAYS` | 90 | Days held per position (backtest + auto-close trigger) | | `POSITION_SIZE_PCT` | 2% | Fraction of portfolio per trade | | `MAX_POSITIONS` | 20 | Hard position limit | | `SCORE_ALERT_THRESHOLD` | 5.0 | Minimum score to trigger Slack alert | @@ -79,10 +80,14 @@ Role weights: CEO 3.0 · CFO/President 2.5 · COO 2.0 · Director 1.5 · VP 1.2 ## Backtesting -The backtest loads signals from the SQLite DB and fetches OHLC data via `yfinance` on demand (no local price cache). Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number to avoid re-downloading. +The backtest loads signals from the SQLite DB and fetches OHLC data via `yfinance` on demand (no local price cache). 
Entry price is the closing price on the first trading day on or after the signal date; exit price is the closing price on the last trading day before or on the exit date. Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number. Metrics reported: win rate, average return, average alpha vs SPY, Sharpe ratio. +## Position lifecycle + +Positions are tracked in the `signals` table. When a trade is executed, `executed_at` is recorded. On each poll cycle the poller checks for positions where `executed_at` is older than `HOLDING_PERIOD_DAYS` and calls Alpaca to close them, marking `closed=1` in the DB. + ## Modules | Path | Purpose | @@ -95,7 +100,7 @@ Metrics reported: win rate, average return, average alpha vs SPY, Sharpe ratio. | `signals/filter_engine.py` | Filing → signal pipeline | | `signals/cluster_detector.py` | Cluster detection from DB | | `alerts/slack_alert.py` | Slack webhook alert | -| `broker/alpaca_client.py` | Alpaca order execution | +| `broker/alpaca_client.py` | Alpaca order execution + position exit | | `backtest/backtest.py` | Historical backtest runner | | `main.py` | CLI entry point | diff --git a/backtest/backtest.py b/backtest/backtest.py index 6c8acc8..5f8474d 100644 --- a/backtest/backtest.py +++ b/backtest/backtest.py @@ -1,25 +1,49 @@ import logging +import math from datetime import datetime, timedelta -from typing import Optional -import sqlite3 import config logger = logging.getLogger(__name__) -def _load_signals_from_db(db_path: str) -> list[dict]: +def _load_signals_from_db(db_path: str, min_score: float, min_cluster_size: int) -> list[dict]: + import sqlite3 conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row rows = conn.execute( - "SELECT s.*, f.role FROM signals s " - "LEFT JOIN filings f ON f.ticker = s.ticker AND f.transaction_date = s.trigger_date " - "WHERE s.cluster_size >= 1" + """ + SELECT s.*, f.role FROM signals s + LEFT JOIN filings f ON f.ticker = s.ticker AND f.transaction_date = 
s.trigger_date + WHERE s.score >= ? AND s.cluster_size >= ? + """, + (min_score, min_cluster_size), ).fetchall() conn.close() return [dict(r) for r in rows] +def _first_close_on_or_after(price_data, target_date: datetime) -> float: + """Return the closing price on the first trading day on or after target_date.""" + for ts, row in price_data["Close"].items(): + ts_date = ts.to_pydatetime().replace(tzinfo=None) + if ts_date.date() >= target_date.date(): + return float(row) + raise ValueError(f"No price data on or after {target_date.date()}") + + +def _first_close_before(price_data, target_date: datetime) -> float: + """Return the closing price on the last trading day before or on target_date.""" + result = None + for ts, row in price_data["Close"].items(): + ts_date = ts.to_pydatetime().replace(tzinfo=None) + if ts_date.date() <= target_date.date(): + result = float(row) + if result is None: + raise ValueError(f"No price data on or before {target_date.date()}") + return result + + def run_backtest( db_path: str = None, holding_days: int = None, @@ -34,15 +58,14 @@ def run_backtest( db_path = db_path or config.DB_PATH holding_days = holding_days or config.HOLDING_PERIOD_DAYS - signals = _load_signals_from_db(db_path) - signals = [s for s in signals if s["score"] >= min_score and s["cluster_size"] >= min_cluster_size] + signals = _load_signals_from_db(db_path, min_score, min_cluster_size) if not signals: logger.warning("No signals found matching criteria") return {} results = [] - spy_returns = {} + spy_cache: dict[tuple, float] = {} for signal in signals: ticker = signal["ticker"] @@ -58,16 +81,17 @@ def run_backtest( try: stock_data = yf.download( ticker, - start=(entry_date - timedelta(days=5)).strftime("%Y-%m-%d"), + start=entry_date.strftime("%Y-%m-%d"), end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"), progress=False, auto_adjust=True, ) if stock_data.empty: + logger.debug(f"No price data for {ticker}") continue - entry_price = 
float(stock_data["Close"].iloc[0]) - exit_price = float(stock_data["Close"].iloc[-1]) + entry_price = _first_close_on_or_after(stock_data, entry_date) + exit_price = _first_close_before(stock_data, exit_date) stock_return = (exit_price - entry_price) / entry_price except Exception as e: @@ -75,25 +99,25 @@ def run_backtest( continue period_key = (entry_date_str, holding_days) - if period_key not in spy_returns: + if period_key not in spy_cache: try: spy_data = yf.download( "SPY", - start=(entry_date - timedelta(days=5)).strftime("%Y-%m-%d"), + start=entry_date.strftime("%Y-%m-%d"), end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"), progress=False, auto_adjust=True, ) if not spy_data.empty: - spy_entry = float(spy_data["Close"].iloc[0]) - spy_exit = float(spy_data["Close"].iloc[-1]) - spy_returns[period_key] = (spy_exit - spy_entry) / spy_entry + spy_entry = _first_close_on_or_after(spy_data, entry_date) + spy_exit = _first_close_before(spy_data, exit_date) + spy_cache[period_key] = (spy_exit - spy_entry) / spy_entry else: - spy_returns[period_key] = 0.0 + spy_cache[period_key] = 0.0 except Exception: - spy_returns[period_key] = 0.0 + spy_cache[period_key] = 0.0 - spy_return = spy_returns.get(period_key, 0.0) + spy_return = spy_cache[period_key] alpha = stock_return - spy_return results.append({ @@ -114,12 +138,10 @@ def run_backtest( win_rate = sum(1 for r in returns if r > 0) / len(returns) avg_return = sum(returns) / len(returns) avg_alpha = sum(alphas) / len(alphas) - - import math std_dev = math.sqrt(sum((r - avg_return) ** 2 for r in returns) / len(returns)) sharpe = (avg_return / std_dev * math.sqrt(252 / holding_days)) if std_dev > 0 else 0.0 - summary = { + return { "total_signals": len(results), "win_rate": round(win_rate, 4), "avg_return": round(avg_return, 4), @@ -129,19 +151,18 @@ def run_backtest( "results": results, } - return summary - def print_summary(summary: dict): if "error" in summary: print(f"Error: {summary['error']}") return - 
print(f"\n{'='*40}") + width = 40 + print(f"\n{'=' * width}") print(f"Backtest Results ({summary['holding_days']}-day hold)") - print(f"{'='*40}") + print(f"{'=' * width}") print(f"Total signals: {summary['total_signals']}") print(f"Win rate: {summary['win_rate']:.1%}") print(f"Avg return: {summary['avg_return']:.2%}") print(f"Avg alpha vs SPY: {summary['avg_alpha_vs_spy']:.2%}") print(f"Sharpe ratio: {summary['sharpe_ratio']:.2f}") - print(f"{'='*40}\n") + print(f"{'=' * width}\n") diff --git a/broker/alpaca_client.py b/broker/alpaca_client.py index 67b9223..2b52dc2 100644 --- a/broker/alpaca_client.py +++ b/broker/alpaca_client.py @@ -1,8 +1,8 @@ import logging -from typing import Optional +from datetime import datetime, timedelta import config -from db.db import mark_signal_executed +from db.db import mark_signal_executed, mark_signal_closed, get_executed_unclosed_signals logger = logging.getLogger(__name__) @@ -21,14 +21,11 @@ def _get_api(): def get_portfolio_value() -> float: - api = _get_api() - account = api.get_account() - return float(account.portfolio_value) + return float(_get_api().get_account().portfolio_value) def get_open_positions_count() -> int: - api = _get_api() - return len(api.list_positions()) + return len(_get_api().list_positions()) def execute_signal(signal: dict) -> bool: @@ -40,16 +37,15 @@ def execute_signal(signal: dict) -> bool: try: api = _get_api() - positions_count = get_open_positions_count() - if positions_count >= config.MAX_POSITIONS: + + if get_open_positions_count() >= config.MAX_POSITIONS: logger.warning(f"Max positions ({config.MAX_POSITIONS}) reached, skipping {ticker}") return False portfolio_value = get_portfolio_value() allocation = portfolio_value * config.POSITION_SIZE_PCT - latest_trade = api.get_latest_trade(ticker) - price = float(latest_trade.price) + price = float(api.get_latest_trade(ticker).price) if price <= 0: logger.error(f"Invalid price for {ticker}: {price}") return False @@ -61,8 +57,8 @@ def 
execute_signal(signal: dict) -> bool: existing_positions = {p.symbol: p for p in api.list_positions()} if ticker in existing_positions: - existing_value = float(existing_positions[ticker].market_value) - if existing_value / portfolio_value >= 0.10: + existing_pct = float(existing_positions[ticker].market_value) / portfolio_value + if existing_pct >= 0.10: logger.warning(f"Already at 10% cap for {ticker}, skipping") return False @@ -82,13 +78,32 @@ def execute_signal(signal: dict) -> bool: return False -def close_position_after_days(ticker: str, holding_days: Optional[int] = None): - days = holding_days or config.HOLDING_PERIOD_DAYS - api = _get_api() +def close_position(ticker: str, signal_id: int) -> bool: try: - api.close_position(ticker) - logger.info(f"Closed position: {ticker} after {days} days") + _get_api().close_position(ticker) + mark_signal_closed(signal_id) + logger.info(f"Closed position: {ticker} (signal {signal_id})") return True except Exception as e: logger.error(f"Failed to close position {ticker}: {e}") return False + + +def close_expired_positions(): + if not config.ALPACA_KEY or not config.ALPACA_SECRET: + return + + cutoff = datetime.utcnow() - timedelta(days=config.HOLDING_PERIOD_DAYS) + signals = get_executed_unclosed_signals() + + for signal in signals: + executed_at_str = signal.get("executed_at") + if not executed_at_str: + continue + try: + executed_at = datetime.strptime(executed_at_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + continue + + if executed_at <= cutoff: + close_position(signal["ticker"], signal["id"]) diff --git a/db/db.py b/db/db.py index d1f1172..1055af8 100644 --- a/db/db.py +++ b/db/db.py @@ -76,13 +76,25 @@ def mark_signal_alerted(signal_id: int): def mark_signal_executed(signal_id: int): conn = get_connection() try: - conn.execute("UPDATE signals SET executed=1 WHERE id=?", (signal_id,)) + conn.execute( + "UPDATE signals SET executed=1, executed_at=? 
WHERE id=?", + (datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), signal_id), + ) conn.commit() finally: conn.close() -def get_unalerted_signals(): +def mark_signal_closed(signal_id: int): + conn = get_connection() + try: + conn.execute("UPDATE signals SET closed=1 WHERE id=?", (signal_id,)) + conn.commit() + finally: + conn.close() + + +def get_unalerted_signals() -> list[dict]: conn = get_connection() try: rows = conn.execute( @@ -93,6 +105,17 @@ def get_unalerted_signals(): conn.close() +def get_executed_unclosed_signals() -> list[dict]: + conn = get_connection() + try: + rows = conn.execute( + "SELECT * FROM signals WHERE executed=1 AND closed=0 AND executed_at IS NOT NULL" + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + def get_recent_buys_for_ticker(ticker: str, window_days: int) -> list: conn = get_connection() try: diff --git a/db/schema.sql b/db/schema.sql index 099d94d..bed3c6b 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -25,6 +25,8 @@ CREATE TABLE IF NOT EXISTS signals ( score REAL, alerted INTEGER DEFAULT 0, executed INTEGER DEFAULT 0, + executed_at TEXT, + closed INTEGER DEFAULT 0, created_at TEXT DEFAULT (datetime('now')) ); @@ -32,3 +34,4 @@ CREATE INDEX IF NOT EXISTS idx_filings_ticker ON filings(ticker); CREATE INDEX IF NOT EXISTS idx_filings_transaction_date ON filings(transaction_date); CREATE INDEX IF NOT EXISTS idx_signals_ticker ON signals(ticker); CREATE INDEX IF NOT EXISTS idx_signals_alerted ON signals(alerted); +CREATE INDEX IF NOT EXISTS idx_signals_executed ON signals(executed); diff --git a/ingestion/edgar_poller.py b/ingestion/edgar_poller.py index 08bd4e6..727dc47 100644 --- a/ingestion/edgar_poller.py +++ b/ingestion/edgar_poller.py @@ -1,10 +1,9 @@ import time import os import logging -from datetime import datetime from typing import Optional import requests -from lxml import etree +from lxml import etree, html import config from ingestion.form4_parser import parse_form4 @@ -13,11 +12,14 @@ from 
db.db import insert_filing, accession_exists logger = logging.getLogger(__name__) HEADERS = { - "User-Agent": "insider-copytrade-poc contact@example.com", + "User-Agent": "smaug-insider-monitor contact@example.com", "Accept-Encoding": "gzip, deflate", } -EDGAR_FULL_INDEX = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&output=atom" +EDGAR_ATOM_URL = ( + "https://www.sec.gov/cgi-bin/browse-edgar" + "?action=getcurrent&type=4&dateb=&owner=include&count=40&output=atom" +) def _fetch(url: str, timeout: int = 30) -> requests.Response: @@ -27,39 +29,40 @@ def _fetch(url: str, timeout: int = 30) -> requests.Response: def _get_filing_urls() -> list[tuple[str, str, str]]: - resp = _fetch(EDGAR_FULL_INDEX) + resp = _fetch(EDGAR_ATOM_URL) root = etree.fromstring(resp.content) ns = {"atom": "http://www.w3.org/2005/Atom"} - entries = root.findall("atom:entry", ns) results = [] - for entry in entries: - filing_href = entry.find("atom:link", ns) - if filing_href is None: + for entry in root.findall("atom:entry", ns): + link = entry.find("atom:link", ns) + if link is None: continue - url = filing_href.get("href", "") + url = link.get("href", "") updated = (entry.findtext("atom:updated", namespaces=ns) or "")[:10] - accession = url.rstrip("/").split("/")[-1].replace("-index.htm", "") - accession = accession.replace("-", "") - if len(accession) == 18: - accession = f"{accession[:10]}-{accession[10:12]}-{accession[12:]}" + raw = url.rstrip("/").split("/")[-1].replace("-index.htm", "") + raw = raw.replace("-", "") + if len(raw) == 18: + accession = f"{raw[:10]}-{raw[10:12]}-{raw[12:]}" + else: + accession = raw results.append((url, accession, updated)) return results -def _get_xml_url_from_index(index_url: str) -> Optional[str]: +def _resolve_xml_url(accession: str) -> Optional[str]: + accession_path = accession.replace("-", "") + cik = accession_path[:10].lstrip("0") + base = 
f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/" + index_url = f"{base}{accession}-index.htm" try: resp = _fetch(index_url) - except Exception: - return None - root = etree.fromstring(resp.content) - ns = {"atom": "http://www.w3.org/2005/Atom"} - for link in root.findall("atom:link", ns): - href = link.get("href", "") - if href.endswith(".xml") and "form4" in href.lower(): - return href - for link in root.findall(".//filing-href"): - if link.text and link.text.endswith(".xml"): - return link.text.strip() + doc = html.fromstring(resp.content) + for link in doc.cssselect("table.tableFile a[href]"): + href = link.get("href", "") + if href.lower().endswith(".xml") and not href.lower().endswith("-index.htm"): + return config.EDGAR_BASE_URL + href if href.startswith("/") else base + href + except Exception as e: + logger.debug(f"Could not resolve XML URL for {accession}: {e}") return None @@ -79,11 +82,11 @@ def fetch_and_store_new_filings() -> list[dict]: logger.error(f"Failed to fetch EDGAR index: {e}") return new_filings - for index_url, accession, filed_date in entries: + for _index_url, accession, filed_date in entries: if accession_exists(accession): continue - xml_url = _resolve_xml_url(index_url, accession) + xml_url = _resolve_xml_url(accession) if not xml_url: logger.warning(f"No XML found for {accession}") continue @@ -99,30 +102,12 @@ def fetch_and_store_new_filings() -> list[dict]: parsed = parse_form4(xml_bytes, accession, filed_date) for filing in parsed: - inserted = insert_filing(filing) - if inserted: + if insert_filing(filing): new_filings.append(filing) return new_filings -def _resolve_xml_url(index_url: str, accession: str) -> Optional[str]: - accession_path = accession.replace("-", "") - cik = accession_path[:10].lstrip("0") - base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/" - candidate = f"{base}{accession}-index.htm" - try: - resp = _fetch(candidate) - root = etree.fromstring(resp.content) - for 
node in root.iter(): - text = (node.text or "").strip() - if text.endswith(".xml") and ("4" in text or "form" in text.lower()): - return base + text - except Exception: - pass - return None - - def run_poller(on_new_filing=None): logger.info("EDGAR poller started") while True: @@ -134,5 +119,5 @@ def run_poller(on_new_filing=None): try: on_new_filing(filing) except Exception as e: - logger.error(f"Error in on_new_filing callback: {e}") + logger.error(f"Error processing filing {filing.get('accession_number')}: {e}") time.sleep(config.EDGAR_POLL_INTERVAL) diff --git a/main.py b/main.py index 2bd6f41..7f947a2 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ logging.basicConfig( logger = logging.getLogger(__name__) -def _on_new_filing(filing: dict): +def _process_filing(filing: dict): from signals.filter_engine import process_filing from alerts.slack_alert import send_slack_alert import config @@ -29,13 +29,25 @@ def _on_new_filing(filing: dict): execute_signal(signal) +def _close_expired_positions(): + import config + if config.ALPACA_KEY and config.ALPACA_SECRET: + from broker.alpaca_client import close_expired_positions + close_expired_positions() + + def cmd_run(): from db.db import init_db from ingestion.edgar_poller import run_poller init_db() logger.info("Database initialized") - run_poller(on_new_filing=_on_new_filing) + + def on_new_filing(filing: dict): + _process_filing(filing) + _close_expired_positions() + + run_poller(on_new_filing=on_new_filing) def cmd_backtest(): @@ -61,7 +73,7 @@ def cmd_fetch_once(): logger.info(f"Fetched and stored {len(filings)} new filings") for filing in filings: - signal = _on_new_filing(filing) + _process_filing(filing) COMMANDS = { From b119b9abae5d0c75c317f0b890ebb06227a93058 Mon Sep 17 00:00:00 2001 From: claude Date: Mon, 4 May 2026 17:21:23 +0000 Subject: [PATCH 4/4] feat: SQLAlchemy ORM models, filing cache incremental fetch, yfinance price cache - Replace db/schema.sql + raw sqlite3 with SQLAlchemy ORM (db/models.py) - 
Filing, Signal, PriceCache models with proper indexes - db/db.py uses SQLAlchemy sessions throughout; no raw SQL strings - Add PriceCache table: stores daily close prices per ticker - backtest._fetch_prices checks DB first; skips yfinance for completed ranges - New data persisted via upsert_prices() - get_cached_prices() / upsert_prices() added to db.py - EDGAR poller incremental fetch: get_latest_filed_date() returns newest filed_date in DB; fetch_and_store_new_filings skips entries older than that cutoff before even checking accession_exists - Add get_signals_for_backtest() to db.py; backtest no longer opens its own sqlite3 connection - requirements.txt: add sqlalchemy>=2.0.0 Co-authored-by: dodox --- README.md | 12 +- backtest/backtest.py | 120 +++++++------- db/db.py | 340 +++++++++++++++++++++++--------------- db/models.py | 81 +++++++++ db/schema.sql | 37 ----- ingestion/edgar_poller.py | 8 +- requirements.txt | 1 + 7 files changed, 368 insertions(+), 231 deletions(-) create mode 100644 db/models.py delete mode 100644 db/schema.sql diff --git a/README.md b/README.md index a957e7b..82f35da 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ ingestion/edgar_poller.py ← polls every 10 min, dedupes by accession ingestion/form4_parser.py ← parses XML, detects 10b5-1 plans │ ▼ -db/schema.sql + db/db.py ← SQLite (WAL mode): filings + signals tables +db/models.py + db/db.py ← SQLAlchemy ORM: filings, signals, price_cache tables │ ▼ signals/filter_engine.py ← buy-only, exclude 10b5-1, min $50k, role-weighted scoring @@ -80,7 +80,9 @@ Role weights: CEO 3.0 · CFO/President 2.5 · COO 2.0 · Director 1.5 · VP 1.2 ## Backtesting -The backtest loads signals from the SQLite DB and fetches OHLC data via `yfinance` on demand (no local price cache). Entry price is the closing price on the first trading day on or after the signal date; exit price is the closing price on the last trading day before or on the exit date. 
Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number. +The backtest loads signals from the DB and fetches OHLC data via `yfinance`. Prices are cached in the `price_cache` table — completed date ranges are served entirely from the DB on repeat runs, avoiding redundant network calls. Entry price is the closing price on the first trading day on or after the signal date; exit price is the closing price on the last trading day before or on the exit date. Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number. + +The EDGAR poller also skips fetching XML for filings older than the newest `filed_date` already stored in the DB, so incremental runs only process truly new filings. Metrics reported: win rate, average return, average alpha vs SPY, Sharpe ratio. @@ -95,8 +97,8 @@ Positions are tracked in the `signals` table. When a trade is executed, `execute | `config.py` | All thresholds and env-var loading | | `ingestion/edgar_poller.py` | EDGAR Atom feed polling and deduplication | | `ingestion/form4_parser.py` | Form 4 XML → structured dict; 10b5-1 detection | -| `db/schema.sql` | SQLite schema (`filings`, `signals`) | -| `db/db.py` | DB access layer | +| `db/models.py` | SQLAlchemy ORM models (`Filing`, `Signal`, `PriceCache`) | +| `db/db.py` | DB access layer (SQLAlchemy sessions) | | `signals/filter_engine.py` | Filing → signal pipeline | | `signals/cluster_detector.py` | Cluster detection from DB | | `alerts/slack_alert.py` | Slack webhook alert | @@ -107,4 +109,4 @@ Positions are tracked in the `signals` table. 
When a trade is executed, `execute ## Requirements - Python 3.11+ -- See `requirements.txt`: `requests`, `lxml`, `yfinance`, `python-dotenv`, `alpaca-trade-api` +- See `requirements.txt`: `requests`, `lxml`, `yfinance`, `python-dotenv`, `alpaca-trade-api`, `sqlalchemy` diff --git a/backtest/backtest.py b/backtest/backtest.py index 5f8474d..3649ea1 100644 --- a/backtest/backtest.py +++ b/backtest/backtest.py @@ -3,44 +3,67 @@ import math from datetime import datetime, timedelta import config +from db.db import get_cached_prices, get_signals_for_backtest, upsert_prices logger = logging.getLogger(__name__) -def _load_signals_from_db(db_path: str, min_score: float, min_cluster_size: int) -> list[dict]: - import sqlite3 - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row - rows = conn.execute( - """ - SELECT s.*, f.role FROM signals s - LEFT JOIN filings f ON f.ticker = s.ticker AND f.transaction_date = s.trigger_date - WHERE s.score >= ? AND s.cluster_size >= ? - """, - (min_score, min_cluster_size), - ).fetchall() - conn.close() - return [dict(r) for r in rows] +def _fetch_prices(ticker: str, start: datetime, end: datetime) -> dict[str, float]: + try: + import yfinance as yf + except ImportError: + raise ImportError("yfinance not installed. 
Run: pip install yfinance") + + start_str = start.strftime("%Y-%m-%d") + end_str = (end + timedelta(days=5)).strftime("%Y-%m-%d") + + cached = get_cached_prices(ticker, start_str, end_str) + + today = datetime.utcnow().strftime("%Y-%m-%d") + range_is_complete = end_str < today + + if range_is_complete and cached: + return cached + + data = yf.download( + ticker, + start=start_str, + end=end_str, + progress=False, + auto_adjust=True, + ) + if data.empty: + return cached + + fetched: dict[str, float] = {} + for ts, close_val in data["Close"].items(): + date_key = ts.to_pydatetime().replace(tzinfo=None).strftime("%Y-%m-%d") + fetched[date_key] = float(close_val) + + new_prices = {k: v for k, v in fetched.items() if k not in cached} + if new_prices: + upsert_prices(ticker, new_prices) + + cached.update(fetched) + return cached -def _first_close_on_or_after(price_data, target_date: datetime) -> float: - """Return the closing price on the first trading day on or after target_date.""" - for ts, row in price_data["Close"].items(): - ts_date = ts.to_pydatetime().replace(tzinfo=None) - if ts_date.date() >= target_date.date(): - return float(row) - raise ValueError(f"No price data on or after {target_date.date()}") +def _first_close_on_or_after(prices: dict[str, float], target: datetime) -> float: + target_str = target.strftime("%Y-%m-%d") + for date_str in sorted(prices): + if date_str >= target_str: + return prices[date_str] + raise ValueError(f"No price data on or after {target_str}") -def _first_close_before(price_data, target_date: datetime) -> float: - """Return the closing price on the last trading day before or on target_date.""" +def _first_close_before(prices: dict[str, float], target: datetime) -> float: + target_str = target.strftime("%Y-%m-%d") result = None - for ts, row in price_data["Close"].items(): - ts_date = ts.to_pydatetime().replace(tzinfo=None) - if ts_date.date() <= target_date.date(): - result = float(row) + for date_str in sorted(prices): + if 
date_str <= target_str: + result = prices[date_str] if result is None: - raise ValueError(f"No price data on or before {target_date.date()}") + raise ValueError(f"No price data on or before {target_str}") return result @@ -50,22 +73,15 @@ def run_backtest( min_score: float = 0.0, min_cluster_size: int = 1, ) -> dict: - try: - import yfinance as yf - except ImportError: - raise ImportError("yfinance not installed. Run: pip install yfinance") - - db_path = db_path or config.DB_PATH holding_days = holding_days or config.HOLDING_PERIOD_DAYS - - signals = _load_signals_from_db(db_path, min_score, min_cluster_size) + signals = get_signals_for_backtest(min_score, min_cluster_size) if not signals: logger.warning("No signals found matching criteria") return {} results = [] - spy_cache: dict[tuple, float] = {} + spy_cache: dict[str, float] = {} for signal in signals: ticker = signal["ticker"] @@ -79,38 +95,26 @@ def run_backtest( exit_date = entry_date + timedelta(days=holding_days) try: - stock_data = yf.download( - ticker, - start=entry_date.strftime("%Y-%m-%d"), - end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"), - progress=False, - auto_adjust=True, - ) - if stock_data.empty: + prices = _fetch_prices(ticker, entry_date, exit_date) + if not prices: logger.debug(f"No price data for {ticker}") continue - entry_price = _first_close_on_or_after(stock_data, entry_date) - exit_price = _first_close_before(stock_data, exit_date) + entry_price = _first_close_on_or_after(prices, entry_date) + exit_price = _first_close_before(prices, exit_date) stock_return = (exit_price - entry_price) / entry_price except Exception as e: logger.debug(f"Failed to get data for {ticker}: {e}") continue - period_key = (entry_date_str, holding_days) + period_key = entry_date_str if period_key not in spy_cache: try: - spy_data = yf.download( - "SPY", - start=entry_date.strftime("%Y-%m-%d"), - end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"), - progress=False, - auto_adjust=True, - ) - if 
not spy_data.empty: - spy_entry = _first_close_on_or_after(spy_data, entry_date) - spy_exit = _first_close_before(spy_data, exit_date) + spy_prices = _fetch_prices("SPY", entry_date, exit_date) + if spy_prices: + spy_entry = _first_close_on_or_after(spy_prices, entry_date) + spy_exit = _first_close_before(spy_prices, exit_date) spy_cache[period_key] = (spy_exit - spy_entry) / spy_entry else: spy_cache[period_key] = 0.0 diff --git a/db/db.py b/db/db.py index 1055af8..d021483 100644 --- a/db/db.py +++ b/db/db.py @@ -1,146 +1,226 @@ -import sqlite3 -import os from datetime import datetime +from typing import Optional + +from sqlalchemy import create_engine, func, select, text, update +from sqlalchemy.orm import Session + import config +from db.models import Base, Filing, PriceCache, Signal -def get_connection(): - conn = sqlite3.connect(config.DB_PATH) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA foreign_keys=ON") - return conn +def _engine(): + url = f"sqlite:///{config.DB_PATH}" + return create_engine(url, connect_args={"check_same_thread": False}) + + +_ENGINE = None + + +def _get_engine(): + global _ENGINE + if _ENGINE is None: + _ENGINE = _engine() + return _ENGINE def init_db(): - schema_path = os.path.join(os.path.dirname(__file__), "schema.sql") - with open(schema_path, "r") as f: - schema = f.read() - conn = get_connection() - conn.executescript(schema) - conn.commit() - conn.close() + engine = _get_engine() + with engine.connect() as conn: + conn.execute(text("PRAGMA journal_mode=WAL")) + conn.execute(text("PRAGMA foreign_keys=ON")) + Base.metadata.create_all(engine) + + +def _session() -> Session: + return Session(_get_engine()) def insert_filing(filing: dict) -> bool: - conn = get_connection() - try: - conn.execute( - """ - INSERT OR IGNORE INTO filings - (accession_number, ticker, cik, insider_name, role, - transaction_date, filed_date, shares, price, total_value, - flag, is_10b51, post_tx_shares) - 
VALUES - (:accession_number, :ticker, :cik, :insider_name, :role, - :transaction_date, :filed_date, :shares, :price, :total_value, - :flag, :is_10b51, :post_tx_shares) - """, - filing, + with _session() as session: + exists = session.scalar( + select(Filing.id).where(Filing.accession_number == filing["accession_number"]) ) - inserted = conn.execute("SELECT changes()").fetchone()[0] > 0 - conn.commit() - return inserted - finally: - conn.close() + if exists is not None: + return False - -def insert_signal(signal: dict) -> int: - conn = get_connection() - try: - cur = conn.execute( - """ - INSERT INTO signals - (ticker, trigger_date, cluster_size, total_cluster_value, score) - VALUES - (:ticker, :trigger_date, :cluster_size, :total_cluster_value, :score) - """, - signal, + row = Filing( + accession_number=filing["accession_number"], + ticker=filing.get("ticker"), + cik=filing.get("cik"), + insider_name=filing.get("insider_name"), + role=filing.get("role"), + transaction_date=filing.get("transaction_date"), + filed_date=filing.get("filed_date"), + shares=filing.get("shares"), + price=filing.get("price"), + total_value=filing.get("total_value"), + flag=filing.get("flag"), + is_10b51=bool(filing.get("is_10b51", False)), + post_tx_shares=filing.get("post_tx_shares"), ) - signal_id = cur.lastrowid - conn.commit() - return signal_id - finally: - conn.close() - - -def mark_signal_alerted(signal_id: int): - conn = get_connection() - try: - conn.execute("UPDATE signals SET alerted=1 WHERE id=?", (signal_id,)) - conn.commit() - finally: - conn.close() - - -def mark_signal_executed(signal_id: int): - conn = get_connection() - try: - conn.execute( - "UPDATE signals SET executed=1, executed_at=? 
WHERE id=?", - (datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), signal_id), - ) - conn.commit() - finally: - conn.close() - - -def mark_signal_closed(signal_id: int): - conn = get_connection() - try: - conn.execute("UPDATE signals SET closed=1 WHERE id=?", (signal_id,)) - conn.commit() - finally: - conn.close() - - -def get_unalerted_signals() -> list[dict]: - conn = get_connection() - try: - rows = conn.execute( - "SELECT * FROM signals WHERE alerted=0 ORDER BY created_at ASC" - ).fetchall() - return [dict(r) for r in rows] - finally: - conn.close() - - -def get_executed_unclosed_signals() -> list[dict]: - conn = get_connection() - try: - rows = conn.execute( - "SELECT * FROM signals WHERE executed=1 AND closed=0 AND executed_at IS NOT NULL" - ).fetchall() - return [dict(r) for r in rows] - finally: - conn.close() - - -def get_recent_buys_for_ticker(ticker: str, window_days: int) -> list: - conn = get_connection() - try: - rows = conn.execute( - """ - SELECT * FROM filings - WHERE ticker=? - AND flag='A' - AND is_10b51=0 - AND transaction_date >= date('now', ? 
|| ' days') - ORDER BY transaction_date DESC - """, - (ticker, f"-{window_days}"), - ).fetchall() - return [dict(r) for r in rows] - finally: - conn.close() + session.add(row) + session.commit() + return True def accession_exists(accession_number: str) -> bool: - conn = get_connection() - try: - row = conn.execute( - "SELECT 1 FROM filings WHERE accession_number=?", (accession_number,) - ).fetchone() - return row is not None - finally: - conn.close() + with _session() as session: + return session.scalar( + select(Filing.id).where(Filing.accession_number == accession_number) + ) is not None + + +def get_latest_filed_date() -> Optional[str]: + with _session() as session: + return session.scalar(select(func.max(Filing.filed_date))) + + +def insert_signal(signal: dict) -> int: + with _session() as session: + row = Signal( + ticker=signal["ticker"], + trigger_date=signal["trigger_date"], + cluster_size=signal["cluster_size"], + total_cluster_value=signal.get("total_cluster_value", 0.0), + score=signal["score"], + ) + session.add(row) + session.commit() + return row.id + + +def mark_signal_alerted(signal_id: int): + with _session() as session: + session.execute( + update(Signal).where(Signal.id == signal_id).values(alerted=True) + ) + session.commit() + + +def mark_signal_executed(signal_id: int): + with _session() as session: + session.execute( + update(Signal) + .where(Signal.id == signal_id) + .values(executed=True, executed_at=datetime.utcnow()) + ) + session.commit() + + +def mark_signal_closed(signal_id: int): + with _session() as session: + session.execute( + update(Signal).where(Signal.id == signal_id).values(closed=True) + ) + session.commit() + + +def get_unalerted_signals() -> list[dict]: + with _session() as session: + rows = session.scalars( + select(Signal).where(Signal.alerted == False).order_by(Signal.created_at) + ).all() + return [_signal_to_dict(r) for r in rows] + + +def get_executed_unclosed_signals() -> list[dict]: + with _session() as session: + 
rows = session.scalars( + select(Signal).where( + Signal.executed == True, + Signal.closed == False, + Signal.executed_at.is_not(None), + ) + ).all() + return [_signal_to_dict(r) for r in rows] + + +def get_recent_buys_for_ticker(ticker: str, window_days: int) -> list[dict]: + from datetime import timedelta + + cutoff = (datetime.utcnow() - timedelta(days=window_days)).strftime("%Y-%m-%d") + with _session() as session: + rows = session.scalars( + select(Filing) + .where( + Filing.ticker == ticker, + Filing.flag == "A", + Filing.is_10b51 == False, + Filing.transaction_date >= cutoff, + ) + .order_by(Filing.transaction_date.desc()) + ).all() + return [_filing_to_dict(r) for r in rows] + + +def get_signals_for_backtest(min_score: float, min_cluster_size: int) -> list[dict]: + with _session() as session: + rows = session.scalars( + select(Signal).where( + Signal.score >= min_score, + Signal.cluster_size >= min_cluster_size, + ) + ).all() + return [_signal_to_dict(r) for r in rows] + + +def get_cached_prices(ticker: str, start_date: str, end_date: str) -> dict[str, float]: + with _session() as session: + rows = session.scalars( + select(PriceCache).where( + PriceCache.ticker == ticker, + PriceCache.date >= start_date, + PriceCache.date <= end_date, + ) + ).all() + return {r.date: r.close for r in rows} + + +def upsert_prices(ticker: str, prices: dict[str, float]): + with _session() as session: + for date_str, close in prices.items(): + existing = session.scalar( + select(PriceCache).where( + PriceCache.ticker == ticker, + PriceCache.date == date_str, + ) + ) + if existing is None: + session.add(PriceCache(ticker=ticker, date=date_str, close=close)) + session.commit() + + +def _filing_to_dict(row: Filing) -> dict: + return { + "id": row.id, + "accession_number": row.accession_number, + "ticker": row.ticker, + "cik": row.cik, + "insider_name": row.insider_name, + "role": row.role, + "transaction_date": row.transaction_date, + "filed_date": row.filed_date, + "shares": 
row.shares, + "price": row.price, + "total_value": row.total_value, + "flag": row.flag, + "is_10b51": row.is_10b51, + "post_tx_shares": row.post_tx_shares, + "created_at": row.created_at.isoformat() if row.created_at else None, + } + + +def _signal_to_dict(row: Signal) -> dict: + return { + "id": row.id, + "ticker": row.ticker, + "trigger_date": row.trigger_date, + "cluster_size": row.cluster_size, + "total_cluster_value": row.total_cluster_value, + "score": row.score, + "alerted": row.alerted, + "executed": row.executed, + "executed_at": row.executed_at.strftime("%Y-%m-%dT%H:%M:%SZ") if row.executed_at else None, + "closed": row.closed, + "created_at": row.created_at.isoformat() if row.created_at else None, + } diff --git a/db/models.py b/db/models.py new file mode 100644 index 0000000..27761e7 --- /dev/null +++ b/db/models.py @@ -0,0 +1,81 @@ +from datetime import datetime + +from sqlalchemy import ( + Boolean, + Column, + DateTime, + Float, + Index, + Integer, + String, + Text, + UniqueConstraint, +) +from sqlalchemy.orm import DeclarativeBase + + +class Base(DeclarativeBase): + pass + + +class Filing(Base): + __tablename__ = "filings" + + id = Column(Integer, primary_key=True, autoincrement=True) + accession_number = Column(String, unique=True, nullable=False) + ticker = Column(String) + cik = Column(String) + insider_name = Column(String) + role = Column(String) + transaction_date = Column(String) + filed_date = Column(String) + shares = Column(Float) + price = Column(Float) + total_value = Column(Float) + flag = Column(String) + is_10b51 = Column(Boolean, default=False) + post_tx_shares = Column(Float) + created_at = Column(DateTime, default=datetime.utcnow) + + __table_args__ = ( + Index("idx_filings_ticker", "ticker"), + Index("idx_filings_transaction_date", "transaction_date"), + Index("idx_filings_filed_date", "filed_date"), + ) + + +class Signal(Base): + __tablename__ = "signals" + + id = Column(Integer, primary_key=True, autoincrement=True) + ticker = 
Column(String) + trigger_date = Column(String) + cluster_size = Column(Integer) + total_cluster_value = Column(Float) + score = Column(Float) + alerted = Column(Boolean, default=False) + executed = Column(Boolean, default=False) + executed_at = Column(DateTime) + closed = Column(Boolean, default=False) + created_at = Column(DateTime, default=datetime.utcnow) + + __table_args__ = ( + Index("idx_signals_ticker", "ticker"), + Index("idx_signals_alerted", "alerted"), + Index("idx_signals_executed", "executed"), + ) + + +class PriceCache(Base): + __tablename__ = "price_cache" + + id = Column(Integer, primary_key=True, autoincrement=True) + ticker = Column(String, nullable=False) + date = Column(String, nullable=False) + close = Column(Float, nullable=False) + fetched_at = Column(DateTime, default=datetime.utcnow) + + __table_args__ = ( + UniqueConstraint("ticker", "date", name="uq_price_cache_ticker_date"), + Index("idx_price_cache_ticker_date", "ticker", "date"), + ) diff --git a/db/schema.sql b/db/schema.sql deleted file mode 100644 index bed3c6b..0000000 --- a/db/schema.sql +++ /dev/null @@ -1,37 +0,0 @@ -CREATE TABLE IF NOT EXISTS filings ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - accession_number TEXT UNIQUE, - ticker TEXT, - cik TEXT, - insider_name TEXT, - role TEXT, - transaction_date TEXT, - filed_date TEXT, - shares REAL, - price REAL, - total_value REAL, - flag TEXT, - is_10b51 INTEGER DEFAULT 0, - post_tx_shares REAL, - created_at TEXT DEFAULT (datetime('now')) -); - -CREATE TABLE IF NOT EXISTS signals ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - ticker TEXT, - trigger_date TEXT, - cluster_size INTEGER, - total_cluster_value REAL, - score REAL, - alerted INTEGER DEFAULT 0, - executed INTEGER DEFAULT 0, - executed_at TEXT, - closed INTEGER DEFAULT 0, - created_at TEXT DEFAULT (datetime('now')) -); - -CREATE INDEX IF NOT EXISTS idx_filings_ticker ON filings(ticker); -CREATE INDEX IF NOT EXISTS idx_filings_transaction_date ON filings(transaction_date); -CREATE 
INDEX IF NOT EXISTS idx_signals_ticker ON signals(ticker); -CREATE INDEX IF NOT EXISTS idx_signals_alerted ON signals(alerted); -CREATE INDEX IF NOT EXISTS idx_signals_executed ON signals(executed); diff --git a/ingestion/edgar_poller.py b/ingestion/edgar_poller.py index 727dc47..29eab44 100644 --- a/ingestion/edgar_poller.py +++ b/ingestion/edgar_poller.py @@ -7,7 +7,7 @@ from lxml import etree, html import config from ingestion.form4_parser import parse_form4 -from db.db import insert_filing, accession_exists +from db.db import accession_exists, get_latest_filed_date, insert_filing logger = logging.getLogger(__name__) @@ -82,7 +82,13 @@ def fetch_and_store_new_filings() -> list[dict]: logger.error(f"Failed to fetch EDGAR index: {e}") return new_filings + latest_in_db = get_latest_filed_date() + for _index_url, accession, filed_date in entries: + if latest_in_db and filed_date and filed_date < latest_in_db: + logger.debug(f"Skipping {accession}: filed_date {filed_date} older than latest in DB {latest_in_db}") + continue + if accession_exists(accession): continue diff --git a/requirements.txt b/requirements.txt index 3111da8..93d6bdd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ lxml>=5.0.0 yfinance>=0.2.0 python-dotenv>=1.0.0 alpaca-trade-api>=3.0.0 +sqlalchemy>=2.0.0