diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e78a987 --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ +SLACK_WEBHOOK_URL= +ALPACA_KEY= +ALPACA_SECRET= +ALPACA_BASE_URL=https://paper-api.alpaca.markets +DB_PATH=insider.db +DATA_DIR=data/filings diff --git a/PLAN.md b/PLAN.md new file mode 100644 index 0000000..9e0f1b5 --- /dev/null +++ b/PLAN.md @@ -0,0 +1,340 @@ +# Insider Copytrade System -- Implementation Plan + +## Description + +A personal system that monitors SEC EDGAR Form 4 filings in real-time, filters for high-quality insider buying signals, alerts via Slack, and optionally executes trades automatically through Alpaca's paper or live trading API. + +The system is fully self-hosted, uses only free/public data sources, and requires no third-party data subscriptions. + +--- + +## Background + +Company insiders (executives, directors, >10% shareholders) must file SEC Form 4 within 2 business days of any trade. This is public data via SEC EDGAR. The signal value of insider *buying* is academically documented -- executives buying their own stock with personal capital is a meaningful vote of confidence, particularly when: + +- Multiple insiders buy simultaneously (cluster signal) +- The trade is unplanned (not a 10b5-1 scheduled plan) +- The company is small/mid-cap (less institutional arbitrage) + +The edge vs. political trade copying: 2-day disclosure lag vs. 45 days, and the signal is company-specific rather than sector-level. + +**Key risk:** This signal is publicly known and tracked. The edge is in filtering quality and execution speed, not data exclusivity. Large-cap Form 4 signals are arbitraged quickly. Focus on small/mid-cap, clustered, unplanned buys. 
+ +--- + +## System Outline + +``` +SEC EDGAR RSS Feed (poll every 10 min) + | + [Ingestion Layer] + | + Parse Form 4 XML + | + [Filter Engine] + - Buy only (flag = A) + - Exclude 10b5-1 plans + - Min transaction size + - Role weighting + - Cluster detection + | + SQLite Database + | + ┌────────────┬──────────────┐ + | | | +[Backtester] [Slack Alert] [Alpaca API] + (manual) (paper/live) +``` + +--- + +## Actionables + +### Phase 1 -- Data Ingestion + +**Goal:** Reliably pull and parse Form 4 filings as they appear. + +**Tasks:** + +1. Set up project structure +``` +insider-copytrade/ + ingestion/ + edgar_poller.py # polls EDGAR RSS + form4_parser.py # parses XML -> structured dict + db/ + schema.sql + db.py # SQLite interface + signals/ + filter_engine.py # applies signal filters + cluster_detector.py + alerts/ + slack_alert.py + broker/ + alpaca_client.py + backtest/ + backtest.py + config.py + main.py +``` + +2. Poll EDGAR RSS for Form 4 filings every 10 minutes: +``` +https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&search_text=&action=getcurrent +``` +SEC also provides a structured latest filings feed: +``` +https://efts.sec.gov/LATEST/search-index?q=&forms=4 +``` + +3. For each new filing, fetch and parse the XML document. Key fields to extract: + - `issuerTradingSymbol` (ticker) + - `rptOwnerName`, `officerTitle` (insider name + role) + - `transactionDate` + - `transactionAcquiredDisposedCode` (A = buy, D = sell) + - `transactionShares`, `transactionPricePerShare` + - `transactionTotalValue` (compute if not present) + - `footnotes` (check for "10b5-1" mention) + - `sharesOwnedFollowingTransaction` + +4. Store raw filing XML + parsed fields. Track `accessionNumber` as dedup key. 
+ +**SQLite schema:** +```sql +CREATE TABLE filings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + accession_number TEXT UNIQUE, + ticker TEXT, + cik TEXT, + insider_name TEXT, + role TEXT, + transaction_date TEXT, + filed_date TEXT, + shares REAL, + price REAL, + total_value REAL, + flag TEXT, -- A or D + is_10b51 INTEGER, -- 0 or 1 + post_tx_shares REAL, + created_at TEXT +); + +CREATE TABLE signals ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ticker TEXT, + trigger_date TEXT, + cluster_size INTEGER, + total_cluster_value REAL, + score REAL, + alerted INTEGER DEFAULT 0, + executed INTEGER DEFAULT 0, + created_at TEXT +); +``` + +--- + +### Phase 2 -- Filter Engine + +**Goal:** Reduce noise to actionable signals only. + +**Filters to apply (in order):** + +| Filter | Logic | +|---|---| +| Buy only | `flag == 'A'` | +| Exclude 10b5-1 | Scan footnotes for "10b5-1", "Rule 10b5", "adopted a plan" | +| Min transaction value | `total_value >= 50000` (configurable) | +| Exclude derivative transactions | Options exercises are weaker signal than open market purchases | +| Role weighting | CEO/CFO/President = high; Director = medium; 10% owner = context-dependent | +| Cluster detection | 2+ insiders buying same ticker within 30 days = elevated signal | + +**Scoring formula (simple v1):** +```python +score = base_role_weight * log(total_value) * cluster_multiplier +# cluster_multiplier = 1.0 + (0.5 * (cluster_size - 1)) +``` + +Expose all thresholds in `config.py` for easy tuning during backtesting. + +--- + +### Phase 3 -- SQLite Storage + +SQLite is sufficient for this workload (low write volume, single process). Use WAL mode for concurrent reads during backtesting: + +```python +conn = sqlite3.connect('insider.db') +conn.execute('PRAGMA journal_mode=WAL') +``` + +Keep raw filing XML in a `/data/filings/` directory keyed by accession number. Parse on ingest, re-parse never needed. 
+ +--- + +### Phase 4 -- Slack Alerts + +**Goal:** Get notified immediately when a signal fires, with enough context to decide manually. + +1. Create a Slack app, get a webhook URL (takes 5 minutes) +2. Alert format: + +``` +INSIDER BUY SIGNAL +Ticker: $ACME +Insider: John Smith (CEO) +Date: 2025-05-01 +Shares: 10,000 @ $14.50 = $145,000 +Cluster: 3 insiders in last 14 days +Score: 8.4 +10b5-1: No +EDGAR: https://www.sec.gov/cgi-bin/browse-edgar?... +``` + +3. Alert only on signals above configurable score threshold +4. Mark `alerted = 1` in DB after sending to avoid duplicates on re-poll + +```python +import requests + +def send_slack_alert(webhook_url, signal): + requests.post(webhook_url, json={"text": format_signal(signal)}) +``` + +--- + +### Phase 5 -- Backtesting + +**Goal:** Validate filter parameters on historical data before going live. + +**Data:** +- Historical Form 4 filings: download bulk XML from `https://www.sec.gov/dera/data/form-4-data` +- Price data: `yfinance` (free, sufficient for backtesting) + +**Backtest logic:** +```python +# For each signal in historical data: +# - Entry: next market open after filed_date +# - Exit: N days later (configurable: 30/60/90/180) +# - Calculate return vs SPY over same period +# - Aggregate by role, cluster_size, market_cap bucket +``` + +**Use `vectorbt` for performance:** +```python +import vectorbt as vbt +# Build entry/exit signal matrices aligned to price data +# Run portfolio simulation with configurable position sizing +``` + +**Output metrics:** +- Annualized return vs SPY benchmark +- Win rate +- Avg return by holding period +- Avg return by role / cluster size +- Max drawdown +- Sharpe ratio + +**Critical:** Test on post-2022 data specifically. Pre-2022 results are likely inflated -- the signal became widely tracked after Autopilot/media coverage. 
+ +**Parameter grid to test:** +```python +MIN_VALUE = [25_000, 50_000, 100_000] +HOLDING_DAYS = [30, 60, 90, 180] +CLUSTER_WINDOW = [14, 30] +MIN_CLUSTER_SIZE = [1, 2, 3] +ROLES = ['all', 'c-suite-only'] +``` + +--- + +### Phase 6 -- Alpaca Integration + +**Goal:** Optionally auto-execute signals. Start with paper trading. + +**Paper trading base URL:** `https://paper-api.alpaca.markets` +**Live trading base URL:** `https://api.alpaca.markets` + +Swap via config flag -- never hardcode. + +```python +from alpaca_trade_api import REST + +api = REST( + key_id=config.ALPACA_KEY, + secret_key=config.ALPACA_SECRET, + base_url=config.ALPACA_BASE_URL # paper or live +) + +def execute_signal(ticker, portfolio_value, signal_score): + # Fixed fractional sizing: 2% of portfolio per signal + price = api.get_latest_trade(ticker).price + allocation = portfolio_value * 0.02 + qty = int(allocation / price) + if qty < 1: + return + api.submit_order( + symbol=ticker, + qty=qty, + side='buy', + type='market', + time_in_force='day' + ) +``` + +Position sizing: start at 2% per signal, max 10% in any single ticker. Add a max open positions limit (e.g. 20) to cap exposure. + +Exit logic (v1): time-based only (close after N days). Add trailing stop later. + +--- + +## Build Order + +| Step | Deliverable | Est. Time | +|---|---|---| +| 1 | EDGAR poller + Form 4 XML parser + SQLite storage | 1 day | +| 2 | Filter engine + cluster detector | 0.5 day | +| 3 | Slack alert | 1 hour | +| 4 | Historical data download + backtest | 1-2 days | +| 5 | Alpaca paper trading integration | 0.5 day | +| 6 | Run paper trading 4-8 weeks, monitor | -- | +| 7 | Switch to live with small capital | -- | + +Do not proceed to Step 7 without meaningful paper trading history. + +--- + +## Dependencies + +``` +requests +lxml +sqlite3 (stdlib) +yfinance +vectorbt +alpaca-trade-api +python-dotenv +``` + +All free. No paid APIs required. 
+ +--- + +## Config Template + +```python +# config.py +EDGAR_POLL_INTERVAL = 600 # seconds +MIN_TRANSACTION_VALUE = 50_000 +MIN_CLUSTER_SIZE = 1 # raise to 2 for higher quality +CLUSTER_WINDOW_DAYS = 30 +HOLDING_PERIOD_DAYS = 90 +POSITION_SIZE_PCT = 0.02 # 2% per signal +MAX_POSITIONS = 20 +SCORE_ALERT_THRESHOLD = 5.0 + +SLACK_WEBHOOK_URL = "" +ALPACA_KEY = "" +ALPACA_SECRET = "" +ALPACA_BASE_URL = "https://paper-api.alpaca.markets" # switch for live +``` diff --git a/alerts/__init__.py b/alerts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alerts/slack_alert.py b/alerts/slack_alert.py new file mode 100644 index 0000000..5056453 --- /dev/null +++ b/alerts/slack_alert.py @@ -0,0 +1,64 @@ +import logging +import requests + +import config +from db.db import mark_signal_alerted + +logger = logging.getLogger(__name__) + + +def format_signal(signal: dict) -> str: + filing = signal.get("filing", {}) + ticker = signal["ticker"] + insider = filing.get("insider_name", "Unknown") + role = filing.get("role", "Unknown") + tx_date = filing.get("transaction_date", "") + shares = filing.get("shares") + price = filing.get("price") + total_value = filing.get("total_value") or signal.get("total_cluster_value", 0) + cluster_size = signal["cluster_size"] + score = signal["score"] + is_10b51 = "Yes" if filing.get("is_10b51") else "No" + accession = filing.get("accession_number", "") + + shares_str = f"{shares:,.0f}" if shares else "N/A" + price_str = f"${price:,.2f}" if price else "N/A" + value_str = f"${total_value:,.0f}" if total_value else "N/A" + edgar_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&type=4&dateb=&owner=include&count=10&search_text=&ticker={ticker}" + + return ( + f"*INSIDER BUY SIGNAL*\n" + f"Ticker: ${ticker}\n" + f"Insider: {insider} ({role})\n" + f"Date: {tx_date}\n" + f"Shares: {shares_str} @ {price_str} = {value_str}\n" + f"Cluster: {cluster_size} insider(s) in last {config.CLUSTER_WINDOW_DAYS} days\n" + f"Score: 
def _holding_period_return(prices, entry_date: datetime, exit_date: datetime) -> Optional[float]:
    """Return the close-to-close return between two dates, or None if unavailable.

    ``prices`` is the frame returned by ``yfinance.download``. The entry price
    is the first close on or after ``entry_date``; the exit price is the last
    close on or before ``exit_date``. None is returned when fewer than two
    bars fall inside that window or the entry price is not positive.
    """
    if prices is None or prices.empty:
        return None

    close = prices["Close"]
    # Some yfinance versions return MultiIndex columns even for one ticker, in
    # which case prices["Close"] is a one-column DataFrame, not a Series.
    if hasattr(close, "columns"):
        close = close.iloc[:, 0]
    close = close.dropna()
    if getattr(close.index, "tz", None) is not None:
        # The dates parsed from the database are naive; compare like with like.
        close = close.tz_localize(None)

    in_window = (close.index >= entry_date) & (close.index <= exit_date)
    window = close[in_window]
    if len(window) < 2:
        return None

    entry_price = float(window.iloc[0])
    exit_price = float(window.iloc[-1])
    if entry_price <= 0:
        return None
    return (exit_price - entry_price) / entry_price


def run_backtest(
    db_path: str = None,
    holding_days: int = None,
    min_score: float = 0.0,
    min_cluster_size: int = 1,
) -> dict:
    """Replay stored signals against historical prices and summarise the outcome.

    Each signal is entered at the first close on or after its ``trigger_date``
    and exited at the last close on or before ``trigger_date + holding_days``
    (calendar days). The same window is applied to SPY to compute alpha.

    NOTE(review): ``process_filing`` stores the insider's transaction date in
    ``trigger_date``; that can precede the public filing date, so results may
    contain look-ahead. Confirm the intended entry rule.

    Args:
        db_path: SQLite database to read; defaults to ``config.DB_PATH``.
        holding_days: Calendar days to hold; defaults to
            ``config.HOLDING_PERIOD_DAYS``.
        min_score: Signals scoring below this are ignored.
        min_cluster_size: Signals with a smaller cluster are ignored.

    Returns:
        A summary dict (``total_signals``, ``win_rate``, ``avg_return``,
        ``avg_alpha_vs_spy``, ``sharpe_ratio``, ``holding_days``, ``results``),
        or ``{"error": ...}`` when nothing could be evaluated.

    Raises:
        ImportError: If yfinance is not installed.
    """
    import math

    try:
        import yfinance as yf
    except ImportError as err:
        raise ImportError("yfinance not installed. Run: pip install yfinance") from err

    db_path = db_path or config.DB_PATH
    holding_days = holding_days or config.HOLDING_PERIOD_DAYS

    signals = [
        s
        for s in _load_signals_from_db(db_path)
        if (s["score"] or 0) >= min_score and (s["cluster_size"] or 0) >= min_cluster_size
    ]
    if not signals:
        logger.warning("No signals found matching criteria")
        # print_summary() only understands the "error" shape or a full summary;
        # an empty dict would make it raise KeyError.
        return {"error": "No signals found matching criteria"}

    def period_return(symbol: str, entry_date: datetime, exit_date: datetime) -> Optional[float]:
        """Download prices for one symbol and measure its holding-period return."""
        try:
            prices = yf.download(
                symbol,
                start=entry_date.strftime("%Y-%m-%d"),
                # yfinance treats ``end`` as exclusive, so ask for one extra day.
                end=(exit_date + timedelta(days=1)).strftime("%Y-%m-%d"),
                progress=False,
                auto_adjust=True,
            )
        except Exception as e:
            logger.debug(f"Failed to get data for {symbol}: {e}")
            return None
        return _holding_period_return(prices, entry_date, exit_date)

    today = datetime.now()
    results = []
    spy_returns = {}  # entry date string -> SPY return (or None) for that window

    for signal in signals:
        ticker = signal["ticker"]
        entry_date_str = signal["trigger_date"]

        try:
            entry_date = datetime.strptime(entry_date_str, "%Y-%m-%d")
        except (TypeError, ValueError):
            continue

        exit_date = entry_date + timedelta(days=holding_days)
        if exit_date > today:
            # The holding period has not finished; a partial window would be
            # reported as if it were a full one.
            logger.debug(f"Skipping {ticker} {entry_date_str}: holding period not complete")
            continue

        stock_return = period_return(ticker, entry_date, exit_date)
        if stock_return is None:
            continue

        if entry_date_str not in spy_returns:
            spy_returns[entry_date_str] = period_return("SPY", entry_date, exit_date)
        spy_return = spy_returns[entry_date_str]
        if spy_return is None:
            # Without a benchmark the alpha would silently equal the raw return.
            logger.warning(f"No SPY data for {entry_date_str}; skipping {ticker}")
            continue

        results.append({
            "ticker": ticker,
            "entry_date": entry_date_str,
            "stock_return": round(stock_return, 4),
            "spy_return": round(spy_return, 4),
            "alpha": round(stock_return - spy_return, 4),
            "cluster_size": signal["cluster_size"],
            "score": signal["score"],
        })

    if not results:
        return {"error": "No results computed"}

    returns = [r["stock_return"] for r in results]
    alphas = [r["alpha"] for r in results]
    win_rate = sum(1 for r in returns if r > 0) / len(returns)
    avg_return = sum(returns) / len(returns)
    avg_alpha = sum(alphas) / len(alphas)

    std_dev = math.sqrt(sum((r - avg_return) ** 2 for r in returns) / len(returns))
    # holding_days is measured in calendar days (see timedelta above), so the
    # number of holding periods per year is 365 / holding_days.
    sharpe = (avg_return / std_dev * math.sqrt(365 / holding_days)) if std_dev > 0 else 0.0

    return {
        "total_signals": len(results),
        "win_rate": round(win_rate, 4),
        "avg_return": round(avg_return, 4),
        "avg_alpha_vs_spy": round(avg_alpha, 4),
        "sharpe_ratio": round(sharpe, 4),
        "holding_days": holding_days,
        "results": results,
    }
def execute_signal(signal: dict) -> bool:
    """Submit a market buy for ``signal["ticker"]`` sized by the config limits.

    The order is sized at ``config.POSITION_SIZE_PCT`` of portfolio value and
    then clamped so that existing exposure plus the new order stays within the
    per-ticker cap (``config.MAX_TICKER_PCT`` if defined, otherwise 10%).
    ``config.MAX_POSITIONS`` is only enforced when the ticker is not already
    held, because adding to a held ticker does not open a new position.

    Args:
        signal: Signal dict; ``"ticker"`` and ``"id"`` are read.

    Returns:
        True if an order was submitted, False if it was skipped or failed.
    """
    if not config.ALPACA_KEY or not config.ALPACA_SECRET:
        logger.warning("Alpaca credentials not configured")
        return False

    ticker = signal["ticker"]
    max_ticker_pct = getattr(config, "MAX_TICKER_PCT", 0.10)

    try:
        # One client and one positions snapshot serve every check below.
        api = _get_api()
        positions = {p.symbol: p for p in api.list_positions()}
        held = positions.get(ticker)

        if held is None and len(positions) >= config.MAX_POSITIONS:
            logger.warning(f"Max positions ({config.MAX_POSITIONS}) reached, skipping {ticker}")
            return False

        portfolio_value = float(api.get_account().portfolio_value)
        if portfolio_value <= 0:
            logger.error(f"Invalid portfolio value: {portfolio_value}")
            return False

        price = float(api.get_latest_trade(ticker).price)
        if price <= 0:
            logger.error(f"Invalid price for {ticker}: {price}")
            return False

        existing_value = float(held.market_value) if held is not None else 0.0
        headroom = portfolio_value * max_ticker_pct - existing_value
        if headroom <= 0:
            logger.warning(f"Already at {max_ticker_pct:.0%} cap for {ticker}, skipping")
            return False

        # Never buy more than the room left under the per-ticker cap.
        allocation = min(portfolio_value * config.POSITION_SIZE_PCT, headroom)
        qty = int(allocation / price)
        if qty < 1:
            logger.warning(f"Allocation too small for {ticker}: ${allocation:.2f} at ${price:.2f}")
            return False

        order = api.submit_order(
            symbol=ticker,
            qty=qty,
            side="buy",
            type="market",
            time_in_force="day",
        )
    except Exception as e:
        logger.error(f"Failed to execute signal for {ticker}: {e}")
        return False

    logger.info(f"Order submitted: {ticker} qty={qty} order_id={order.id}")
    try:
        mark_signal_executed(signal["id"])
    except Exception as e:
        # The order is already live; reporting failure here would invite a retry
        # and a duplicate order.
        logger.error(
            f"Order {order.id} for {ticker} was submitted but the signal "
            f"could not be marked executed: {e}"
        )
    return True
config.HOLDING_PERIOD_DAYS + api = _get_api() + try: + api.close_position(ticker) + logger.info(f"Closed position: {ticker} after {days} days") + return True + except Exception as e: + logger.error(f"Failed to close position {ticker}: {e}") + return False diff --git a/config.py b/config.py new file mode 100644 index 0000000..f43ee93 --- /dev/null +++ b/config.py @@ -0,0 +1,40 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + +EDGAR_POLL_INTERVAL = 600 +MIN_TRANSACTION_VALUE = 50_000 +MIN_CLUSTER_SIZE = 1 +CLUSTER_WINDOW_DAYS = 30 +HOLDING_PERIOD_DAYS = 90 +POSITION_SIZE_PCT = 0.02 +MAX_POSITIONS = 20 +SCORE_ALERT_THRESHOLD = 5.0 + +ROLE_WEIGHTS = { + "ceo": 3.0, + "chief executive officer": 3.0, + "cfo": 2.5, + "chief financial officer": 2.5, + "president": 2.5, + "coo": 2.0, + "chief operating officer": 2.0, + "director": 1.5, + "vp": 1.2, + "vice president": 1.2, + "10% owner": 1.0, +} +DEFAULT_ROLE_WEIGHT = 1.0 + +SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL", "") +ALPACA_KEY = os.getenv("ALPACA_KEY", "") +ALPACA_SECRET = os.getenv("ALPACA_SECRET", "") +ALPACA_BASE_URL = os.getenv("ALPACA_BASE_URL", "https://paper-api.alpaca.markets") + +DB_PATH = os.getenv("DB_PATH", "insider.db") +DATA_DIR = os.getenv("DATA_DIR", "data/filings") + +EDGAR_RSS_URL = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&search_text=&action=getcurrent" +EDGAR_SEARCH_URL = "https://efts.sec.gov/LATEST/search-index?q=&forms=4" +EDGAR_BASE_URL = "https://www.sec.gov" diff --git a/db/__init__.py b/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/db/db.py b/db/db.py new file mode 100644 index 0000000..d1f1172 --- /dev/null +++ b/db/db.py @@ -0,0 +1,123 @@ +import sqlite3 +import os +from datetime import datetime +import config + + +def get_connection(): + conn = sqlite3.connect(config.DB_PATH) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA 
def get_unalerted_signals():
    """Return all signal rows whose ``alerted`` flag is 0, oldest first, as dicts."""
    conn = get_connection()
    try:
        cursor = conn.execute(
            "SELECT * FROM signals WHERE alerted=0 ORDER BY created_at ASC"
        )
        pending = []
        for row in cursor.fetchall():
            # Rows come back as sqlite3.Row (set in get_connection); hand plain
            # dicts to callers instead.
            pending.append(dict(row))
        return pending
    finally:
        conn.close()
""" + SELECT * FROM filings + WHERE ticker=? + AND flag='A' + AND is_10b51=0 + AND transaction_date >= date('now', ? || ' days') + ORDER BY transaction_date DESC + """, + (ticker, f"-{window_days}"), + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +def accession_exists(accession_number: str) -> bool: + conn = get_connection() + try: + row = conn.execute( + "SELECT 1 FROM filings WHERE accession_number=?", (accession_number,) + ).fetchone() + return row is not None + finally: + conn.close() diff --git a/db/schema.sql b/db/schema.sql new file mode 100644 index 0000000..099d94d --- /dev/null +++ b/db/schema.sql @@ -0,0 +1,34 @@ +CREATE TABLE IF NOT EXISTS filings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + accession_number TEXT UNIQUE, + ticker TEXT, + cik TEXT, + insider_name TEXT, + role TEXT, + transaction_date TEXT, + filed_date TEXT, + shares REAL, + price REAL, + total_value REAL, + flag TEXT, + is_10b51 INTEGER DEFAULT 0, + post_tx_shares REAL, + created_at TEXT DEFAULT (datetime('now')) +); + +CREATE TABLE IF NOT EXISTS signals ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ticker TEXT, + trigger_date TEXT, + cluster_size INTEGER, + total_cluster_value REAL, + score REAL, + alerted INTEGER DEFAULT 0, + executed INTEGER DEFAULT 0, + created_at TEXT DEFAULT (datetime('now')) +); + +CREATE INDEX IF NOT EXISTS idx_filings_ticker ON filings(ticker); +CREATE INDEX IF NOT EXISTS idx_filings_transaction_date ON filings(transaction_date); +CREATE INDEX IF NOT EXISTS idx_signals_ticker ON signals(ticker); +CREATE INDEX IF NOT EXISTS idx_signals_alerted ON signals(alerted); diff --git a/ingestion/__init__.py b/ingestion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ingestion/edgar_poller.py b/ingestion/edgar_poller.py new file mode 100644 index 0000000..08bd4e6 --- /dev/null +++ b/ingestion/edgar_poller.py @@ -0,0 +1,138 @@ +import time +import os +import logging +from datetime import datetime +from typing import Optional 
def _get_filing_urls() -> list[tuple[str, str, str]]:
    """Fetch the EDGAR current-filings Atom feed and list the filings in it.

    Returns:
        ``(index_url, accession, updated_date)`` tuples, one per distinct
        accession number. ``accession`` is normalised to ``XXXXXXXXXX-YY-ZZZZZZ``
        and ``updated_date`` is the first 10 characters of the entry's
        ``<updated>`` value (empty string when absent).

    Raises:
        requests.RequestException: Propagated from ``_fetch``.
        lxml.etree.XMLSyntaxError: If the feed is not well-formed XML.
    """
    resp = _fetch(EDGAR_FULL_INDEX)
    root = etree.fromstring(resp.content)
    ns = {"atom": "http://www.w3.org/2005/Atom"}

    results = []
    seen = set()
    for entry in root.findall("atom:entry", ns):
        link = entry.find("atom:link", ns)
        if link is None:
            continue
        url = link.get("href", "")

        # The last path segment looks like "<accession>-index.htm[l]".
        last_segment = url.rstrip("/").split("/")[-1]
        digits = last_segment.split("-index", 1)[0].replace("-", "")
        if len(digits) != 18 or not digits.isdigit():
            # Downstream code uses the accession as a DB key and file name,
            # so anything that is not a real accession number is dropped.
            logger.debug(f"Skipping feed entry with unrecognised URL: {url}")
            continue
        accession = f"{digits[:10]}-{digits[10:12]}-{digits[12:]}"

        # The same accession may appear in several entries (presumably once
        # per associated filer - verify against a live feed).
        if accession in seen:
            continue
        seen.add(accession)

        updated = (entry.findtext("atom:updated", namespaces=ns) or "")[:10]
        results.append((url, accession, updated))
    return results
def _resolve_xml_url(index_url: str, accession: str) -> Optional[str]:
    """Find the raw Form 4 XML document linked from a filing's index page.

    The index page URL supplied by the feed is used as-is. Only when it is
    empty is a URL rebuilt from the accession number; that fallback assumes
    the accession prefix is a CIK the filing is stored under, which is not
    guaranteed.

    Args:
        index_url: URL of the filing's ``-index.htm`` page.
        accession: Dashed accession number.

    Returns:
        Absolute URL of the first linked ``.xml`` document that is not an
        XSL-rendered view, or None if the page cannot be loaded or has no
        such link.
    """
    candidate = index_url
    if not candidate:
        accession_path = accession.replace("-", "")
        cik = accession_path[:10].lstrip("0")
        candidate = (
            f"{config.EDGAR_BASE_URL}/Archives/edgar/data/"
            f"{cik}/{accession_path}/{accession}-index.htm"
        )

    try:
        resp = _fetch(candidate)
        # The index page is HTML, which the strict XML parser may reject.
        doc = etree.HTML(resp.content)
    except Exception as e:
        logger.warning(f"Could not load filing index for {accession}: {e}")
        return None
    if doc is None:
        return None

    folder = candidate.rsplit("/", 1)[0] + "/"
    for anchor in doc.iter("a"):
        href = (anchor.get("href") or "").strip()
        lowered = href.lower()
        if not lowered.endswith(".xml"):
            continue
        # Links under an "xsl..." path segment are rendered views of the
        # document, not the raw XML that parse_form4 needs.
        if "/xsl" in lowered:
            continue
        if lowered.startswith(("http://", "https://")):
            return href
        if href.startswith("/"):
            return config.EDGAR_BASE_URL + href
        return folder + href
    return None
def _text(el, tag: str) -> Optional[str]:
    """Return the stripped text of the first ``tag`` descendant of ``el``.

    Ownership (Form 3/4/5) XML stores many fields as
    ``<tag><value>...</value></tag>`` rather than as direct text, so when the
    element has no text of its own its ``<value>`` child is used instead.

    Args:
        el: Element to search beneath.
        tag: Tag name to look for at any depth.

    Returns:
        The non-empty stripped text, or None if the element is missing or
        carries no text.
    """
    node = el.find(".//" + tag)
    if node is None:
        return None

    text = (node.text or "").strip()
    if not text:
        # Direct text is only layout whitespace; the data is in <value>.
        value_node = node.find("value")
        if value_node is not None:
            text = (value_node.text or "").strip()
    return text or None
def _on_new_filing(filing: dict):
    """Turn one stored filing into a signal and fan it out to Slack and Alpaca.

    Nothing happens when ``process_filing`` yields no signal. The Slack alert
    is sent only if a webhook URL is configured, and the order is placed only
    if both Alpaca credentials are configured.
    """
    from signals.filter_engine import process_filing
    from alerts.slack_alert import send_slack_alert
    import config

    signal = process_filing(filing)
    if signal is not None:
        logger.info(
            f"Signal: {signal['ticker']} score={signal['score']} cluster={signal['cluster_size']}"
        )

        slack_configured = bool(config.SLACK_WEBHOOK_URL)
        if slack_configured:
            send_slack_alert(signal)

        broker_configured = bool(config.ALPACA_KEY and config.ALPACA_SECRET)
        if broker_configured:
            # Imported lazily so the broker module is only loaded when used.
            from broker.alpaca_client import execute_signal
            execute_signal(signal)
def detect_cluster(ticker: str) -> dict:
    """Summarise the recent insider buys recorded for *ticker*.

    Fetches buys via ``get_recent_buys_for_ticker`` using
    ``config.CLUSTER_WINDOW_DAYS`` and returns a dict with:

    * ``cluster_size`` - number of distinct ``insider_name`` values,
    * ``total_cluster_value`` - sum of ``total_value`` (falsy values count as 0),
    * ``buys`` - the fetched rows, unchanged.
    """
    recent_buys = get_recent_buys_for_ticker(ticker, config.CLUSTER_WINDOW_DAYS)

    # Distinct insiders: the same person buying twice counts once.
    insiders = set()
    for buy in recent_buys:
        insiders.add(buy["insider_name"])

    # Separate pass for the value total; missing/None values contribute 0.
    cluster_value = 0
    for buy in recent_buys:
        cluster_value += buy["total_value"] or 0

    summary = {}
    summary["cluster_size"] = len(insiders)
    summary["total_cluster_value"] = cluster_value
    summary["buys"] = recent_buys
    return summary
def process_filing(filing: dict) -> Optional[dict]:
    """Run one parsed filing row through the signal filters.

    The row is rejected (None is returned) when, in this order:

    1. its ``flag`` is not ``"A"``;
    2. ``is_10b51`` is truthy;
    3. ``total_value`` (missing/falsy treated as 0) is below
       ``config.MIN_TRANSACTION_VALUE``;
    4. it has no ``ticker``;
    5. the cluster size reported by ``detect_cluster`` is below
       ``config.MIN_CLUSTER_SIZE``.

    Otherwise a signal dict is built, stored through ``insert_signal`` and
    returned with the resulting ``id`` attached.
    """
    is_acquisition = filing.get("flag") == "A"
    if not is_acquisition:
        return None

    if filing.get("is_10b51"):
        logger.debug(f"Skipping 10b5-1 filing: {filing['accession_number']}")
        return None

    value = filing.get("total_value") or 0
    if value < config.MIN_TRANSACTION_VALUE:
        logger.debug(f"Below min value: {filing['accession_number']} (${value:,.0f})")
        return None

    symbol = filing.get("ticker", "")
    if not symbol:
        return None

    cluster = detect_cluster(symbol)
    insiders_in_cluster = cluster["cluster_size"]
    cluster_value = cluster["total_cluster_value"]
    if insiders_in_cluster < config.MIN_CLUSTER_SIZE:
        return None

    raw_score = _score(value, filing.get("role", ""), insiders_in_cluster)

    signal = dict(
        ticker=symbol,
        trigger_date=filing.get("transaction_date", ""),
        cluster_size=insiders_in_cluster,
        total_cluster_value=cluster_value,
        score=round(raw_score, 2),  # stored rounded; the log line below uses the raw score
        filing=filing,
        cluster_buys=cluster["buys"],
    )
    signal["id"] = insert_signal(signal)

    logger.info(f"Signal generated: {symbol} score={raw_score:.2f} cluster={insiders_in_cluster}")
    return signal