feat: Insider Copytrade POC + PLAN.md #2

Merged
dodox merged 4 commits from claude/issue-1-insider-copytrade-poc into master 2026-05-04 19:38:21 +02:00
7 changed files with 368 additions and 231 deletions
Showing only changes of commit b119b9abae - Show all commits

View File

@ -14,7 +14,7 @@ ingestion/edgar_poller.py ← polls every 10 min, dedupes by accession
ingestion/form4_parser.py ← parses XML, detects 10b5-1 plans
db/schema.sql + db/db.py ← SQLite (WAL mode): filings + signals tables
db/models.py + db/db.py ← SQLAlchemy ORM: filings, signals, price_cache tables
signals/filter_engine.py ← buy-only, exclude 10b5-1, min $50k, role-weighted scoring
@ -80,7 +80,9 @@ Role weights: CEO 3.0 · CFO/President 2.5 · COO 2.0 · Director 1.5 · VP 1.2
## Backtesting
The backtest loads signals from the SQLite DB and fetches OHLC data via `yfinance` on demand (no local price cache). Entry price is the closing price on the first trading day on or after the signal date; exit price is the closing price on the last trading day before or on the exit date. Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number.
The backtest loads signals from the DB and fetches OHLC data via `yfinance`. Prices are cached in the `price_cache` table — completed date ranges are served entirely from the DB on repeat runs, avoiding redundant network calls. Entry price is the closing price on the first trading day on or after the signal date; exit price is the closing price on the last trading day before or on the exit date. Raw XML filings are cached in `DATA_DIR` (`data/filings/`) by accession number.
The EDGAR poller also skips fetching XML for filings older than the newest `filed_date` already stored in the DB, so incremental runs only process truly new filings.
Metrics reported: win rate, average return, average alpha vs SPY, Sharpe ratio.
@ -95,8 +97,8 @@ Positions are tracked in the `signals` table. When a trade is executed, `execute
| `config.py` | All thresholds and env-var loading |
| `ingestion/edgar_poller.py` | EDGAR Atom feed polling and deduplication |
| `ingestion/form4_parser.py` | Form 4 XML → structured dict; 10b5-1 detection |
| `db/schema.sql` | SQLite schema (`filings`, `signals`) |
| `db/db.py` | DB access layer |
| `db/models.py` | SQLAlchemy ORM models (`Filing`, `Signal`, `PriceCache`) |
| `db/db.py` | DB access layer (SQLAlchemy sessions) |
| `signals/filter_engine.py` | Filing → signal pipeline |
| `signals/cluster_detector.py` | Cluster detection from DB |
| `alerts/slack_alert.py` | Slack webhook alert |
@ -107,4 +109,4 @@ Positions are tracked in the `signals` table. When a trade is executed, `execute
## Requirements
- Python 3.11+
- See `requirements.txt`: `requests`, `lxml`, `yfinance`, `python-dotenv`, `alpaca-trade-api`
- See `requirements.txt`: `requests`, `lxml`, `yfinance`, `python-dotenv`, `alpaca-trade-api`, `sqlalchemy`

View File

@ -3,44 +3,67 @@ import math
from datetime import datetime, timedelta
import config
from db.db import get_cached_prices, get_signals_for_backtest, upsert_prices
logger = logging.getLogger(__name__)
def _load_signals_from_db(db_path: str, min_score: float, min_cluster_size: int) -> list[dict]:
import sqlite3
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute(
"""
SELECT s.*, f.role FROM signals s
LEFT JOIN filings f ON f.ticker = s.ticker AND f.transaction_date = s.trigger_date
WHERE s.score >= ? AND s.cluster_size >= ?
""",
(min_score, min_cluster_size),
).fetchall()
conn.close()
return [dict(r) for r in rows]
def _fetch_prices(ticker: str, start: datetime, end: datetime) -> dict[str, float]:
    """Return daily closes for ``ticker`` as a ``{"YYYY-MM-DD": close}`` mapping.

    The price cache is consulted first. It is trusted without a download only
    when the padded window lies entirely in the past *and* the cached rows
    reach both edges of the window; otherwise yfinance is queried, dates that
    were not cached yet are persisted, and the merged mapping is returned.

    Args:
        ticker: Symbol to look up.
        start: First calendar day of the window.
        end: Last calendar day of the window; five extra days are requested
            past it.

    Returns:
        Mapping of ISO date string to close. Holds only the cached rows
        (possibly none) when the download comes back empty.

    Raises:
        ImportError: If yfinance is not installed.
    """
    try:
        import yfinance as yf
    except ImportError as exc:
        raise ImportError("yfinance not installed. Run: pip install yfinance") from exc

    pad = timedelta(days=5)
    start_str = start.strftime("%Y-%m-%d")
    end_str = (end + pad).strftime("%Y-%m-%d")

    cached = get_cached_prices(ticker, start_str, end_str)
    today = datetime.utcnow().strftime("%Y-%m-%d")
    range_is_complete = end_str < today
    if range_is_complete and cached:
        # Rows cached for an overlapping window of another signal can cover
        # only part of this window. Serve from cache only when it reaches
        # both edges; otherwise fall through and download the full window.
        covers_start = min(cached) <= (start + pad).strftime("%Y-%m-%d")
        covers_end = max(cached) >= end.strftime("%Y-%m-%d")
        if covers_start and covers_end:
            return cached

    data = yf.download(
        ticker,
        start=start_str,
        end=end_str,
        progress=False,
        auto_adjust=True,
    )
    if data.empty:
        return cached

    closes = data["Close"]
    # Some yfinance releases return ticker-level (MultiIndex) columns even for
    # a single symbol, which makes data["Close"] a one-column DataFrame.
    if hasattr(closes, "columns"):
        closes = closes.iloc[:, 0]

    fetched: dict[str, float] = {}
    for ts, close_val in closes.items():
        price = float(close_val)
        if price != price:  # NaN: no usable close for that day
            continue
        date_key = ts.to_pydatetime().replace(tzinfo=None).strftime("%Y-%m-%d")
        fetched[date_key] = price

    new_prices = {k: v for k, v in fetched.items() if k not in cached}
    if new_prices:
        upsert_prices(ticker, new_prices)

    cached.update(fetched)
    return cached
def _first_close_on_or_after(price_data, target_date: datetime) -> float:
"""Return the closing price on the first trading day on or after target_date."""
for ts, row in price_data["Close"].items():
ts_date = ts.to_pydatetime().replace(tzinfo=None)
if ts_date.date() >= target_date.date():
return float(row)
raise ValueError(f"No price data on or after {target_date.date()}")
def _first_close_on_or_after(prices: dict[str, float], target: datetime) -> float:
target_str = target.strftime("%Y-%m-%d")
for date_str in sorted(prices):
if date_str >= target_str:
return prices[date_str]
raise ValueError(f"No price data on or after {target_str}")
def _first_close_before(price_data, target_date: datetime) -> float:
"""Return the closing price on the last trading day before or on target_date."""
def _first_close_before(prices: dict[str, float], target: datetime) -> float:
target_str = target.strftime("%Y-%m-%d")
result = None
for ts, row in price_data["Close"].items():
ts_date = ts.to_pydatetime().replace(tzinfo=None)
if ts_date.date() <= target_date.date():
result = float(row)
for date_str in sorted(prices):
if date_str <= target_str:
result = prices[date_str]
if result is None:
raise ValueError(f"No price data on or before {target_date.date()}")
raise ValueError(f"No price data on or before {target_str}")
return result
@ -50,22 +73,15 @@ def run_backtest(
min_score: float = 0.0,
min_cluster_size: int = 1,
) -> dict:
try:
import yfinance as yf
except ImportError:
raise ImportError("yfinance not installed. Run: pip install yfinance")
db_path = db_path or config.DB_PATH
holding_days = holding_days or config.HOLDING_PERIOD_DAYS
signals = _load_signals_from_db(db_path, min_score, min_cluster_size)
signals = get_signals_for_backtest(min_score, min_cluster_size)
if not signals:
logger.warning("No signals found matching criteria")
return {}
results = []
spy_cache: dict[tuple, float] = {}
spy_cache: dict[str, float] = {}
for signal in signals:
ticker = signal["ticker"]
@ -79,38 +95,26 @@ def run_backtest(
exit_date = entry_date + timedelta(days=holding_days)
try:
stock_data = yf.download(
ticker,
start=entry_date.strftime("%Y-%m-%d"),
end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"),
progress=False,
auto_adjust=True,
)
if stock_data.empty:
prices = _fetch_prices(ticker, entry_date, exit_date)
if not prices:
logger.debug(f"No price data for {ticker}")
continue
entry_price = _first_close_on_or_after(stock_data, entry_date)
exit_price = _first_close_before(stock_data, exit_date)
entry_price = _first_close_on_or_after(prices, entry_date)
exit_price = _first_close_before(prices, exit_date)
stock_return = (exit_price - entry_price) / entry_price
except Exception as e:
logger.debug(f"Failed to get data for {ticker}: {e}")
continue
period_key = (entry_date_str, holding_days)
period_key = entry_date_str
if period_key not in spy_cache:
try:
spy_data = yf.download(
"SPY",
start=entry_date.strftime("%Y-%m-%d"),
end=(exit_date + timedelta(days=5)).strftime("%Y-%m-%d"),
progress=False,
auto_adjust=True,
)
if not spy_data.empty:
spy_entry = _first_close_on_or_after(spy_data, entry_date)
spy_exit = _first_close_before(spy_data, exit_date)
spy_prices = _fetch_prices("SPY", entry_date, exit_date)
if spy_prices:
spy_entry = _first_close_on_or_after(spy_prices, entry_date)
spy_exit = _first_close_before(spy_prices, exit_date)
spy_cache[period_key] = (spy_exit - spy_entry) / spy_entry
else:
spy_cache[period_key] = 0.0

340
db/db.py
View File

@ -1,146 +1,226 @@
import sqlite3
import os
from datetime import datetime
from typing import Optional
from sqlalchemy import create_engine, func, select, text, update
from sqlalchemy.orm import Session
import config
from db.models import Base, Filing, PriceCache, Signal
def get_connection():
conn = sqlite3.connect(config.DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def _engine():
    """Build a SQLAlchemy engine for the SQLite file at ``config.DB_PATH``.

    ``check_same_thread=False`` is passed through to the sqlite3 driver.
    """
    return create_engine(
        f"sqlite:///{config.DB_PATH}",
        connect_args={"check_same_thread": False},
    )
# Module-level engine, created on first use by _get_engine().
_ENGINE = None


def _get_engine():
    """Return the shared engine, building it the first time it is requested."""
    global _ENGINE
    if _ENGINE is not None:
        return _ENGINE
    _ENGINE = _engine()
    return _ENGINE
def init_db():
schema_path = os.path.join(os.path.dirname(__file__), "schema.sql")
with open(schema_path, "r") as f:
schema = f.read()
conn = get_connection()
conn.executescript(schema)
conn.commit()
conn.close()
engine = _get_engine()
with engine.connect() as conn:
conn.execute(text("PRAGMA journal_mode=WAL"))
conn.execute(text("PRAGMA foreign_keys=ON"))
Base.metadata.create_all(engine)
def _session() -> Session:
    """Create a new ORM session bound to the shared engine.

    Callers in this module use the result as a context manager.
    """
    engine = _get_engine()
    return Session(engine)
def insert_filing(filing: dict) -> bool:
conn = get_connection()
try:
conn.execute(
"""
INSERT OR IGNORE INTO filings
(accession_number, ticker, cik, insider_name, role,
transaction_date, filed_date, shares, price, total_value,
flag, is_10b51, post_tx_shares)
VALUES
(:accession_number, :ticker, :cik, :insider_name, :role,
:transaction_date, :filed_date, :shares, :price, :total_value,
:flag, :is_10b51, :post_tx_shares)
""",
filing,
with _session() as session:
exists = session.scalar(
select(Filing.id).where(Filing.accession_number == filing["accession_number"])
)
inserted = conn.execute("SELECT changes()").fetchone()[0] > 0
conn.commit()
return inserted
finally:
conn.close()
if exists is not None:
return False
def insert_signal(signal: dict) -> int:
conn = get_connection()
try:
cur = conn.execute(
"""
INSERT INTO signals
(ticker, trigger_date, cluster_size, total_cluster_value, score)
VALUES
(:ticker, :trigger_date, :cluster_size, :total_cluster_value, :score)
""",
signal,
row = Filing(
accession_number=filing["accession_number"],
ticker=filing.get("ticker"),
cik=filing.get("cik"),
insider_name=filing.get("insider_name"),
role=filing.get("role"),
transaction_date=filing.get("transaction_date"),
filed_date=filing.get("filed_date"),
shares=filing.get("shares"),
price=filing.get("price"),
total_value=filing.get("total_value"),
flag=filing.get("flag"),
is_10b51=bool(filing.get("is_10b51", False)),
post_tx_shares=filing.get("post_tx_shares"),
)
signal_id = cur.lastrowid
conn.commit()
return signal_id
finally:
conn.close()
def mark_signal_alerted(signal_id: int):
conn = get_connection()
try:
conn.execute("UPDATE signals SET alerted=1 WHERE id=?", (signal_id,))
conn.commit()
finally:
conn.close()
def mark_signal_executed(signal_id: int):
conn = get_connection()
try:
conn.execute(
"UPDATE signals SET executed=1, executed_at=? WHERE id=?",
(datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), signal_id),
)
conn.commit()
finally:
conn.close()
def mark_signal_closed(signal_id: int):
conn = get_connection()
try:
conn.execute("UPDATE signals SET closed=1 WHERE id=?", (signal_id,))
conn.commit()
finally:
conn.close()
def get_unalerted_signals() -> list[dict]:
conn = get_connection()
try:
rows = conn.execute(
"SELECT * FROM signals WHERE alerted=0 ORDER BY created_at ASC"
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def get_executed_unclosed_signals() -> list[dict]:
conn = get_connection()
try:
rows = conn.execute(
"SELECT * FROM signals WHERE executed=1 AND closed=0 AND executed_at IS NOT NULL"
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def get_recent_buys_for_ticker(ticker: str, window_days: int) -> list:
conn = get_connection()
try:
rows = conn.execute(
"""
SELECT * FROM filings
WHERE ticker=?
AND flag='A'
AND is_10b51=0
AND transaction_date >= date('now', ? || ' days')
ORDER BY transaction_date DESC
""",
(ticker, f"-{window_days}"),
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
session.add(row)
session.commit()
return True
def accession_exists(accession_number: str) -> bool:
conn = get_connection()
try:
row = conn.execute(
"SELECT 1 FROM filings WHERE accession_number=?", (accession_number,)
).fetchone()
return row is not None
finally:
conn.close()
with _session() as session:
return session.scalar(
select(Filing.id).where(Filing.accession_number == accession_number)
) is not None
def get_latest_filed_date() -> Optional[str]:
    """Return the maximum ``filed_date`` stored in ``filings``.

    The result is whatever ``MAX(filed_date)`` yields, i.e. ``None`` when
    there is no value to aggregate.
    """
    newest = select(func.max(Filing.filed_date))
    with _session() as session:
        return session.scalar(newest)
def insert_signal(signal: dict) -> int:
    """Persist a new signal row and return its primary key.

    Args:
        signal: Must contain ``ticker``, ``trigger_date``, ``cluster_size``
            and ``score``; ``total_cluster_value`` falls back to ``0.0``.

    Returns:
        The ``id`` of the inserted row.
    """
    with _session() as session:
        record = Signal(
            ticker=signal["ticker"],
            trigger_date=signal["trigger_date"],
            cluster_size=signal["cluster_size"],
            total_cluster_value=signal.get("total_cluster_value", 0.0),
            score=signal["score"],
        )
        session.add(record)
        session.commit()
        # Read the id while the session is still open.
        new_id = record.id
        return new_id
def mark_signal_alerted(signal_id: int):
    """Set ``alerted`` to true on the signal with the given id."""
    stmt = update(Signal).where(Signal.id == signal_id).values(alerted=True)
    with _session() as session:
        session.execute(stmt)
        session.commit()
def mark_signal_executed(signal_id: int):
    """Flag the signal as executed and stamp ``executed_at`` with the current UTC time."""
    with _session() as session:
        changes = {"executed": True, "executed_at": datetime.utcnow()}
        stmt = update(Signal).where(Signal.id == signal_id).values(**changes)
        session.execute(stmt)
        session.commit()
def mark_signal_closed(signal_id: int):
    """Set ``closed`` to true on the signal with the given id."""
    stmt = update(Signal).where(Signal.id == signal_id).values(closed=True)
    with _session() as session:
        session.execute(stmt)
        session.commit()
def get_unalerted_signals() -> list[dict]:
    """Return signals whose ``alerted`` flag is false, ordered by ``created_at``.

    Each row is converted with ``_signal_to_dict``.
    """
    stmt = (
        select(Signal)
        .where(Signal.alerted == False)  # noqa: E712 - builds SQL, not a Python bool test
        .order_by(Signal.created_at)
    )
    with _session() as session:
        pending = session.scalars(stmt).all()
        return [_signal_to_dict(sig) for sig in pending]
def get_executed_unclosed_signals() -> list[dict]:
    """Return signals that are executed, not closed, and have an ``executed_at`` value.

    Each row is converted with ``_signal_to_dict``.
    """
    criteria = (
        Signal.executed == True,  # noqa: E712 - builds SQL, not a Python bool test
        Signal.closed == False,  # noqa: E712
        Signal.executed_at.is_not(None),
    )
    stmt = select(Signal).where(*criteria)
    with _session() as session:
        open_positions = session.scalars(stmt).all()
        return [_signal_to_dict(sig) for sig in open_positions]
def get_recent_buys_for_ticker(ticker: str, window_days: int) -> list[dict]:
    """Return filings for ``ticker`` with flag ``"A"`` that are not 10b5-1, newest first.

    Only rows whose ``transaction_date`` is on or after today (UTC) minus
    ``window_days`` are returned. Dates are compared as ``YYYY-MM-DD``
    strings. Each row is converted with ``_filing_to_dict``.
    """
    from datetime import timedelta

    earliest = datetime.utcnow() - timedelta(days=window_days)
    cutoff = earliest.strftime("%Y-%m-%d")

    stmt = select(Filing).where(
        Filing.ticker == ticker,
        Filing.flag == "A",
        Filing.is_10b51 == False,  # noqa: E712 - builds SQL, not a Python bool test
        Filing.transaction_date >= cutoff,
    )
    stmt = stmt.order_by(Filing.transaction_date.desc())

    with _session() as session:
        buys = session.scalars(stmt).all()
        return [_filing_to_dict(buy) for buy in buys]
def get_signals_for_backtest(min_score: float, min_cluster_size: int) -> list[dict]:
    """Return signals with ``score >= min_score`` and ``cluster_size >= min_cluster_size``.

    Each row is converted with ``_signal_to_dict``.
    """
    meets_score = Signal.score >= min_score
    meets_cluster = Signal.cluster_size >= min_cluster_size
    stmt = select(Signal).where(meets_score, meets_cluster)
    with _session() as session:
        matches = session.scalars(stmt).all()
        return [_signal_to_dict(sig) for sig in matches]
def get_cached_prices(ticker: str, start_date: str, end_date: str) -> dict[str, float]:
    """Return cached closes for ``ticker`` as a ``{date: close}`` mapping.

    Both bounds are inclusive and are compared against the stored date
    strings.
    """
    stmt = select(PriceCache).where(
        PriceCache.ticker == ticker,
        PriceCache.date >= start_date,
        PriceCache.date <= end_date,
    )
    closes: dict[str, float] = {}
    with _session() as session:
        for entry in session.scalars(stmt).all():
            closes[entry.date] = entry.close
    return closes
def upsert_prices(ticker: str, prices: dict[str, float]):
    """Insert closes for ``ticker`` on dates that are not cached yet.

    Rows that already exist for a ``(ticker, date)`` pair are left untouched;
    despite the name, stored closes are never overwritten.

    Args:
        ticker: Symbol the prices belong to.
        prices: Mapping of date string to closing price.
    """
    if not prices:
        return

    with _session() as session:
        # One range query replaces a SELECT per date. Every key of `prices`
        # sorts between min(prices) and max(prices), so this range contains
        # all rows that could collide with the new ones.
        known_dates = set(
            session.scalars(
                select(PriceCache.date).where(
                    PriceCache.ticker == ticker,
                    PriceCache.date >= min(prices),
                    PriceCache.date <= max(prices),
                )
            )
        )
        for date_str, close in prices.items():
            if date_str not in known_dates:
                session.add(PriceCache(ticker=ticker, date=date_str, close=close))
        session.commit()
def _filing_to_dict(row: Filing) -> dict:
return {
"id": row.id,
"accession_number": row.accession_number,
"ticker": row.ticker,
"cik": row.cik,
"insider_name": row.insider_name,
"role": row.role,
"transaction_date": row.transaction_date,
"filed_date": row.filed_date,
"shares": row.shares,
"price": row.price,
"total_value": row.total_value,
"flag": row.flag,
"is_10b51": row.is_10b51,
"post_tx_shares": row.post_tx_shares,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
def _signal_to_dict(row: Signal) -> dict:
return {
"id": row.id,
"ticker": row.ticker,
"trigger_date": row.trigger_date,
"cluster_size": row.cluster_size,
"total_cluster_value": row.total_cluster_value,
"score": row.score,
"alerted": row.alerted,
"executed": row.executed,
"executed_at": row.executed_at.strftime("%Y-%m-%dT%H:%M:%SZ") if row.executed_at else None,
"closed": row.closed,
"created_at": row.created_at.isoformat() if row.created_at else None,
}

81
db/models.py Normal file
View File

@ -0,0 +1,81 @@
from datetime import datetime
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
Index,
Integer,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    """Declarative base shared by the ORM models defined in this module."""
class Filing(Base):
    """ORM model for the ``filings`` table; ``accession_number`` is unique and required."""

    __tablename__ = "filings"
    __table_args__ = (
        Index("idx_filings_ticker", "ticker"),
        Index("idx_filings_transaction_date", "transaction_date"),
        Index("idx_filings_filed_date", "filed_date"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    accession_number = Column(String, unique=True, nullable=False)

    # Issuer and insider.
    ticker = Column(String)
    cik = Column(String)
    insider_name = Column(String)
    role = Column(String)

    # Dates are stored as strings, not DATE columns.
    transaction_date = Column(String)
    filed_date = Column(String)

    # Transaction details.
    shares = Column(Float)
    price = Column(Float)
    total_value = Column(Float)
    flag = Column(String)
    is_10b51 = Column(Boolean, default=False)
    post_tx_shares = Column(Float)

    # Filled in with the current UTC time when the row is inserted.
    created_at = Column(DateTime, default=datetime.utcnow)
class Signal(Base):
    """ORM model for the ``signals`` table, including alert/execution/close flags."""

    __tablename__ = "signals"
    __table_args__ = (
        Index("idx_signals_ticker", "ticker"),
        Index("idx_signals_alerted", "alerted"),
        Index("idx_signals_executed", "executed"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)

    # What triggered the signal; trigger_date is stored as a string.
    ticker = Column(String)
    trigger_date = Column(String)
    cluster_size = Column(Integer)
    total_cluster_value = Column(Float)
    score = Column(Float)

    # Lifecycle flags, all false by default.
    alerted = Column(Boolean, default=False)
    executed = Column(Boolean, default=False)
    executed_at = Column(DateTime)
    closed = Column(Boolean, default=False)

    # Filled in with the current UTC time when the row is inserted.
    created_at = Column(DateTime, default=datetime.utcnow)
class PriceCache(Base):
    """ORM model for the ``price_cache`` table: one close per ``(ticker, date)``."""

    __tablename__ = "price_cache"

    id = Column(Integer, primary_key=True, autoincrement=True)
    ticker = Column(String, nullable=False)
    # Date is stored as a string, not a DATE column.
    date = Column(String, nullable=False)
    close = Column(Float, nullable=False)
    # Filled in with the current UTC time when the row is inserted.
    fetched_at = Column(DateTime, default=datetime.utcnow)

    __table_args__ = (
        # The unique constraint is backed by an index on (ticker, date), so a
        # separate Index over the same columns would only duplicate it and
        # add write overhead. Databases created earlier keep their old index.
        UniqueConstraint("ticker", "date", name="uq_price_cache_ticker_date"),
    )

View File

@ -1,37 +0,0 @@
CREATE TABLE IF NOT EXISTS filings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT UNIQUE,
ticker TEXT,
cik TEXT,
insider_name TEXT,
role TEXT,
transaction_date TEXT,
filed_date TEXT,
shares REAL,
price REAL,
total_value REAL,
flag TEXT,
is_10b51 INTEGER DEFAULT 0,
post_tx_shares REAL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT,
trigger_date TEXT,
cluster_size INTEGER,
total_cluster_value REAL,
score REAL,
alerted INTEGER DEFAULT 0,
executed INTEGER DEFAULT 0,
executed_at TEXT,
closed INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_filings_ticker ON filings(ticker);
CREATE INDEX IF NOT EXISTS idx_filings_transaction_date ON filings(transaction_date);
CREATE INDEX IF NOT EXISTS idx_signals_ticker ON signals(ticker);
CREATE INDEX IF NOT EXISTS idx_signals_alerted ON signals(alerted);
CREATE INDEX IF NOT EXISTS idx_signals_executed ON signals(executed);

View File

@ -7,7 +7,7 @@ from lxml import etree, html
import config
from ingestion.form4_parser import parse_form4
from db.db import insert_filing, accession_exists
from db.db import accession_exists, get_latest_filed_date, insert_filing
logger = logging.getLogger(__name__)
@ -82,7 +82,13 @@ def fetch_and_store_new_filings() -> list[dict]:
logger.error(f"Failed to fetch EDGAR index: {e}")
return new_filings
latest_in_db = get_latest_filed_date()
for _index_url, accession, filed_date in entries:
if latest_in_db and filed_date and filed_date < latest_in_db:
logger.debug(f"Skipping {accession}: filed_date {filed_date} older than latest in DB {latest_in_db}")
continue
if accession_exists(accession):
continue

View File

@ -3,3 +3,4 @@ lxml>=5.0.0
yfinance>=0.2.0
python-dotenv>=1.0.0
alpaca-trade-api>=3.0.0
sqlalchemy>=2.0.0