- backtest/plot.py: generates two plots saved to plots/
- hp_sweep.png: 7x7 heatmap of holding_days x round-trip cost, showing
annualised excess return vs SPY and raw annualised return per cell
- equity_curves.png: portfolio equity vs SPY for 4 cost scenarios
- backtest/simulate.py: accept pre-loaded prices dict to avoid reloading
on every sweep iteration; return equity_curve in result
- main.py: add `plot` command
- README: update results section with Alpaca-specific cost breakdown
(zero commission; costs are spread + slippage only); add an honest analysis
of why insidercopytrading.com-style services show outperformance that
cannot be replicated in practice; note that the Alpaca integration is not finished
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
155 lines
4.7 KiB
Python
155 lines
4.7 KiB
Python
import logging
|
|
import sys
|
|
|
|
# Root logging is configured at import time so every command shares one
# timestamped "[LEVEL] logger-name: message" line format at INFO level.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by all command functions in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _run_signals(label: str = ""):
    """Re-run signal generation over the stored buy filings.

    Each filing returned by ``get_all_buys_for_reprocess()`` is evaluated as
    of its own date (the first 10 characters of ``transaction_date``, falling
    back to ``filed_date``, falling back to an empty string).  Only the first
    filing seen for a given ``(ticker, as_of)`` pair is processed.

    Args:
        label: Optional suffix appended to the summary log line.

    Returns:
        The number of filings for which ``process_filing`` returned a
        non-``None`` result.
    """
    from db.db import get_all_buys_for_reprocess
    from signals.filter_engine import process_filing

    count = 0
    handled: set[tuple] = set()
    for row in get_all_buys_for_reprocess():
        raw_date = row.get("transaction_date") or row.get("filed_date") or ""
        as_of = raw_date[:10]  # keep only the leading YYYY-MM-DD-sized prefix
        dedup_key = (row["ticker"], as_of)
        if dedup_key not in handled:
            handled.add(dedup_key)
            result = process_filing(row, as_of_date=as_of)
            if result is not None:
                count += 1

    logger.info(f"Signal generation{' ' + label if label else ''}: {count} signals")
    return count
|
|
|
|
|
|
def _process_filing(filing: dict):
    """Turn one filing into a signal and fan it out to the enabled sinks.

    The filing is passed to ``process_filing``; when that yields ``None``
    nothing else happens.  Otherwise the signal is logged, sent to Slack if
    ``config.SLACK_WEBHOOK_URL`` is set, and handed to the Alpaca client if
    both ``config.ALPACA_KEY`` and ``config.ALPACA_SECRET`` are set.

    Args:
        filing: The filing record to evaluate.
    """
    from signals.filter_engine import process_filing
    from alerts.slack_alert import send_slack_alert
    import config

    signal = process_filing(filing)
    if signal is None:
        # Filing did not produce a signal; nothing to alert on or trade.
        return

    logger.info(f"Signal: {signal['ticker']} score={signal['score']} cluster={signal['cluster_size']}")

    slack_enabled = config.SLACK_WEBHOOK_URL
    if slack_enabled:
        send_slack_alert(signal)

    # The broker module is imported only when credentials are configured.
    if config.ALPACA_KEY and config.ALPACA_SECRET:
        from broker.alpaca_client import execute_signal

        execute_signal(signal)
|
|
|
|
|
|
def _close_expired_positions():
    """Ask the Alpaca client to close expired positions, if it is configured.

    Does nothing unless both ``config.ALPACA_KEY`` and
    ``config.ALPACA_SECRET`` are set.
    """
    import config

    if not (config.ALPACA_KEY and config.ALPACA_SECRET):
        return

    # Imported only when credentials exist, mirroring the lazy import style
    # used elsewhere in this file.
    from broker.alpaca_client import close_expired_positions

    close_expired_positions()
|
|
|
|
|
|
def cmd_run():
    """Run the live pipeline: initialise the DB, then hand control to the poller.

    Every filing reported by ``run_poller`` is processed into a signal
    (alerts / trades as configured), after which expired positions are
    closed.  NOTE(review): the original docstring states EDGAR is polled
    every 10 minutes; the interval is decided inside ``run_poller`` and is
    not visible here.
    """
    from db.db import init_db
    from ingestion.edgar_poller import run_poller

    init_db()
    logger.info("Database initialized")

    def on_new_filing(filing: dict):
        # Handle the new filing first, then sweep positions that have expired.
        _process_filing(filing)
        _close_expired_positions()

    run_poller(on_new_filing=on_new_filing)
|
|
|
|
|
|
def cmd_backfill():
    """Bulk-ingest historical Form 4 filings from SEC EDGAR quarterly archives.

    Arguments are read from ``sys.argv[2:]`` (everything after the
    ``backfill`` subcommand) and are fully validated *before* the database
    and ingestion modules are imported, so a malformed invocation fails fast
    without initialising the DB.

    Usage:
        python main.py backfill --years 2023 2024          # full year range
        python main.py backfill --year 2024 --quarter 1    # single quarter

    Options:
        --no-signals    Skip signal generation after the ingest.

    Exactly one of ``--years`` / ``--year`` is required.  ``--quarter`` is
    required with ``--year`` and rejected with ``--years`` (previously it was
    silently ignored in that combination).  Invalid combinations terminate
    via ``parser.error`` (argparse usage error, exit status 2).
    """
    import argparse

    parser = argparse.ArgumentParser(prog="main.py backfill")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--years", nargs=2, type=int, metavar=("FROM", "TO"),
                       help="Inclusive year range, e.g. --years 2023 2024")
    group.add_argument("--year", type=int, help="Single year (use with --quarter)")
    parser.add_argument("--quarter", type=int, choices=[1, 2, 3, 4])
    parser.add_argument("--no-signals", action="store_true",
                        help="Skip signal generation after ingest")
    args = parser.parse_args(sys.argv[2:])

    # Cross-argument rules argparse cannot express on its own.
    if args.years is not None:
        if args.quarter is not None:
            parser.error("--quarter can only be used with --year, not --years")
    elif args.quarter is None:
        parser.error("--year requires --quarter")

    # Project imports are deferred until the arguments are known to be valid.
    from db.db import init_db
    from ingestion.sec_bulk_ingest import ingest_years, ingest_quarter

    init_db()

    if args.years is not None:
        stored = ingest_years(args.years[0], args.years[1])
    else:
        stored = ingest_quarter(args.year, args.quarter)

    logger.info(f"Ingest complete: {stored} transaction rows stored")

    if not args.no_signals:
        _run_signals("after backfill")
|
|
|
|
|
|
def cmd_backtest():
    """Run the backtest with the thresholds from ``config`` and print the summary.

    ``run_backtest`` receives the DB path, holding period, minimum score and
    minimum cluster size from ``config``; its result is passed to
    ``print_summary``.  NOTE(review): the original docstring says historical
    prices come from yfinance; that happens inside ``run_backtest`` and is
    not visible here.
    """
    from backtest.backtest import run_backtest, print_summary
    import config

    logger.info("Running backtest...")

    backtest_params = {
        "db_path": config.DB_PATH,
        "holding_days": config.HOLDING_PERIOD_DAYS,
        "min_score": config.SCORE_ALERT_THRESHOLD,
        "min_cluster_size": config.MIN_CLUSTER_SIZE,
    }
    print_summary(run_backtest(**backtest_params))
|
|
|
|
|
|
def cmd_simulate():
    """Run the portfolio simulation with configurable strategy and cost params.

    This is a thin wrapper: all work, including option parsing, is delegated
    to ``backtest.simulate.main``.

    Usage:
        python main.py simulate [--holding-days 7] [--buy-delay 1]
                                [--position-size 0.10] [--min-score 0] [--min-cluster 1]
                                [--capital 100000]
                                [--spread 0.003] [--slippage 0.002] [--commission 0.001]
    """
    # NOTE(review): sys.argv still contains the "simulate" subcommand here;
    # presumably the delegate skips it when parsing — confirm.
    from backtest.simulate import main as run_simulation

    run_simulation()
|
|
|
|
|
|
def cmd_plot():
    """Generate the HP heatmap and equity-curve plots; PNGs are saved to plots/.

    Thin wrapper that delegates entirely to ``backtest.plot.main``.
    """
    from backtest.plot import main as render_plots

    render_plots()
|
|
|
|
|
|
# Maps each CLI subcommand name to the zero-argument function implementing it.
# The key order is also the order shown in the usage message.
COMMANDS = {
    "run": cmd_run,            # live polling loop
    "backfill": cmd_backfill,  # bulk historical ingest
    "backtest": cmd_backtest,  # backtest stored signals
    "simulate": cmd_simulate,  # portfolio simulation
    "plot": cmd_plot,          # heatmap / equity-curve PNGs
}
|
|
|
|
if __name__ == "__main__":
    # With no subcommand on the command line, default to the live "run" loop.
    cmd = "run" if len(sys.argv) <= 1 else sys.argv[1]

    handler = COMMANDS.get(cmd)
    if handler is None:
        # Unknown subcommand: show the available ones and exit non-zero.
        print(f"Usage: python main.py [{' | '.join(COMMANDS)}]")
        sys.exit(1)

    handler()
|