smaug/backtest/backtest.py
claude b119b9abae feat: SQLAlchemy ORM models, filing cache incremental fetch, yfinance price cache
- Replace db/schema.sql + raw sqlite3 with SQLAlchemy ORM (db/models.py)
  - Filing, Signal, PriceCache models with proper indexes
  - db/db.py uses SQLAlchemy sessions throughout; no raw SQL strings
- Add PriceCache table: stores daily close prices per ticker
  - backtest._fetch_prices checks DB first; skips yfinance for completed ranges
  - New data persisted via upsert_prices()
  - get_cached_prices() / upsert_prices() added to db.py
- EDGAR poller incremental fetch: get_latest_filed_date() returns newest
  filed_date in DB; fetch_and_store_new_filings skips entries older than
  that cutoff before even checking accession_exists
- Add get_signals_for_backtest() to db.py; backtest no longer opens its
  own sqlite3 connection
- requirements.txt: add sqlalchemy>=2.0.0

Co-authored-by: dodox <dodox@users.noreply.local>
2026-05-04 17:21:23 +00:00

173 lines
5.5 KiB
Python

import logging
import math
from datetime import datetime, timedelta
import config
from db.db import get_cached_prices, get_signals_for_backtest, upsert_prices
logger = logging.getLogger(__name__)
def _fetch_prices(ticker: str, start: datetime, end: datetime) -> dict[str, float]:
    """Return daily closing prices for *ticker*, keyed by ``YYYY-MM-DD`` date.

    The requested window runs from *start* to five calendar days past *end*
    (presumably padding for non-trading days around the exit date). If the
    padded window ends before today (UTC) and ``get_cached_prices`` returns
    at least one row, the cached mapping is returned without touching
    yfinance. Otherwise the window is downloaded, dates missing from the
    cache are persisted with ``upsert_prices``, and the merged mapping is
    returned.

    Args:
        ticker: Symbol passed to the price cache and to ``yf.download``.
        start: First day of the window.
        end: Last day of the window, before the five-day padding.

    Returns:
        Mapping of date string to close price. When the download comes back
        empty, whatever the cache returned is handed back unchanged.

    Raises:
        ImportError: If a download is required and yfinance is not installed.
    """
    # Local import: the module header only pulls in datetime and timedelta.
    from datetime import timezone

    start_str = start.strftime("%Y-%m-%d")
    end_str = (end + timedelta(days=5)).strftime("%Y-%m-%d")

    cached = get_cached_prices(ticker, start_str, end_str)
    # datetime.utcnow() is deprecated; an aware "now" yields the same UTC date.
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    if cached and end_str < today:
        # The window lies entirely in the past, so the cache is taken as final.
        return cached

    # Import only once a download is actually needed, so cache hits keep
    # working on machines without yfinance installed.
    try:
        import yfinance as yf
    except ImportError as exc:
        raise ImportError("yfinance not installed. Run: pip install yfinance") from exc

    data = yf.download(
        ticker,
        start=start_str,
        end=end_str,
        progress=False,
        auto_adjust=True,
    )
    if data.empty:
        return cached

    closes = data["Close"]
    # Recent yfinance releases can return multi-level columns even for a
    # single ticker, which makes data["Close"] a one-column DataFrame instead
    # of a Series; collapse it so .items() iterates (timestamp, price) pairs.
    if hasattr(closes, "columns"):
        closes = closes.iloc[:, 0]

    fetched: dict[str, float] = {}
    for ts, close_val in closes.items():
        price = float(close_val)
        if math.isnan(price):
            # A missing close must not be cached or fed into return maths.
            continue
        date_key = ts.to_pydatetime().replace(tzinfo=None).strftime("%Y-%m-%d")
        fetched[date_key] = price

    # Persist only the dates the cache did not already hold.
    new_prices = {day: price for day, price in fetched.items() if day not in cached}
    if new_prices:
        upsert_prices(ticker, new_prices)

    cached.update(fetched)
    return cached
def _first_close_on_or_after(prices: dict[str, float], target: datetime) -> float:
    """Return the close for the earliest date in *prices* that is >= *target*.

    Args:
        prices: Mapping of ``YYYY-MM-DD`` date strings to close prices.
        target: Earliest acceptable date; only its calendar date is used.

    Returns:
        The price stored under the earliest qualifying date.

    Raises:
        ValueError: If no date in *prices* falls on or after *target*.
    """
    cutoff = target.strftime("%Y-%m-%d")
    # ISO date strings order lexicographically the same way they do by date.
    earliest_day = min((day for day in prices if day >= cutoff), default=None)
    if earliest_day is None:
        raise ValueError(f"No price data on or after {cutoff}")
    return prices[earliest_day]
def _first_close_before(prices: dict[str, float], target: datetime) -> float:
    """Return the close for the latest date in *prices* that is <= *target*.

    Despite the name, the match is inclusive of *target* and picks the most
    recent qualifying date, not the first one.

    Args:
        prices: Mapping of ``YYYY-MM-DD`` date strings to close prices.
        target: Latest acceptable date; only its calendar date is used.

    Returns:
        The price stored under the latest qualifying date.

    Raises:
        ValueError: If no date in *prices* falls on or before *target*, or
            if the value stored under the latest qualifying date is ``None``.
    """
    cutoff = target.strftime("%Y-%m-%d")
    # ISO date strings order lexicographically the same way they do by date.
    latest_day = max((day for day in prices if day <= cutoff), default=None)
    close = None if latest_day is None else prices[latest_day]
    if close is None:
        raise ValueError(f"No price data on or before {cutoff}")
    return close
def _period_return(prices: dict[str, float], entry_date: datetime, exit_date: datetime) -> float:
    """Return the fractional change from the entry close to the exit close."""
    entry_price = _first_close_on_or_after(prices, entry_date)
    exit_price = _first_close_before(prices, exit_date)
    return (exit_price - entry_price) / entry_price


def run_backtest(
    db_path: str = None,
    holding_days: int = None,
    min_score: float = 0.0,
    min_cluster_size: int = 1,
) -> dict:
    """Backtest stored signals against SPY over a fixed holding period.

    For each signal returned by ``get_signals_for_backtest`` the stock's
    return is measured from the first close on or after the trigger date to
    the last close on or before ``trigger_date + holding_days`` calendar
    days, and compared with SPY's return over the same window.

    Args:
        db_path: Unused by this function; kept so existing callers that pass
            it keep working.
        holding_days: Calendar days to hold each position. Falsy values
            (``None`` or ``0``) fall back to ``config.HOLDING_PERIOD_DAYS``.
        min_score: Forwarded to ``get_signals_for_backtest``.
        min_cluster_size: Forwarded to ``get_signals_for_backtest``.

    Returns:
        ``{}`` when no signals match, ``{"error": "No results computed"}``
        when no signal could be priced, otherwise a dict with
        ``total_signals``, ``win_rate``, ``avg_return``, ``avg_alpha_vs_spy``,
        ``sharpe_ratio``, ``holding_days`` and the per-signal ``results``.
    """
    holding_days = holding_days or config.HOLDING_PERIOD_DAYS
    signals = get_signals_for_backtest(min_score, min_cluster_size)
    if not signals:
        logger.warning("No signals found matching criteria")
        return {}

    results = []
    # SPY return per entry date; holding_days is fixed for the whole run, so
    # the entry date alone identifies the benchmark window.
    spy_cache: dict[str, float] = {}

    for signal in signals:
        ticker = signal["ticker"]
        entry_date_str = signal["trigger_date"]
        try:
            entry_date = datetime.strptime(entry_date_str, "%Y-%m-%d")
        except (TypeError, ValueError):
            # TypeError covers a None or non-string trigger_date, which would
            # otherwise abort the entire backtest.
            logger.debug(
                "Skipping %s: unparseable trigger_date %r", ticker, entry_date_str
            )
            continue
        exit_date = entry_date + timedelta(days=holding_days)

        # Broad catch is deliberate: a download or data problem for one
        # ticker should skip that signal, not stop the run.
        try:
            prices = _fetch_prices(ticker, entry_date, exit_date)
            if not prices:
                logger.debug("No price data for %s", ticker)
                continue
            stock_return = _period_return(prices, entry_date, exit_date)
        except Exception as e:
            logger.debug("Failed to get data for %s: %s", ticker, e)
            continue

        if entry_date_str not in spy_cache:
            # Best effort: a missing benchmark falls back to 0.0 (alpha then
            # equals the raw stock return), but the fallback is now logged
            # instead of being swallowed silently.
            try:
                spy_prices = _fetch_prices("SPY", entry_date, exit_date)
                if spy_prices:
                    spy_cache[entry_date_str] = _period_return(
                        spy_prices, entry_date, exit_date
                    )
                else:
                    logger.warning(
                        "No SPY price data for %s; using 0.0 as benchmark return",
                        entry_date_str,
                    )
                    spy_cache[entry_date_str] = 0.0
            except Exception as e:
                logger.warning(
                    "Failed to get SPY data for %s (%s); using 0.0 as benchmark return",
                    entry_date_str,
                    e,
                )
                spy_cache[entry_date_str] = 0.0

        spy_return = spy_cache[entry_date_str]
        alpha = stock_return - spy_return
        results.append({
            "ticker": ticker,
            "entry_date": entry_date_str,
            "stock_return": round(stock_return, 4),
            "spy_return": round(spy_return, 4),
            "alpha": round(alpha, 4),
            "cluster_size": signal["cluster_size"],
            "score": signal["score"],
        })

    if not results:
        return {"error": "No results computed"}

    # Aggregates are computed from the rounded per-signal figures stored above.
    returns = [r["stock_return"] for r in results]
    alphas = [r["alpha"] for r in results]
    win_rate = sum(1 for r in returns if r > 0) / len(returns)
    avg_return = sum(returns) / len(returns)
    avg_alpha = sum(alphas) / len(alphas)
    # Population standard deviation of the per-signal returns.
    std_dev = math.sqrt(sum((r - avg_return) ** 2 for r in returns) / len(returns))
    # NOTE(review): holding_days is applied as calendar days above, while 252
    # is a trading-day count; confirm the intended annualisation factor.
    sharpe = (avg_return / std_dev * math.sqrt(252 / holding_days)) if std_dev > 0 else 0.0

    return {
        "total_signals": len(results),
        "win_rate": round(win_rate, 4),
        "avg_return": round(avg_return, 4),
        "avg_alpha_vs_spy": round(avg_alpha, 4),
        "sharpe_ratio": round(sharpe, 4),
        "holding_days": holding_days,
        "results": results,
    }
def print_summary(summary: dict):
    """Print a human-readable report for a ``run_backtest`` summary.

    Args:
        summary: Dict returned by ``run_backtest``. An empty dict (no signals
            matched) and an ``{"error": ...}`` dict are both reported with a
            single line instead of the full table.
    """
    if not summary:
        # run_backtest returns {} when no signals match; without this guard
        # the lookups below raise KeyError.
        print("No backtest results to display.")
        return
    if "error" in summary:
        print(f"Error: {summary['error']}")
        return

    width = 40
    rule = "=" * width
    print(f"\n{rule}")
    print(f"Backtest Results ({summary['holding_days']}-day hold)")
    print(f"{rule}")
    print(f"Total signals: {summary['total_signals']}")
    print(f"Win rate: {summary['win_rate']:.1%}")
    print(f"Avg return: {summary['avg_return']:.2%}")
    print(f"Avg alpha vs SPY: {summary['avg_alpha_vs_spy']:.2%}")
    print(f"Sharpe ratio: {summary['sharpe_ratio']:.2f}")
    print(f"{rule}\n")