Merge pull request 'feat: Insider Copytrade POC + PLAN.md' (#2) from claude/issue-1-insider-copytrade-poc into master

Reviewed-on: #2
This commit is contained in:
Dominik Moritz Roth 2026-05-04 19:38:21 +02:00
commit cc4343d805
21 changed files with 1550 additions and 0 deletions

6
.env.example Normal file
View File

@ -0,0 +1,6 @@
SLACK_WEBHOOK_URL=
ALPACA_KEY=
ALPACA_SECRET=
ALPACA_BASE_URL=https://paper-api.alpaca.markets
DB_PATH=insider.db
DATA_DIR=data/filings

340
PLAN.md Normal file
View File

@ -0,0 +1,340 @@
# Insider Copytrade System -- Implementation Plan
## Description
A personal system that monitors SEC EDGAR Form 4 filings in real-time, filters for high-quality insider buying signals, alerts via Slack, and optionally executes trades automatically through Alpaca's paper or live trading API.
The system is fully self-hosted, uses only free/public data sources, and requires no third-party data subscriptions.
---
## Background
Company insiders (executives, directors, >10% shareholders) must file SEC Form 4 within 2 business days of any trade. This is public data via SEC EDGAR. The signal value of insider *buying* is academically documented -- executives buying their own stock with personal capital is a meaningful vote of confidence, particularly when:
- Multiple insiders buy simultaneously (cluster signal)
- The trade is unplanned (not a 10b5-1 scheduled plan)
- The company is small/mid-cap (less institutional arbitrage)
The edge vs. political trade copying: 2-day disclosure lag vs. 45 days, and the signal is company-specific rather than sector-level.
**Key risk:** This signal is publicly known and tracked. The edge is in filtering quality and execution speed, not data exclusivity. Large-cap Form 4 signals are arbitraged quickly. Focus on small/mid-cap, clustered, unplanned buys.
---
## System Outline
```
SEC EDGAR RSS Feed (poll every 10 min)
|
[Ingestion Layer]
|
Parse Form 4 XML
|
[Filter Engine]
- Buy only (flag = A)
- Exclude 10b5-1 plans
- Min transaction size
- Role weighting
- Cluster detection
|
SQLite Database
|
┌────────────┬──────────────┐
| | |
[Backtester] [Slack Alert] [Alpaca API]
(manual) (paper/live)
```
---
## Actionables
### Phase 1 -- Data Ingestion
**Goal:** Reliably pull and parse Form 4 filings as they appear.
**Tasks:**
1. Set up project structure
```
insider-copytrade/
ingestion/
edgar_poller.py # polls EDGAR RSS
form4_parser.py # parses XML -> structured dict
db/
schema.sql
db.py # SQLite interface
signals/
filter_engine.py # applies signal filters
cluster_detector.py
alerts/
slack_alert.py
broker/
alpaca_client.py
backtest/
backtest.py
config.py
main.py
```
2. Poll EDGAR RSS for Form 4 filings every 10 minutes:
```
https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&search_text=&action=getcurrent
```
SEC also provides a structured latest filings feed:
```
https://efts.sec.gov/LATEST/search-index?q=&forms=4
```
3. For each new filing, fetch and parse the XML document. Key fields to extract:
- `issuerTradingSymbol` (ticker)
- `rptOwnerName`, `officerTitle` (insider name + role)
- `transactionDate`
- `transactionAcquiredDisposedCode` (A = buy, D = sell)
- `transactionShares`, `transactionPricePerShare`
- `transactionTotalValue` (compute if not present)
- `footnotes` (check for "10b5-1" mention)
- `sharesOwnedFollowingTransaction`
4. Store raw filing XML + parsed fields. Track `accessionNumber` as dedup key.
**SQLite schema:**
```sql
CREATE TABLE filings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT UNIQUE,
ticker TEXT,
cik TEXT,
insider_name TEXT,
role TEXT,
transaction_date TEXT,
filed_date TEXT,
shares REAL,
price REAL,
total_value REAL,
flag TEXT, -- A or D
is_10b51 INTEGER, -- 0 or 1
post_tx_shares REAL,
created_at TEXT
);
CREATE TABLE signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT,
trigger_date TEXT,
cluster_size INTEGER,
total_cluster_value REAL,
score REAL,
alerted INTEGER DEFAULT 0,
executed INTEGER DEFAULT 0,
created_at TEXT
);
```
---
### Phase 2 -- Filter Engine
**Goal:** Reduce noise to actionable signals only.
**Filters to apply (in order):**
| Filter | Logic |
|---|---|
| Buy only | `flag == 'A'` |
| Exclude 10b5-1 | Scan footnotes for "10b5-1", "Rule 10b5", "adopted a plan" |
| Min transaction value | `total_value >= 50000` (configurable) |
| Exclude derivative transactions | Options exercises are weaker signal than open market purchases |
| Role weighting | CEO/CFO/President = high; Director = medium; 10% owner = context-dependent |
| Cluster detection | 2+ insiders buying same ticker within 30 days = elevated signal |
**Scoring formula (simple v1):**
```python
score = base_role_weight * log(total_value) * cluster_multiplier
# cluster_multiplier = 1.0 + (0.5 * (cluster_size - 1))
```
Expose all thresholds in `config.py` for easy tuning during backtesting.
---
### Phase 3 -- SQLite Storage
SQLite is sufficient for this workload (low write volume, single process). Use WAL mode for concurrent reads during backtesting:
```python
conn = sqlite3.connect('insider.db')
conn.execute('PRAGMA journal_mode=WAL')
```
Keep raw filing XML in a `/data/filings/` directory keyed by accession number. Parse on ingest, re-parse never needed.
---
### Phase 4 -- Slack Alerts
**Goal:** Get notified immediately when a signal fires, with enough context to decide manually.
1. Create a Slack app, get a webhook URL (takes 5 minutes)
2. Alert format:
```
INSIDER BUY SIGNAL
Ticker: $ACME
Insider: John Smith (CEO)
Date: 2025-05-01
Shares: 10,000 @ $14.50 = $145,000
Cluster: 3 insiders in last 14 days
Score: 8.4
10b5-1: No
EDGAR: https://www.sec.gov/cgi-bin/browse-edgar?...
```
3. Alert only on signals above configurable score threshold
4. Mark `alerted = 1` in DB after sending to avoid duplicates on re-poll
```python
import requests
def send_slack_alert(webhook_url, signal):
requests.post(webhook_url, json={"text": format_signal(signal)})
```
---
### Phase 5 -- Backtesting
**Goal:** Validate filter parameters on historical data before going live.
**Data:**
- Historical Form 4 filings: download bulk XML from `https://www.sec.gov/dera/data/form-4-data`
- Price data: `yfinance` (free, sufficient for backtesting)
**Backtest logic:**
```python
# For each signal in historical data:
# - Entry: next market open after filed_date
# - Exit: N days later (configurable: 30/60/90/180)
# - Calculate return vs SPY over same period
# - Aggregate by role, cluster_size, market_cap bucket
```
**Use `vectorbt` for performance:**
```python
import vectorbt as vbt
# Build entry/exit signal matrices aligned to price data
# Run portfolio simulation with configurable position sizing
```
**Output metrics:**
- Annualized return vs SPY benchmark
- Win rate
- Avg return by holding period
- Avg return by role / cluster size
- Max drawdown
- Sharpe ratio
**Critical:** Test on post-2022 data specifically. Pre-2022 results are likely inflated -- the signal became widely tracked after Autopilot/media coverage.
**Parameter grid to test:**
```python
MIN_VALUE = [25_000, 50_000, 100_000]
HOLDING_DAYS = [30, 60, 90, 180]
CLUSTER_WINDOW = [14, 30]
MIN_CLUSTER_SIZE = [1, 2, 3]
ROLES = ['all', 'c-suite-only']
```
---
### Phase 6 -- Alpaca Integration
**Goal:** Optionally auto-execute signals. Start with paper trading.
**Paper trading base URL:** `https://paper-api.alpaca.markets`
**Live trading base URL:** `https://api.alpaca.markets`
Swap via config flag -- never hardcode.
```python
from alpaca_trade_api import REST
api = REST(
key_id=config.ALPACA_KEY,
secret_key=config.ALPACA_SECRET,
base_url=config.ALPACA_BASE_URL # paper or live
)
def execute_signal(ticker, portfolio_value, signal_score):
# Fixed fractional sizing: 2% of portfolio per signal
price = api.get_latest_trade(ticker).price
allocation = portfolio_value * 0.02
qty = int(allocation / price)
if qty < 1:
return
api.submit_order(
symbol=ticker,
qty=qty,
side='buy',
type='market',
time_in_force='day'
)
```
Position sizing: start at 2% per signal, max 10% in any single ticker. Add a max open positions limit (e.g. 20) to cap exposure.
Exit logic (v1): time-based only (close after N days). Add trailing stop later.
---
## Build Order
| Step | Deliverable | Est. Time |
|---|---|---|
| 1 | EDGAR poller + Form 4 XML parser + SQLite storage | 1 day |
| 2 | Filter engine + cluster detector | 0.5 day |
| 3 | Slack alert | 1 hour |
| 4 | Historical data download + backtest | 1-2 days |
| 5 | Alpaca paper trading integration | 0.5 day |
| 6 | Run paper trading 4-8 weeks, monitor | -- |
| 7 | Switch to live with small capital | -- |
Do not proceed to Step 7 without meaningful paper trading history.
---
## Dependencies
```
requests
lxml
sqlite3 (stdlib)
yfinance
vectorbt
alpaca-trade-api
python-dotenv
```
All free. No paid APIs required.
---
## Config Template
```python
# config.py
EDGAR_POLL_INTERVAL = 600 # seconds
MIN_TRANSACTION_VALUE = 50_000
MIN_CLUSTER_SIZE = 1 # raise to 2 for higher quality
CLUSTER_WINDOW_DAYS = 30
HOLDING_PERIOD_DAYS = 90
POSITION_SIZE_PCT = 0.02 # 2% per signal
MAX_POSITIONS = 20
SCORE_ALERT_THRESHOLD = 5.0
SLACK_WEBHOOK_URL = ""
ALPACA_KEY = ""
ALPACA_SECRET = ""
ALPACA_BASE_URL = "https://paper-api.alpaca.markets" # switch for live
```

112
README.md Normal file
View File

@ -0,0 +1,112 @@
# Smaug — Insider Copytrade Monitor
Monitors SEC EDGAR Form 4 filings in near real-time, detects insider buy clusters, sends Slack alerts, and optionally executes trades via Alpaca.
## Architecture
```
EDGAR (Form 4 feed)
ingestion/edgar_poller.py ← polls every 10 min, dedupes by accession
ingestion/form4_parser.py ← parses XML, detects 10b5-1 plans
db/models.py + db/db.py ← SQLAlchemy ORM: filings, signals, price_cache tables
signals/filter_engine.py ← buy-only, exclude 10b5-1, min $50k, role-weighted scoring
signals/cluster_detector.py ← counts unique insiders per ticker in rolling 30-day window
├──► alerts/slack_alert.py ← POST to Slack webhook when score ≥ threshold
└──► broker/alpaca_client.py ← paper/live order: 2% position size, 10% per-ticker cap
positions auto-closed after holding period expires
```
## Setup
```bash
cp .env.example .env
# edit .env with your credentials
pip install -r requirements.txt
```
### Environment variables (`.env`)
| Variable | Required | Default | Description |
|---|---|---|---|
| `SLACK_WEBHOOK_URL` | optional | — | Incoming webhook URL for alerts |
| `ALPACA_KEY` | optional | — | Alpaca API key |
| `ALPACA_SECRET` | optional | — | Alpaca API secret |
| `ALPACA_BASE_URL` | optional | `https://paper-api.alpaca.markets` | Use paper or live endpoint |
| `DB_PATH` | optional | `insider.db` | SQLite database file path |
| `DATA_DIR` | optional | `data/filings` | Directory for cached raw XML filings |
## Usage
```bash
# Initialize DB and ingest current EDGAR feed (one shot)
python main.py fetch-once
# Run continuous polling loop (every 10 minutes)
python main.py run
# Backtest signals already in the DB against historical prices
python main.py backtest
```
## Key configuration (`config.py`)
| Parameter | Default | Description |
|---|---|---|
| `EDGAR_POLL_INTERVAL` | 600 s | Polling cadence |
| `MIN_TRANSACTION_VALUE` | $50,000 | Ignore buys below this |
| `MIN_CLUSTER_SIZE` | 1 | Minimum unique insiders before a signal fires |
| `CLUSTER_WINDOW_DAYS` | 30 | Rolling window for cluster counting |
| `HOLDING_PERIOD_DAYS` | 90 | Days held per position (backtest + auto-close trigger) |
| `POSITION_SIZE_PCT` | 2% | Fraction of portfolio per trade |
| `MAX_POSITIONS` | 20 | Hard position limit |
| `SCORE_ALERT_THRESHOLD` | 5.0 | Minimum score to trigger Slack alert |
## Scoring
```
score = role_weight × log(total_value) × (1 + 0.5 × (cluster_size 1))
```
Role weights: CEO 3.0 · CFO/President 2.5 · COO 2.0 · Director 1.5 · VP 1.2 · 10% owner 1.0
## Backtesting
The backtest loads signals from the DB and fetches OHLC data via `yfinance`. Prices are cached in the `price_cache` table — completed date ranges are served entirely from the DB on repeat runs, avoiding redundant network calls. Entry price is the closing price on the first trading day on or after the signal date; exit price is the closing price on the last trading day before or on the exit date. Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number.
The EDGAR poller also skips fetching XML for filings older than the newest `filed_date` already stored in the DB, so incremental runs only process truly new filings.
Metrics reported: win rate, average return, average alpha vs SPY, Sharpe ratio.
## Position lifecycle
Positions are tracked in the `signals` table. When a trade is executed, `executed_at` is recorded. On each poll cycle the poller checks for positions where `executed_at` is older than `HOLDING_PERIOD_DAYS` and calls Alpaca to close them, marking `closed=1` in the DB.
## Modules
| Path | Purpose |
|---|---|
| `config.py` | All thresholds and env-var loading |
| `ingestion/edgar_poller.py` | EDGAR Atom feed polling and deduplication |
| `ingestion/form4_parser.py` | Form 4 XML → structured dict; 10b5-1 detection |
| `db/models.py` | SQLAlchemy ORM models (`Filing`, `Signal`, `PriceCache`) |
| `db/db.py` | DB access layer (SQLAlchemy sessions) |
| `signals/filter_engine.py` | Filing → signal pipeline |
| `signals/cluster_detector.py` | Cluster detection from DB |
| `alerts/slack_alert.py` | Slack webhook alert |
| `broker/alpaca_client.py` | Alpaca order execution + position exit |
| `backtest/backtest.py` | Historical backtest runner |
| `main.py` | CLI entry point |
## Requirements
- Python 3.11+
- See `requirements.txt`: `requests`, `lxml`, `yfinance`, `python-dotenv`, `alpaca-trade-api`, `sqlalchemy`

0
alerts/__init__.py Normal file
View File

64
alerts/slack_alert.py Normal file
View File

@ -0,0 +1,64 @@
import logging
import requests
import config
from db.db import mark_signal_alerted
logger = logging.getLogger(__name__)
def format_signal(signal: dict) -> str:
filing = signal.get("filing", {})
ticker = signal["ticker"]
insider = filing.get("insider_name", "Unknown")
role = filing.get("role", "Unknown")
tx_date = filing.get("transaction_date", "")
shares = filing.get("shares")
price = filing.get("price")
total_value = filing.get("total_value") or signal.get("total_cluster_value", 0)
cluster_size = signal["cluster_size"]
score = signal["score"]
is_10b51 = "Yes" if filing.get("is_10b51") else "No"
accession = filing.get("accession_number", "")
shares_str = f"{shares:,.0f}" if shares else "N/A"
price_str = f"${price:,.2f}" if price else "N/A"
value_str = f"${total_value:,.0f}" if total_value else "N/A"
edgar_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&type=4&dateb=&owner=include&count=10&search_text=&ticker={ticker}"
return (
f"*INSIDER BUY SIGNAL*\n"
f"Ticker: ${ticker}\n"
f"Insider: {insider} ({role})\n"
f"Date: {tx_date}\n"
f"Shares: {shares_str} @ {price_str} = {value_str}\n"
f"Cluster: {cluster_size} insider(s) in last {config.CLUSTER_WINDOW_DAYS} days\n"
f"Score: {score}\n"
f"10b5-1: {is_10b51}\n"
f"EDGAR: {edgar_url}"
)
def send_slack_alert(signal: dict) -> bool:
if not config.SLACK_WEBHOOK_URL:
logger.warning("SLACK_WEBHOOK_URL not configured")
return False
if signal.get("score", 0) < config.SCORE_ALERT_THRESHOLD:
logger.debug(f"Signal score {signal['score']} below threshold {config.SCORE_ALERT_THRESHOLD}")
return False
text = format_signal(signal)
try:
resp = requests.post(
config.SLACK_WEBHOOK_URL,
json={"text": text},
timeout=10,
)
resp.raise_for_status()
mark_signal_alerted(signal["id"])
logger.info(f"Slack alert sent for {signal['ticker']}")
return True
except Exception as e:
logger.error(f"Failed to send Slack alert: {e}")
return False

0
backtest/__init__.py Normal file
View File

172
backtest/backtest.py Normal file
View File

@ -0,0 +1,172 @@
import logging
import math
from datetime import datetime, timedelta
import config
from db.db import get_cached_prices, get_signals_for_backtest, upsert_prices
logger = logging.getLogger(__name__)
def _fetch_prices(ticker: str, start: datetime, end: datetime) -> dict[str, float]:
try:
import yfinance as yf
except ImportError:
raise ImportError("yfinance not installed. Run: pip install yfinance")
start_str = start.strftime("%Y-%m-%d")
end_str = (end + timedelta(days=5)).strftime("%Y-%m-%d")
cached = get_cached_prices(ticker, start_str, end_str)
today = datetime.utcnow().strftime("%Y-%m-%d")
range_is_complete = end_str < today
if range_is_complete and cached:
return cached
data = yf.download(
ticker,
start=start_str,
end=end_str,
progress=False,
auto_adjust=True,
)
if data.empty:
return cached
fetched: dict[str, float] = {}
for ts, close_val in data["Close"].items():
date_key = ts.to_pydatetime().replace(tzinfo=None).strftime("%Y-%m-%d")
fetched[date_key] = float(close_val)
new_prices = {k: v for k, v in fetched.items() if k not in cached}
if new_prices:
upsert_prices(ticker, new_prices)
cached.update(fetched)
return cached
def _first_close_on_or_after(prices: dict[str, float], target: datetime) -> float:
target_str = target.strftime("%Y-%m-%d")
for date_str in sorted(prices):
if date_str >= target_str:
return prices[date_str]
raise ValueError(f"No price data on or after {target_str}")
def _first_close_before(prices: dict[str, float], target: datetime) -> float:
target_str = target.strftime("%Y-%m-%d")
result = None
for date_str in sorted(prices):
if date_str <= target_str:
result = prices[date_str]
if result is None:
raise ValueError(f"No price data on or before {target_str}")
return result
def run_backtest(
db_path: str = None,
holding_days: int = None,
min_score: float = 0.0,
min_cluster_size: int = 1,
) -> dict:
holding_days = holding_days or config.HOLDING_PERIOD_DAYS
signals = get_signals_for_backtest(min_score, min_cluster_size)
if not signals:
logger.warning("No signals found matching criteria")
return {}
results = []
spy_cache: dict[str, float] = {}
for signal in signals:
ticker = signal["ticker"]
entry_date_str = signal["trigger_date"]
try:
entry_date = datetime.strptime(entry_date_str, "%Y-%m-%d")
except ValueError:
continue
exit_date = entry_date + timedelta(days=holding_days)
try:
prices = _fetch_prices(ticker, entry_date, exit_date)
if not prices:
logger.debug(f"No price data for {ticker}")
continue
entry_price = _first_close_on_or_after(prices, entry_date)
exit_price = _first_close_before(prices, exit_date)
stock_return = (exit_price - entry_price) / entry_price
except Exception as e:
logger.debug(f"Failed to get data for {ticker}: {e}")
continue
period_key = entry_date_str
if period_key not in spy_cache:
try:
spy_prices = _fetch_prices("SPY", entry_date, exit_date)
if spy_prices:
spy_entry = _first_close_on_or_after(spy_prices, entry_date)
spy_exit = _first_close_before(spy_prices, exit_date)
spy_cache[period_key] = (spy_exit - spy_entry) / spy_entry
else:
spy_cache[period_key] = 0.0
except Exception:
spy_cache[period_key] = 0.0
spy_return = spy_cache[period_key]
alpha = stock_return - spy_return
results.append({
"ticker": ticker,
"entry_date": entry_date_str,
"stock_return": round(stock_return, 4),
"spy_return": round(spy_return, 4),
"alpha": round(alpha, 4),
"cluster_size": signal["cluster_size"],
"score": signal["score"],
})
if not results:
return {"error": "No results computed"}
returns = [r["stock_return"] for r in results]
alphas = [r["alpha"] for r in results]
win_rate = sum(1 for r in returns if r > 0) / len(returns)
avg_return = sum(returns) / len(returns)
avg_alpha = sum(alphas) / len(alphas)
std_dev = math.sqrt(sum((r - avg_return) ** 2 for r in returns) / len(returns))
sharpe = (avg_return / std_dev * math.sqrt(252 / holding_days)) if std_dev > 0 else 0.0
return {
"total_signals": len(results),
"win_rate": round(win_rate, 4),
"avg_return": round(avg_return, 4),
"avg_alpha_vs_spy": round(avg_alpha, 4),
"sharpe_ratio": round(sharpe, 4),
"holding_days": holding_days,
"results": results,
}
def print_summary(summary: dict):
if "error" in summary:
print(f"Error: {summary['error']}")
return
width = 40
print(f"\n{'=' * width}")
print(f"Backtest Results ({summary['holding_days']}-day hold)")
print(f"{'=' * width}")
print(f"Total signals: {summary['total_signals']}")
print(f"Win rate: {summary['win_rate']:.1%}")
print(f"Avg return: {summary['avg_return']:.2%}")
print(f"Avg alpha vs SPY: {summary['avg_alpha_vs_spy']:.2%}")
print(f"Sharpe ratio: {summary['sharpe_ratio']:.2f}")
print(f"{'=' * width}\n")

0
broker/__init__.py Normal file
View File

109
broker/alpaca_client.py Normal file
View File

@ -0,0 +1,109 @@
import logging
from datetime import datetime, timedelta
import config
from db.db import mark_signal_executed, mark_signal_closed, get_executed_unclosed_signals
logger = logging.getLogger(__name__)
def _get_api():
try:
from alpaca_trade_api import REST
except ImportError:
raise ImportError("alpaca-trade-api not installed. Run: pip install alpaca-trade-api")
return REST(
key_id=config.ALPACA_KEY,
secret_key=config.ALPACA_SECRET,
base_url=config.ALPACA_BASE_URL,
)
def get_portfolio_value() -> float:
return float(_get_api().get_account().portfolio_value)
def get_open_positions_count() -> int:
return len(_get_api().list_positions())
def execute_signal(signal: dict) -> bool:
if not config.ALPACA_KEY or not config.ALPACA_SECRET:
logger.warning("Alpaca credentials not configured")
return False
ticker = signal["ticker"]
try:
api = _get_api()
if get_open_positions_count() >= config.MAX_POSITIONS:
logger.warning(f"Max positions ({config.MAX_POSITIONS}) reached, skipping {ticker}")
return False
portfolio_value = get_portfolio_value()
allocation = portfolio_value * config.POSITION_SIZE_PCT
price = float(api.get_latest_trade(ticker).price)
if price <= 0:
logger.error(f"Invalid price for {ticker}: {price}")
return False
qty = int(allocation / price)
if qty < 1:
logger.warning(f"Allocation too small for {ticker}: ${allocation:.2f} at ${price:.2f}")
return False
existing_positions = {p.symbol: p for p in api.list_positions()}
if ticker in existing_positions:
existing_pct = float(existing_positions[ticker].market_value) / portfolio_value
if existing_pct >= 0.10:
logger.warning(f"Already at 10% cap for {ticker}, skipping")
return False
order = api.submit_order(
symbol=ticker,
qty=qty,
side="buy",
type="market",
time_in_force="day",
)
logger.info(f"Order submitted: {ticker} qty={qty} order_id={order.id}")
mark_signal_executed(signal["id"])
return True
except Exception as e:
logger.error(f"Failed to execute signal for {ticker}: {e}")
return False
def close_position(ticker: str, signal_id: int) -> bool:
try:
_get_api().close_position(ticker)
mark_signal_closed(signal_id)
logger.info(f"Closed position: {ticker} (signal {signal_id})")
return True
except Exception as e:
logger.error(f"Failed to close position {ticker}: {e}")
return False
def close_expired_positions():
if not config.ALPACA_KEY or not config.ALPACA_SECRET:
return
cutoff = datetime.utcnow() - timedelta(days=config.HOLDING_PERIOD_DAYS)
signals = get_executed_unclosed_signals()
for signal in signals:
executed_at_str = signal.get("executed_at")
if not executed_at_str:
continue
try:
executed_at = datetime.strptime(executed_at_str, "%Y-%m-%dT%H:%M:%SZ")
except ValueError:
continue
if executed_at <= cutoff:
close_position(signal["ticker"], signal["id"])

40
config.py Normal file
View File

@ -0,0 +1,40 @@
import os
from dotenv import load_dotenv
load_dotenv()
EDGAR_POLL_INTERVAL = 600
MIN_TRANSACTION_VALUE = 50_000
MIN_CLUSTER_SIZE = 1
CLUSTER_WINDOW_DAYS = 30
HOLDING_PERIOD_DAYS = 90
POSITION_SIZE_PCT = 0.02
MAX_POSITIONS = 20
SCORE_ALERT_THRESHOLD = 5.0
ROLE_WEIGHTS = {
"ceo": 3.0,
"chief executive officer": 3.0,
"cfo": 2.5,
"chief financial officer": 2.5,
"president": 2.5,
"coo": 2.0,
"chief operating officer": 2.0,
"director": 1.5,
"vp": 1.2,
"vice president": 1.2,
"10% owner": 1.0,
}
DEFAULT_ROLE_WEIGHT = 1.0
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL", "")
ALPACA_KEY = os.getenv("ALPACA_KEY", "")
ALPACA_SECRET = os.getenv("ALPACA_SECRET", "")
ALPACA_BASE_URL = os.getenv("ALPACA_BASE_URL", "https://paper-api.alpaca.markets")
DB_PATH = os.getenv("DB_PATH", "insider.db")
DATA_DIR = os.getenv("DATA_DIR", "data/filings")
EDGAR_RSS_URL = "https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&search_text=&action=getcurrent"
EDGAR_SEARCH_URL = "https://efts.sec.gov/LATEST/search-index?q=&forms=4"
EDGAR_BASE_URL = "https://www.sec.gov"

0
db/__init__.py Normal file
View File

226
db/db.py Normal file
View File

@ -0,0 +1,226 @@
from datetime import datetime
from typing import Optional
from sqlalchemy import create_engine, func, select, text, update
from sqlalchemy.orm import Session
import config
from db.models import Base, Filing, PriceCache, Signal
def _engine():
url = f"sqlite:///{config.DB_PATH}"
return create_engine(url, connect_args={"check_same_thread": False})
_ENGINE = None
def _get_engine():
global _ENGINE
if _ENGINE is None:
_ENGINE = _engine()
return _ENGINE
def init_db():
engine = _get_engine()
with engine.connect() as conn:
conn.execute(text("PRAGMA journal_mode=WAL"))
conn.execute(text("PRAGMA foreign_keys=ON"))
Base.metadata.create_all(engine)
def _session() -> Session:
return Session(_get_engine())
def insert_filing(filing: dict) -> bool:
with _session() as session:
exists = session.scalar(
select(Filing.id).where(Filing.accession_number == filing["accession_number"])
)
if exists is not None:
return False
row = Filing(
accession_number=filing["accession_number"],
ticker=filing.get("ticker"),
cik=filing.get("cik"),
insider_name=filing.get("insider_name"),
role=filing.get("role"),
transaction_date=filing.get("transaction_date"),
filed_date=filing.get("filed_date"),
shares=filing.get("shares"),
price=filing.get("price"),
total_value=filing.get("total_value"),
flag=filing.get("flag"),
is_10b51=bool(filing.get("is_10b51", False)),
post_tx_shares=filing.get("post_tx_shares"),
)
session.add(row)
session.commit()
return True
def accession_exists(accession_number: str) -> bool:
with _session() as session:
return session.scalar(
select(Filing.id).where(Filing.accession_number == accession_number)
) is not None
def get_latest_filed_date() -> Optional[str]:
with _session() as session:
return session.scalar(select(func.max(Filing.filed_date)))
def insert_signal(signal: dict) -> int:
with _session() as session:
row = Signal(
ticker=signal["ticker"],
trigger_date=signal["trigger_date"],
cluster_size=signal["cluster_size"],
total_cluster_value=signal.get("total_cluster_value", 0.0),
score=signal["score"],
)
session.add(row)
session.commit()
return row.id
def mark_signal_alerted(signal_id: int):
with _session() as session:
session.execute(
update(Signal).where(Signal.id == signal_id).values(alerted=True)
)
session.commit()
def mark_signal_executed(signal_id: int):
with _session() as session:
session.execute(
update(Signal)
.where(Signal.id == signal_id)
.values(executed=True, executed_at=datetime.utcnow())
)
session.commit()
def mark_signal_closed(signal_id: int):
with _session() as session:
session.execute(
update(Signal).where(Signal.id == signal_id).values(closed=True)
)
session.commit()
def get_unalerted_signals() -> list[dict]:
with _session() as session:
rows = session.scalars(
select(Signal).where(Signal.alerted == False).order_by(Signal.created_at)
).all()
return [_signal_to_dict(r) for r in rows]
def get_executed_unclosed_signals() -> list[dict]:
with _session() as session:
rows = session.scalars(
select(Signal).where(
Signal.executed == True,
Signal.closed == False,
Signal.executed_at.is_not(None),
)
).all()
return [_signal_to_dict(r) for r in rows]
def get_recent_buys_for_ticker(ticker: str, window_days: int) -> list[dict]:
from datetime import timedelta
cutoff = (datetime.utcnow() - timedelta(days=window_days)).strftime("%Y-%m-%d")
with _session() as session:
rows = session.scalars(
select(Filing)
.where(
Filing.ticker == ticker,
Filing.flag == "A",
Filing.is_10b51 == False,
Filing.transaction_date >= cutoff,
)
.order_by(Filing.transaction_date.desc())
).all()
return [_filing_to_dict(r) for r in rows]
def get_signals_for_backtest(min_score: float, min_cluster_size: int) -> list[dict]:
with _session() as session:
rows = session.scalars(
select(Signal).where(
Signal.score >= min_score,
Signal.cluster_size >= min_cluster_size,
)
).all()
return [_signal_to_dict(r) for r in rows]
def get_cached_prices(ticker: str, start_date: str, end_date: str) -> dict[str, float]:
with _session() as session:
rows = session.scalars(
select(PriceCache).where(
PriceCache.ticker == ticker,
PriceCache.date >= start_date,
PriceCache.date <= end_date,
)
).all()
return {r.date: r.close for r in rows}
def upsert_prices(ticker: str, prices: dict[str, float]):
with _session() as session:
for date_str, close in prices.items():
existing = session.scalar(
select(PriceCache).where(
PriceCache.ticker == ticker,
PriceCache.date == date_str,
)
)
if existing is None:
session.add(PriceCache(ticker=ticker, date=date_str, close=close))
session.commit()
def _filing_to_dict(row: Filing) -> dict:
return {
"id": row.id,
"accession_number": row.accession_number,
"ticker": row.ticker,
"cik": row.cik,
"insider_name": row.insider_name,
"role": row.role,
"transaction_date": row.transaction_date,
"filed_date": row.filed_date,
"shares": row.shares,
"price": row.price,
"total_value": row.total_value,
"flag": row.flag,
"is_10b51": row.is_10b51,
"post_tx_shares": row.post_tx_shares,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
def _signal_to_dict(row: Signal) -> dict:
return {
"id": row.id,
"ticker": row.ticker,
"trigger_date": row.trigger_date,
"cluster_size": row.cluster_size,
"total_cluster_value": row.total_cluster_value,
"score": row.score,
"alerted": row.alerted,
"executed": row.executed,
"executed_at": row.executed_at.strftime("%Y-%m-%dT%H:%M:%SZ") if row.executed_at else None,
"closed": row.closed,
"created_at": row.created_at.isoformat() if row.created_at else None,
}

81
db/models.py Normal file
View File

@ -0,0 +1,81 @@
from datetime import datetime
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
Index,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class Filing(Base):
__tablename__ = "filings"
id = Column(Integer, primary_key=True, autoincrement=True)
accession_number = Column(String, unique=True, nullable=False)
ticker = Column(String)
cik = Column(String)
insider_name = Column(String)
role = Column(String)
transaction_date = Column(String)
filed_date = Column(String)
shares = Column(Float)
price = Column(Float)
total_value = Column(Float)
flag = Column(String)
is_10b51 = Column(Boolean, default=False)
post_tx_shares = Column(Float)
created_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = (
Index("idx_filings_ticker", "ticker"),
Index("idx_filings_transaction_date", "transaction_date"),
Index("idx_filings_filed_date", "filed_date"),
)
class Signal(Base):
__tablename__ = "signals"
id = Column(Integer, primary_key=True, autoincrement=True)
ticker = Column(String)
trigger_date = Column(String)
cluster_size = Column(Integer)
total_cluster_value = Column(Float)
score = Column(Float)
alerted = Column(Boolean, default=False)
executed = Column(Boolean, default=False)
executed_at = Column(DateTime)
closed = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = (
Index("idx_signals_ticker", "ticker"),
Index("idx_signals_alerted", "alerted"),
Index("idx_signals_executed", "executed"),
)
class PriceCache(Base):
__tablename__ = "price_cache"
id = Column(Integer, primary_key=True, autoincrement=True)
ticker = Column(String, nullable=False)
date = Column(String, nullable=False)
close = Column(Float, nullable=False)
fetched_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = (
UniqueConstraint("ticker", "date", name="uq_price_cache_ticker_date"),
Index("idx_price_cache_ticker_date", "ticker", "date"),
)

0
ingestion/__init__.py Normal file
View File

129
ingestion/edgar_poller.py Normal file
View File

@ -0,0 +1,129 @@
import time
import os
import logging
from typing import Optional
import requests
from lxml import etree, html
import config
from ingestion.form4_parser import parse_form4
from db.db import accession_exists, get_latest_filed_date, insert_filing
logger = logging.getLogger(__name__)
HEADERS = {
"User-Agent": "smaug-insider-monitor contact@example.com",
"Accept-Encoding": "gzip, deflate",
}
EDGAR_ATOM_URL = (
"https://www.sec.gov/cgi-bin/browse-edgar"
"?action=getcurrent&type=4&dateb=&owner=include&count=40&output=atom"
)
def _fetch(url: str, timeout: int = 30) -> requests.Response:
resp = requests.get(url, headers=HEADERS, timeout=timeout)
resp.raise_for_status()
return resp
def _get_filing_urls() -> list[tuple[str, str, str]]:
resp = _fetch(EDGAR_ATOM_URL)
root = etree.fromstring(resp.content)
ns = {"atom": "http://www.w3.org/2005/Atom"}
results = []
for entry in root.findall("atom:entry", ns):
link = entry.find("atom:link", ns)
if link is None:
continue
url = link.get("href", "")
updated = (entry.findtext("atom:updated", namespaces=ns) or "")[:10]
raw = url.rstrip("/").split("/")[-1].replace("-index.htm", "")
raw = raw.replace("-", "")
if len(raw) == 18:
accession = f"{raw[:10]}-{raw[10:12]}-{raw[12:]}"
else:
accession = raw
results.append((url, accession, updated))
return results
def _resolve_xml_url(accession: str) -> Optional[str]:
accession_path = accession.replace("-", "")
cik = accession_path[:10].lstrip("0")
base = f"{config.EDGAR_BASE_URL}/Archives/edgar/data/{cik}/{accession_path}/"
index_url = f"{base}{accession}-index.htm"
try:
resp = _fetch(index_url)
doc = html.fromstring(resp.content)
for link in doc.cssselect("table.tableFile a[href]"):
href = link.get("href", "")
if href.lower().endswith(".xml") and not href.lower().endswith("-index.htm"):
return config.EDGAR_BASE_URL + href if href.startswith("/") else base + href
except Exception as e:
logger.debug(f"Could not resolve XML URL for {accession}: {e}")
return None
def _save_raw_xml(accession: str, xml_bytes: bytes):
os.makedirs(config.DATA_DIR, exist_ok=True)
path = os.path.join(config.DATA_DIR, f"{accession}.xml")
if not os.path.exists(path):
with open(path, "wb") as f:
f.write(xml_bytes)
def fetch_and_store_new_filings() -> list[dict]:
new_filings = []
try:
entries = _get_filing_urls()
except Exception as e:
logger.error(f"Failed to fetch EDGAR index: {e}")
return new_filings
latest_in_db = get_latest_filed_date()
for _index_url, accession, filed_date in entries:
if latest_in_db and filed_date and filed_date < latest_in_db:
logger.debug(f"Skipping {accession}: filed_date {filed_date} older than latest in DB {latest_in_db}")
continue
if accession_exists(accession):
continue
xml_url = _resolve_xml_url(accession)
if not xml_url:
logger.warning(f"No XML found for {accession}")
continue
try:
xml_resp = _fetch(xml_url)
xml_bytes = xml_resp.content
except Exception as e:
logger.error(f"Failed to fetch XML for {accession}: {e}")
continue
_save_raw_xml(accession, xml_bytes)
parsed = parse_form4(xml_bytes, accession, filed_date)
for filing in parsed:
if insert_filing(filing):
new_filings.append(filing)
return new_filings
def run_poller(on_new_filing=None):
logger.info("EDGAR poller started")
while True:
logger.info("Polling EDGAR for new Form 4 filings...")
new = fetch_and_store_new_filings()
logger.info(f"Found {len(new)} new filings")
if on_new_filing:
for filing in new:
try:
on_new_filing(filing)
except Exception as e:
logger.error(f"Error processing filing {filing.get('accession_number')}: {e}")
time.sleep(config.EDGAR_POLL_INTERVAL)

95
ingestion/form4_parser.py Normal file
View File

@ -0,0 +1,95 @@
import re
from lxml import etree
from typing import Optional
_10B51_PATTERNS = [
r"10b5-1",
r"rule 10b5",
r"adopted a plan",
r"10b5\(1\)",
]
def _is_10b51(text: str) -> bool:
text_lower = text.lower()
return any(re.search(p, text_lower) for p in _10B51_PATTERNS)
def _text(el, tag: str) -> Optional[str]:
node = el.find(".//" + tag)
if node is not None and node.text:
return node.text.strip()
return None
def _float(el, tag: str) -> Optional[float]:
val = _text(el, tag)
if val is None:
return None
try:
return float(val.replace(",", ""))
except ValueError:
return None
def parse_form4(xml_bytes: bytes, accession_number: str, filed_date: str) -> list[dict]:
try:
root = etree.fromstring(xml_bytes)
except etree.XMLSyntaxError:
return []
ticker = _text(root, "issuerTradingSymbol") or ""
cik = _text(root, "issuerCik") or ""
insider_name = _text(root, "rptOwnerName") or ""
role = _text(root, "officerTitle") or _text(root, "isDirector") or ""
footnotes_text = " ".join(
(node.text or "") for node in root.findall(".//footnote")
)
global_10b51 = _is_10b51(footnotes_text)
transactions = root.findall(".//nonDerivativeTransaction")
results = []
for tx in transactions:
flag = _text(tx, "transactionAcquiredDisposedCode")
if not flag:
continue
shares = _float(tx, "transactionShares")
price = _float(tx, "transactionPricePerShare")
total_value = _float(tx, "transactionTotalValue")
if total_value is None and shares is not None and price is not None:
total_value = shares * price
post_tx_shares = _float(tx, "sharesOwnedFollowingTransaction")
tx_date = _text(tx, "transactionDate") or filed_date
tx_footnote_ids = [
fn.get("id", "") for fn in tx.findall(".//footnoteId")
]
tx_footnote_text = " ".join(
(root.find(f".//footnote[@id='{fid}']") or etree.Element("x")).text or ""
for fid in tx_footnote_ids
)
is_10b51 = int(global_10b51 or _is_10b51(tx_footnote_text))
results.append(
{
"accession_number": accession_number,
"ticker": ticker.upper(),
"cik": cik,
"insider_name": insider_name,
"role": role,
"transaction_date": tx_date,
"filed_date": filed_date,
"shares": shares,
"price": price,
"total_value": total_value,
"flag": flag.upper(),
"is_10b51": is_10b51,
"post_tx_shares": post_tx_shares,
}
)
return results

90
main.py Normal file
View File

@ -0,0 +1,90 @@
import logging
import sys
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
def _process_filing(filing: dict):
from signals.filter_engine import process_filing
from alerts.slack_alert import send_slack_alert
import config
signal = process_filing(filing)
if signal is None:
return
logger.info(
f"Signal: {signal['ticker']} score={signal['score']} cluster={signal['cluster_size']}"
)
if config.SLACK_WEBHOOK_URL:
send_slack_alert(signal)
if config.ALPACA_KEY and config.ALPACA_SECRET:
from broker.alpaca_client import execute_signal
execute_signal(signal)
def _close_expired_positions():
import config
if config.ALPACA_KEY and config.ALPACA_SECRET:
from broker.alpaca_client import close_expired_positions
close_expired_positions()
def cmd_run():
from db.db import init_db
from ingestion.edgar_poller import run_poller
init_db()
logger.info("Database initialized")
def on_new_filing(filing: dict):
_process_filing(filing)
_close_expired_positions()
run_poller(on_new_filing=on_new_filing)
def cmd_backtest():
from backtest.backtest import run_backtest, print_summary
import config
logger.info("Running backtest...")
summary = run_backtest(
db_path=config.DB_PATH,
holding_days=config.HOLDING_PERIOD_DAYS,
min_score=config.SCORE_ALERT_THRESHOLD,
min_cluster_size=config.MIN_CLUSTER_SIZE,
)
print_summary(summary)
def cmd_fetch_once():
from db.db import init_db
from ingestion.edgar_poller import fetch_and_store_new_filings
init_db()
filings = fetch_and_store_new_filings()
logger.info(f"Fetched and stored {len(filings)} new filings")
for filing in filings:
_process_filing(filing)
COMMANDS = {
"run": cmd_run,
"backtest": cmd_backtest,
"fetch-once": cmd_fetch_once,
}
if __name__ == "__main__":
cmd = sys.argv[1] if len(sys.argv) > 1 else "run"
if cmd not in COMMANDS:
print(f"Usage: python main.py [{' | '.join(COMMANDS)}]")
sys.exit(1)
COMMANDS[cmd]()

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
requests>=2.31.0
lxml>=5.0.0
yfinance>=0.2.0
python-dotenv>=1.0.0
alpaca-trade-api>=3.0.0
sqlalchemy>=2.0.0

0
signals/__init__.py Normal file
View File

View File

@ -0,0 +1,13 @@
from db.db import get_recent_buys_for_ticker
import config
def detect_cluster(ticker: str) -> dict:
buys = get_recent_buys_for_ticker(ticker, config.CLUSTER_WINDOW_DAYS)
unique_insiders = {b["insider_name"] for b in buys}
total_value = sum(b["total_value"] or 0 for b in buys)
return {
"cluster_size": len(unique_insiders),
"total_cluster_value": total_value,
"buys": buys,
}

67
signals/filter_engine.py Normal file
View File

@ -0,0 +1,67 @@
import math
import logging
from typing import Optional
import config
from signals.cluster_detector import detect_cluster
from db.db import insert_signal
logger = logging.getLogger(__name__)
def _role_weight(role: str) -> float:
role_lower = (role or "").lower()
for key, weight in config.ROLE_WEIGHTS.items():
if key in role_lower:
return weight
return config.DEFAULT_ROLE_WEIGHT
def _score(total_value: float, role: str, cluster_size: int) -> float:
if not total_value or total_value <= 0:
return 0.0
base = _role_weight(role)
cluster_mult = 1.0 + 0.5 * (cluster_size - 1)
return base * math.log(total_value) * cluster_mult
def process_filing(filing: dict) -> Optional[dict]:
if filing.get("flag") != "A":
return None
if filing.get("is_10b51"):
logger.debug(f"Skipping 10b5-1 filing: {filing['accession_number']}")
return None
total_value = filing.get("total_value") or 0
if total_value < config.MIN_TRANSACTION_VALUE:
logger.debug(f"Below min value: {filing['accession_number']} (${total_value:,.0f})")
return None
ticker = filing.get("ticker", "")
if not ticker:
return None
cluster_info = detect_cluster(ticker)
cluster_size = cluster_info["cluster_size"]
total_cluster_value = cluster_info["total_cluster_value"]
if cluster_size < config.MIN_CLUSTER_SIZE:
return None
score = _score(total_value, filing.get("role", ""), cluster_size)
signal = {
"ticker": ticker,
"trigger_date": filing.get("transaction_date", ""),
"cluster_size": cluster_size,
"total_cluster_value": total_cluster_value,
"score": round(score, 2),
"filing": filing,
"cluster_buys": cluster_info["buys"],
}
signal_id = insert_signal(signal)
signal["id"] = signal_id
logger.info(f"Signal generated: {ticker} score={score:.2f} cluster={cluster_size}")
return signal