smaug/backtest/simulate.py
Dominik Roth b615920843 fix: realistic transaction costs, colorbar layout, equity curve clipping
- Costs updated to evidence-based values (SEC small-cap liquidity study 2013,
  Nasdaq spread data 2021, AQR Trading Costs paper 2018):
  large ~0.2% RT, mid ~0.5%, small ~1.5%, micro ~5%
- Micro-cap note: Alpaca does not allow new OTC/Pink Sheet positions;
  most micro-cap signals are untradeable; at realistic 5% RT, micro-cap
  destroys capital (-36% to -81% excess return)
- db.py: get_cached_market_caps returns already_fetched set including null
  rows, preventing repeated yfinance re-queries for known-missing tickers
- plot_hp_heatmap: colorbar in dedicated axes (right margin), no overlap
- plot_equity_curves: two-pass approach clips all curves to min end date
- README: updated cost table, shortened insidercopytrading.com section

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 14:23:13 +02:00

491 lines
18 KiB
Python

"""
Portfolio simulator for the insider-copytrade strategy.
Usage:
python backtest/simulate.py [options]
Strategy params:
--holding-days Calendar days to hold each position (default: 7)
--buy-delay Days after signal trigger to enter (default: 1)
--position-size Fraction of available cash per trade (default: 0.10)
--min-score Minimum signal score filter (default: 0.0)
--min-cluster Minimum cluster size filter (default: 1)
--capital Initial capital in USD (default: 100000)
--cap-tier Only trade one market-cap tier: large | mid | small | micro (default: all tiers)
Transaction cost params:
--spread One-way bid-ask half-spread, e.g. 0.003 = 0.3% (default: 0.003)
--slippage Entry slippage / market impact (default: 0.002)
--commission Per-trade commission as a fraction of notional, e.g. 0.001 = 0.1% (default: 0.001)
Round-trip cost = spread*2 + slippage + commission*2 (applied at buy and sell)
"""
import argparse
import logging
import math
import os
import sys
from collections import defaultdict
from datetime import datetime, timedelta
# Allow running as script from repo root
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config
from db.db import get_signals_for_backtest, get_cached_market_caps, upsert_market_caps
# Market-cap tier boundaries keyed by tier name. Each value is a
# (lower bound, upper bound) pair; simulate() treats the lower bound as
# inclusive and the upper bound as exclusive, and an upper bound of None
# leaves the tier open-ended.
CAP_TIERS = dict(
    large=(10_000_000_000, None),
    mid=(2_000_000_000, 10_000_000_000),
    small=(300_000_000, 2_000_000_000),
    micro=(0, 300_000_000),
)
def _fetch_market_caps(tickers: list[str]) -> dict[str, float]:
    """Resolve market caps for ``tickers`` from the DB cache, then yfinance.

    Tickers the cache has already attempted (including ones stored as
    not-found) and tickers containing "/" are never sent to yfinance.
    Every freshly queried ticker is written back to the cache, with None
    for misses, so it is not queried again on the next run.

    Returns:
        The cached mapping extended with the newly fetched non-null caps.
    """
    import yfinance as yf
    from concurrent.futures import ThreadPoolExecutor, as_completed

    cached, already_fetched = get_cached_market_caps(tickers)

    missing = [t for t in tickers if not (t in already_fetched or "/" in t)]
    if not missing:
        return cached

    logger.info(f"Fetching market caps for {len(missing)} tickers via yfinance (parallel)...")

    def _get_cap(ticker):
        # Best effort: any lookup failure is reported as "no market cap".
        try:
            cap = getattr(yf.Ticker(ticker).fast_info, "market_cap", None)
            return ticker, float(cap) if cap else None
        except Exception:
            return ticker, None

    fetched = {}
    with ThreadPoolExecutor(max_workers=20) as pool:
        pending = [pool.submit(_get_cap, t) for t in missing]
        for done, future in enumerate(as_completed(pending), start=1):
            ticker, cap = future.result()
            if cap:
                fetched[ticker] = cap
            # Progress line every 50 completed lookups.
            if done % 50 == 0:
                print(f" market caps: {done}/{len(missing)} fetched, {len(fetched)} hits", flush=True)

    # Persist hits and misses alike; only hits are handed back to the caller.
    all_results = {t: fetched.get(t) for t in missing}
    upsert_market_caps(all_results)
    for ticker, cap in all_results.items():
        if cap is not None:
            cached[ticker] = cap
    return cached
# Module-level logger named after this module; used by the functions in this file.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Price loading
# ---------------------------------------------------------------------------
def _load_all_prices() -> dict[str, tuple[list, list]]:
    """
    Read the whole ``price_cache`` table into memory.

    Returns {ticker: (dates, closes)} where both lists are parallel and
    follow the query's ``ORDER BY ticker, date``, so callers can bisect
    on the dates list.
    """
    from sqlalchemy import create_engine, text
    from collections import defaultdict

    db_url = f"sqlite:///{config.DB_PATH}"
    engine = create_engine(db_url, connect_args={"check_same_thread": False})
    query = text("SELECT ticker, date, close FROM price_cache ORDER BY ticker, date")
    with engine.connect() as conn:
        rows = conn.execute(query).fetchall()

    # Split each ticker's rows into two parallel columns.
    dates_by_ticker: dict[str, list] = defaultdict(list)
    closes_by_ticker: dict[str, list] = defaultdict(list)
    for ticker, date, close in rows:
        dates_by_ticker[ticker].append(date)
        closes_by_ticker[ticker].append(close)

    prices = {
        ticker: (dates, closes_by_ticker[ticker])
        for ticker, dates in dates_by_ticker.items()
    }
    logger.info(f"Loaded prices for {len(prices)} tickers ({sum(len(v[0]) for v in prices.values())} rows)")
    return prices
def _closest_price_on_or_after(prices: tuple, date_str: str) -> float | None:
import bisect
dates, closes = prices
i = bisect.bisect_left(dates, date_str)
return closes[i] if i < len(closes) else None
def _closest_price_on_or_before(prices: tuple, date_str: str) -> float | None:
import bisect
dates, closes = prices
i = bisect.bisect_right(dates, date_str) - 1
return closes[i] if i >= 0 else None
# ---------------------------------------------------------------------------
# Core simulation
# ---------------------------------------------------------------------------
class Strategy:
    """Parameter bundle for one simulation run: timing, sizing, filters and costs.

    The three cost parameters are fractions (0.003 means 0.3%); the derived
    per-leg and round-trip costs are exposed as read-only properties.
    """

    def __init__(
        self,
        holding_days: int = 7,
        buy_delay: int = 1,
        position_size: float = 0.10,
        min_score: float = 0.0,
        min_cluster: int = 1,
        capital: float = 100_000.0,
        spread: float = 0.003,
        slippage: float = 0.002,
        commission: float = 0.001,
        cap_tier: str = None,
    ):
        # Timing and sizing
        self.holding_days = holding_days
        self.buy_delay = buy_delay
        self.position_size = position_size
        self.capital = capital

        # Signal filters
        self.min_score = min_score
        self.min_cluster = min_cluster
        self.cap_tier = cap_tier  # "large" | "mid" | "small" | "micro" | None

        # Transaction-cost components
        self.spread = spread
        self.slippage = slippage
        self.commission = commission

    @property
    def entry_cost(self) -> float:
        """Cost charged when entering: half-spread + slippage + commission."""
        return self.spread + self.slippage + self.commission

    @property
    def exit_cost(self) -> float:
        """Cost charged when exiting: half-spread + commission."""
        return self.spread + self.commission

    @property
    def roundtrip_cost(self) -> float:
        """Entry cost plus exit cost."""
        return self.entry_cost + self.exit_cost
def simulate(strategy: "Strategy", prices: dict = None,
             _signals: list = None, _market_caps: dict = None) -> dict:
    """Run the portfolio simulation for ``strategy`` and return summary metrics.

    Events are processed only on entry dates: on each one, positions whose
    exit date has passed are settled first, then new positions are opened
    with ``position_size`` of the currently available cash. The equity curve
    is sampled on those same dates, valuing open positions at the notional
    invested. Positions still open after the last entry date are settled
    into the final value.

    Args:
        strategy: Run parameters (timing, sizing, costs, optional cap tier).
        prices: ``{ticker: (sorted_dates, closes)}``; loaded with
            ``_load_all_prices()`` when None.
        _signals: Pre-fetched signals, each a mapping with at least
            ``"trigger_date"`` and ``"ticker"``; fetched with
            ``get_signals_for_backtest`` when None.
        _market_caps: ``{ticker: market_cap}`` for the cap-tier filter;
            fetched with ``_fetch_market_caps`` when None and a tier is set.

    Returns:
        ``{"error": message}`` when no signal survives filtering, otherwise a
        dict with the sections "strategy", "period", "performance", "trades"
        and the raw "equity_curve" list of ``(date, value)`` pairs.

    Raises:
        ValueError: ``strategy.cap_tier`` is set but is not a ``CAP_TIERS`` key.
    """
    if _signals is None:
        _signals = get_signals_for_backtest(strategy.min_score, strategy.min_cluster)

    # Keep only signals whose trigger date is a real calendar date in 2020 or
    # later. The date is fully parsed here (not just its year) so that a value
    # such as "2021-13-45" is dropped instead of crashing the parse further down.
    signals = []
    for s in _signals:
        try:
            date_str = s["trigger_date"][:10]
            if datetime.strptime(date_str, "%Y-%m-%d").year >= 2020:
                signals.append(dict(s, trigger_date=date_str))
        except (KeyError, IndexError, TypeError, ValueError):
            # Missing, non-string or unparseable trigger date: skip the signal.
            continue
    if not signals:
        return {"error": "No signals after filtering"}

    if strategy.cap_tier:
        tier = CAP_TIERS.get(strategy.cap_tier)
        if tier is None:
            raise ValueError(f"Unknown cap_tier {strategy.cap_tier!r}. Use: {list(CAP_TIERS)}")
        cap_min, cap_max = tier
        if _market_caps is None:
            tickers = list({s["ticker"] for s in signals})
            _market_caps = _fetch_market_caps(tickers)

        def _in_tier(ticker):
            # A missing or None market cap counts as 0.
            # NOTE(review): this places unknown-cap tickers in the "micro"
            # tier (lower bound 0) -- confirm that is intended.
            cap = _market_caps.get(ticker) or 0
            return cap >= cap_min and (cap_max is None or cap < cap_max)

        signals = [s for s in signals if _in_tier(s["ticker"])]
        logger.info(f"Cap tier '{strategy.cap_tier}': {len(signals)} signals after filtering")
        if not signals:
            return {"error": f"No signals after cap_tier={strategy.cap_tier} filter"}

    if prices is None:
        prices = _load_all_prices()

    # Build trade list: {entry_date_str: [(ticker, exit_date_str, signal)]}
    trades_by_entry: dict[str, list] = defaultdict(list)
    for sig in signals:
        trigger_dt = datetime.strptime(sig["trigger_date"], "%Y-%m-%d")
        entry_dt = trigger_dt + timedelta(days=strategy.buy_delay)
        exit_dt = entry_dt + timedelta(days=strategy.holding_days)
        entry_str = entry_dt.strftime("%Y-%m-%d")
        exit_str = exit_dt.strftime("%Y-%m-%d")
        trades_by_entry[entry_str].append((sig["ticker"], exit_str, sig))

    # Only entry dates are event dates; exits are settled on the next one.
    all_dates = sorted(trades_by_entry)

    # State
    cash = strategy.capital
    # open positions: (entry_date, exit_date, ticker, cost_basis, shares, notional)
    open_positions: list[tuple[str, str, str, float, float, float]] = []
    equity_curve: list[tuple[str, float]] = []  # (date, portfolio_value)
    trade_log: list[dict] = []
    trades_executed = 0
    trades_skipped_no_price = 0
    spy_prices = prices.get("SPY", ([], []))

    for date_str in all_dates:
        # 1. Close any positions whose exit_date <= today
        still_open = []
        for pos in open_positions:
            entry_date_str, exit_dt_str, ticker, cost_basis, shares, notional = pos
            if exit_dt_str > date_str:
                still_open.append(pos)
                continue

            px = prices.get(ticker, ([], []))
            exit_price = _closest_price_on_or_before(px, exit_dt_str)
            if exit_price is None:
                exit_price = _closest_price_on_or_before(px, date_str)
            if exit_price is None:
                # can't find exit price -- recover notional (no gain/loss)
                cash += notional
                trade_log.append({
                    "ticker": ticker,
                    "entry_date": entry_date_str,
                    "exit_date": exit_dt_str,
                    "gross_return": 0.0,
                    "net_return": 0.0,
                    "pnl": 0.0,
                    "note": "no_exit_price",
                })
                continue

            gross_return = (exit_price - cost_basis) / cost_basis
            net_return = gross_return - strategy.exit_cost
            exit_proceeds = notional * (1 + net_return)
            cash += exit_proceeds
            trade_log.append({
                "ticker": ticker,
                "entry_date": entry_date_str,
                "exit_date": exit_dt_str,
                "gross_return": round(gross_return, 5),
                "net_return": round(net_return, 5),
                "pnl": round(exit_proceeds - notional, 2),
                "notional": round(notional, 2),
            })
        open_positions = still_open

        # 2. Open new positions for today's signals
        for ticker, exit_date_str, sig in trades_by_entry[date_str]:
            px = prices.get(ticker, ([], []))
            entry_price = _closest_price_on_or_after(px, date_str)
            # A missing, zero or negative close cannot be traded; a zero
            # price would otherwise divide by zero when sizing the position.
            if entry_price is None or entry_price <= 0:
                trades_skipped_no_price += 1
                continue

            notional = cash * strategy.position_size
            if notional < 1.0:
                continue

            # Entry costs are folded into the cost basis (effective entry price is higher)
            effective_entry = entry_price * (1 + strategy.entry_cost)
            shares = notional / effective_entry
            cash -= notional
            open_positions.append(
                (date_str, exit_date_str, ticker, effective_entry, shares, notional)
            )
            trades_executed += 1

        # Track equity (cash + open positions valued at notional invested -- conservative)
        open_value = sum(pos[5] for pos in open_positions)
        equity_curve.append((date_str, cash + open_value))

    # Close all remaining open positions at the last price on or before their
    # exit date, falling back to the cost basis when no price is found.
    # NOTE(review): these trades are not added to trade_log, so win rate,
    # average return and Sharpe below do not include them -- confirm intended.
    for _entry, exit_dt_str, ticker, cost_basis, shares, notional in open_positions:
        px = prices.get(ticker, ([], []))
        exit_price = _closest_price_on_or_before(px, exit_dt_str) or cost_basis
        gross_return = (exit_price - cost_basis) / cost_basis
        net_return = gross_return - strategy.exit_cost
        cash += notional * (1 + net_return)
    final_value = cash

    # SPY benchmark over the same window as the equity curve
    if equity_curve and spy_prices[0]:
        start_str = equity_curve[0][0]
        end_str = equity_curve[-1][0]
        spy_start = _closest_price_on_or_after(spy_prices, start_str)
        spy_end = _closest_price_on_or_before(spy_prices, end_str)
        spy_total = (spy_end - spy_start) / spy_start if (spy_start and spy_end) else 0.0
    else:
        spy_total = 0.0

    # Annualized metrics (the window is floored at 0.001 years to avoid dividing by zero)
    if equity_curve:
        start_dt = datetime.strptime(equity_curve[0][0], "%Y-%m-%d")
        end_dt = datetime.strptime(equity_curve[-1][0], "%Y-%m-%d")
        years = max((end_dt - start_dt).days / 365.25, 0.001)
    else:
        years = 1.0
    total_return = (final_value - strategy.capital) / strategy.capital
    ann_return = (1 + total_return) ** (1 / years) - 1
    spy_ann = (1 + spy_total) ** (1 / years) - 1

    # Max drawdown from equity curve, measured against the running peak
    peak = strategy.capital
    max_dd = 0.0
    for _, val in equity_curve:
        if val > peak:
            peak = val
        dd = (peak - val) / peak
        if dd > max_dd:
            max_dd = dd

    # Per-trade Sharpe from trade log, scaled by trades per year
    net_returns = [t["net_return"] for t in trade_log if "net_return" in t]
    if net_returns:
        avg_r = sum(net_returns) / len(net_returns)
        std_r = math.sqrt(sum((r - avg_r) ** 2 for r in net_returns) / len(net_returns))
        trades_per_year = trades_executed / years
        sharpe = (avg_r / std_r * math.sqrt(trades_per_year)) if std_r > 0 else 0.0
        win_rate = sum(1 for r in net_returns if r > 0) / len(net_returns)
        avg_net_return = avg_r
    else:
        sharpe = win_rate = avg_net_return = 0.0

    return {
        "strategy": {
            "holding_days": strategy.holding_days,
            "buy_delay": strategy.buy_delay,
            "position_size": strategy.position_size,
            "min_score": strategy.min_score,
            "min_cluster": strategy.min_cluster,
            "roundtrip_cost_pct": round(strategy.roundtrip_cost * 100, 3),
            "cap_tier": strategy.cap_tier or "all",
        },
        "period": {
            "start": equity_curve[0][0] if equity_curve else "n/a",
            "end": equity_curve[-1][0] if equity_curve else "n/a",
            "years": round(years, 2),
        },
        "performance": {
            "initial_capital": strategy.capital,
            "final_value": round(final_value, 2),
            "total_return_pct": round(total_return * 100, 2),
            "annualized_return_pct": round(ann_return * 100, 2),
            "spy_annualized_pct": round(spy_ann * 100, 2),
            "excess_return_pct": round((ann_return - spy_ann) * 100, 2),
            "max_drawdown_pct": round(max_dd * 100, 2),
            "sharpe": round(sharpe, 3),
        },
        "trades": {
            "signals_total": len(signals),
            "executed": trades_executed,
            "skipped_no_price": trades_skipped_no_price,
            "win_rate_pct": round(win_rate * 100, 2),
            "avg_net_return_pct": round(avg_net_return * 100, 3),
        },
        "equity_curve": equity_curve,
    }
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _print_results(r: dict):
if "error" in r:
print(f"Error: {r['error']}")
return
s = r["strategy"]
p = r["performance"]
t = r["trades"]
period = r["period"]
w = 48
print(f"\n{'=' * w}")
print(f" Portfolio Simulation Results")
print(f"{'=' * w}")
print(f" Strategy")
print(f" Hold: {s['holding_days']}d | Delay: {s['buy_delay']}d | Size: {s['position_size']*100:.0f}% of cash")
print(f" Score ≥ {s['min_score']} | Cluster ≥ {s['min_cluster']} | Cap: {s['cap_tier']}")
print(f" Round-trip cost: {s['roundtrip_cost_pct']:.2f}%")
print(f" Period: {period['start']}{period['end']} ({period['years']}y)")
print(f"{'' * w}")
print(f" Capital: ${p['initial_capital']:>12,.0f} → ${p['final_value']:>12,.2f}")
print(f" Total ret: {p['total_return_pct']:>+8.1f}%")
print(f" Ann. ret: {p['annualized_return_pct']:>+8.1f}% (SPY: {p['spy_annualized_pct']:+.1f}%)")
print(f" Excess: {p['excess_return_pct']:>+8.1f}%")
print(f" Max DD: {p['max_drawdown_pct']:>8.1f}%")
print(f" Sharpe: {p['sharpe']:>8.3f}")
print(f"{'' * w}")
print(f" Trades executed: {t['executed']:>6} / {t['signals_total']} signals")
print(f" Skipped (no px): {t['skipped_no_price']:>6}")
print(f" Win rate: {t['win_rate_pct']:>5.1f}%")
print(f" Avg net return: {t['avg_net_return_pct']:>+6.3f}% per trade")
print(f"{'=' * w}\n")
def main():
    """CLI entry point: parse options, run one simulation and print the report."""
    logging.basicConfig(
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        level=logging.INFO,
    )

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Simulate insider-copytrade portfolio with realistic costs",
    )
    add = parser.add_argument

    # Strategy options
    add("--holding-days", type=int, default=7)
    add("--buy-delay", type=int, default=1)
    add("--position-size", type=float, default=0.10,
        help="Fraction of available cash per trade (0.10 = 10%%)")
    add("--min-score", type=float, default=0.0)
    add("--min-cluster", type=int, default=1)
    add("--cap-tier", choices=["large", "mid", "small", "micro"],
        default=None, help="Filter by market cap tier")
    add("--capital", type=float, default=100_000.0)

    # Transaction-cost options
    add("--spread", type=float, default=0.003,
        help="Half bid-ask spread paid on entry AND exit (0.003 = 0.3%%)")
    add("--slippage", type=float, default=0.002,
        help="Entry slippage / market impact (0.002 = 0.2%%)")
    add("--commission", type=float, default=0.001,
        help="Per-trade commission as fraction of notional")

    # When invoked via `python main.py simulate ...`, the leading
    # 'simulate' token is the sub-command name, not an option -- drop it.
    cli_args = sys.argv[1:]
    if cli_args[:1] == ["simulate"]:
        cli_args = cli_args[1:]
    args = parser.parse_args(cli_args)

    from db.db import init_db
    init_db()

    strategy_fields = (
        "holding_days", "buy_delay", "position_size", "min_score",
        "min_cluster", "capital", "spread", "slippage", "commission",
        "cap_tier",
    )
    strategy = Strategy(**{name: getattr(args, name) for name in strategy_fields})

    _print_results(simulate(strategy))
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()