fix: cache yfinance misses, pass pre-fetched caps to all plot sims

- _fetch_market_caps: parallel fetch (20 threads), skip tickers with '/',
  cache None results so missing tickers are never re-queried
- simulate: fix prices.get() default from {} to ([], []) for bisect compat
- plot_position_size: pass _signals/_market_caps to avoid per-sim refetch
- plot_equity_curves: use tuple-based SPY price access
- plots: regenerate all three (hp_sweep, equity_curves, position_size)
- README: add position_size.png embed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dominik Moritz Roth 2026-05-27 11:50:47 +02:00
parent 120a77aba3
commit 9417a9e542
6 changed files with 233 additions and 125 deletions

View File

@ -135,6 +135,8 @@ The signal exists. It just does not survive transaction costs.
![Equity Curves](plots/equity_curves.png) ![Equity Curves](plots/equity_curves.png)
![Position Size Sensitivity](plots/position_size.png)
Alpaca charges $0 commission on US equities. Real costs are spread + slippage only. Alpaca charges $0 commission on US equities. Real costs are spread + slippage only.
Simulated on 2020-2025 data, 7d hold, 1d entry delay, 10% of cash per signal: Simulated on 2020-2025 data, 7d hold, 1d entry delay, 10% of cash per signal:

View File

@ -13,7 +13,8 @@ from datetime import datetime
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config import config
from backtest.simulate import Strategy, _load_all_prices, simulate from backtest.simulate import Strategy, _load_all_prices, _fetch_market_caps, simulate
from db.db import get_signals_for_backtest
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -31,89 +32,87 @@ def _get_matplotlib():
raise ImportError("pip install matplotlib numpy") raise ImportError("pip install matplotlib numpy")
def plot_hp_heatmap(prices: dict, out_dir: str = PLOTS_DIR) -> str: def plot_hp_heatmap(prices: dict, out_dir: str = PLOTS_DIR, signals=None, market_caps=None) -> str:
""" """
Sweep holding_days x round-trip cost, plot annualized excess vs SPY. 6-panel heatmap: one panel per cap tier (+ theoretical + all-cap).
Each cell is also annotated with the raw annualized return. Axes: holding_days (rows) x buy_delay (cols).
Color: annualised excess return vs SPY.
""" """
matplotlib, plt, mdates, np = _get_matplotlib() matplotlib, plt, mdates, np = _get_matplotlib()
from matplotlib.colors import TwoSlopeNorm
hold_days = [3, 5, 7, 10, 14, 21, 30] hold_days = [3, 5, 7, 10, 14, 21, 30]
rt_pcts = [0.3, 0.5, 0.7, 1.0, 1.2, 1.5, 2.0] buy_delays = [0, 1, 2, 3]
# Alpaca: zero commission. Decompose RT into spread + slippage only (50/50). # Cap tier definitions: (label, cap_tier, spread, slippage)
# roundtrip = 2*spread + slippage => spread = RT*0.25, slippage = RT*0.5 # Costs match README results table. commission=0 (Alpaca).
# verify: 2*0.25 + 0.5 = 1.0 * RT ✓ tiers = [
def _costs(rt): ("Theoretical (0% RT, all)", None, 0.000, 0.000),
return dict(spread=rt * 0.25, slippage=rt * 0.5, commission=0) ("All cap (~0.7% RT)", None, 0.0025, 0.002),
("Large cap (~0.2% RT)", "large", 0.001, 0.001),
("Mid cap (~0.5% RT)", "mid", 0.0015, 0.0015),
("Small cap (~0.8% RT)", "small", 0.003, 0.002),
("Micro cap (~1.6% RT)", "micro", 0.005, 0.003),
]
rows_excess = [] total = len(tiers) * len(hold_days) * len(buy_delays)
rows_ann = []
total = len(hold_days) * len(rt_pcts)
done = 0 done = 0
tier_matrices = []
for hd in hold_days: for label, cap_tier, spread, slippage in tiers:
row_e, row_a = [], [] Z = []
for rt_pct in rt_pcts: for hd in hold_days:
rt = rt_pct / 100.0 row = []
s = Strategy(holding_days=hd, buy_delay=1, **_costs(rt)) for delay in buy_delays:
r = simulate(s, prices=prices) s = Strategy(
perf = r.get("performance", {}) holding_days=hd, buy_delay=delay,
row_e.append(perf.get("excess_return_pct", 0.0)) spread=spread, slippage=slippage, commission=0,
row_a.append(perf.get("annualized_return_pct", 0.0)) cap_tier=cap_tier,
done += 1 )
logger.info( r = simulate(s, prices=prices, _signals=signals, _market_caps=market_caps)
f"[{done}/{total}] hold={hd}d rt={rt_pct}% " excess = r.get("performance", {}).get("excess_return_pct", 0.0)
f"ann={row_a[-1]:.1f}% excess={row_e[-1]:+.1f}%" row.append(excess)
) done += 1
rows_excess.append(row_e) print(f" [{done}/{total}] {label} hold={hd}d delay={delay}d excess={excess:+.1f}%", flush=True)
rows_ann.append(row_a) Z.append(row)
tier_matrices.append((label, np.array(Z)))
Z_excess = np.array(rows_excess) # Global color scale so all panels are comparable
Z_ann = np.array(rows_ann) all_vals = np.concatenate([Z.flatten() for _, Z in tier_matrices])
vmax = float(max(abs(all_vals.max()), abs(all_vals.min()), 10))
norm = TwoSlopeNorm(vmin=-vmax, vcenter=0, vmax=vmax)
fig, axes = plt.subplots(1, 2, figsize=(15, 6)) fig, axes = plt.subplots(2, 3, figsize=(16, 9))
axes_flat = axes.flatten()
for ax, Z, title in [
(axes[0], Z_excess, "Excess return vs SPY (annualised %)"),
(axes[1], Z_ann, "Strategy annualised return (%)"),
]:
vmax = float(max(abs(Z.max()), abs(Z.min()), 5))
if "Excess" in title:
from matplotlib.colors import TwoSlopeNorm
norm = TwoSlopeNorm(vmin=-vmax, vcenter=0, vmax=vmax)
else:
spy_approx = 16.0
from matplotlib.colors import TwoSlopeNorm
norm = TwoSlopeNorm(
vmin=min(float(Z.min()), -5),
vcenter=spy_approx,
vmax=max(float(Z.max()), spy_approx + 5),
)
for ax, (label, Z) in zip(axes_flat, tier_matrices):
im = ax.imshow(Z, cmap="RdYlGn", norm=norm, aspect="auto") im = ax.imshow(Z, cmap="RdYlGn", norm=norm, aspect="auto")
cb = plt.colorbar(im, ax=ax)
cb.set_label("%")
ax.set_xticks(range(len(rt_pcts))) ax.set_xticks(range(len(buy_delays)))
ax.set_xticklabels([f"{r}%" for r in rt_pcts], fontsize=9) ax.set_xticklabels([f"{d}d" for d in buy_delays], fontsize=9)
ax.set_yticks(range(len(hold_days))) ax.set_yticks(range(len(hold_days)))
ax.set_yticklabels([f"{h}d" for h in hold_days], fontsize=9) ax.set_yticklabels([f"{h}d" for h in hold_days], fontsize=9)
ax.set_xlabel("Round-trip transaction cost") ax.set_xlabel("Entry delay", fontsize=9)
ax.set_ylabel("Holding period") ax.set_ylabel("Holding period", fontsize=9)
ax.set_title(title, fontsize=11) ax.set_title(label, fontsize=10, fontweight="bold")
for i in range(len(hold_days)): for i in range(len(hold_days)):
for j in range(len(rt_pcts)): for j in range(len(buy_delays)):
val = Z[i, j] val = Z[i, j]
txt = f"{val:+.1f}" if "Excess" in title else f"{val:.1f}"
brightness = norm(val) brightness = norm(val)
color = "white" if brightness < 0.35 or brightness > 0.75 else "black" color = "white" if brightness < 0.3 or brightness > 0.75 else "black"
ax.text(j, i, txt, ha="center", va="center", fontsize=7.5, color=color) ax.text(j, i, f"{val:+.1f}", ha="center", va="center",
fontsize=8, color=color)
# Shared colorbar
fig.colorbar(
plt.cm.ScalarMappable(norm=norm, cmap="RdYlGn"),
ax=axes_flat, label="Annualised excess return vs SPY (%)", shrink=0.6,
)
fig.suptitle( fig.suptitle(
"HP sweep: Alpaca (zero commission), 1-day entry delay, 10% position size, all cap tiers", "HP sweep: holding period x entry delay, by cap tier (Alpaca, zero commission)",
fontsize=12, fontsize=13,
) )
plt.tight_layout() plt.tight_layout()
@ -125,25 +124,26 @@ def plot_hp_heatmap(prices: dict, out_dir: str = PLOTS_DIR) -> str:
return out return out
def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR) -> str: def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR, signals=None, market_caps=None) -> str:
""" """
Plot portfolio equity curves for several cost scenarios vs SPY buy-and-hold. Plot portfolio equity curves for several cost scenarios vs SPY buy-and-hold.
""" """
matplotlib, plt, mdates, np = _get_matplotlib() matplotlib, plt, mdates, np = _get_matplotlib()
# Alpaca zero-commission costs by cap tier (spread + slippage only) # Alpaca zero-commission costs by cap tier (spread + slippage only)
# Costs match the values used in the README results table
scenarios = [ scenarios = [
{"label": "Large cap (~0.2% RT)", "cap_tier": "large", "spread": 0.001, "slippage": 0.001}, {"label": "Large cap (~0.2% RT)", "cap_tier": "large", "spread": 0.001, "slippage": 0.001},
{"label": "Mid cap (~0.5% RT)", "cap_tier": "mid", "spread": 0.0025, "slippage": 0.0025}, {"label": "Mid cap (~0.5% RT)", "cap_tier": "mid", "spread": 0.0015, "slippage": 0.0015},
{"label": "Small cap (~0.8% RT)", "cap_tier": "small", "spread": 0.004, "slippage": 0.004}, {"label": "Small cap (~0.8% RT)", "cap_tier": "small", "spread": 0.003, "slippage": 0.002},
{"label": "All tickers (0% RT)", "cap_tier": None, "spread": 0, "slippage": 0}, {"label": "Micro cap (~1.6% RT)", "cap_tier": "micro", "spread": 0.005, "slippage": 0.003},
] ]
fig, ax = plt.subplots(figsize=(13, 7)) fig, ax = plt.subplots(figsize=(13, 7))
colors = ["#2ecc71", "#3498db", "#e67e22", "#aaaaaa"] colors = ["#2ecc71", "#3498db", "#e67e22", "#e74c3c"]
sim_start = None sim_start = None
last_curve_date = None last_curve_date = None # earliest end across all scenarios — SPY clipped here
for sc, color in zip(scenarios, colors): for sc, color in zip(scenarios, colors):
s = Strategy( s = Strategy(
@ -151,13 +151,15 @@ def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR) -> str:
spread=sc["spread"], slippage=sc["slippage"], commission=0, spread=sc["spread"], slippage=sc["slippage"], commission=0,
cap_tier=sc["cap_tier"], cap_tier=sc["cap_tier"],
) )
r = simulate(s, prices=prices) print(f" equity curve: {sc['label']}...", flush=True)
r = simulate(s, prices=prices, _signals=signals, _market_caps=market_caps)
curve = r.get("equity_curve", []) curve = r.get("equity_curve", [])
if not curve: if not curve:
continue continue
sim_start = sim_start or r["period"]["start"] sim_start = sim_start or r["period"]["start"]
last_curve_date = curve[-1][0] # actual last signal date in this curve end = curve[-1][0]
last_curve_date = min(last_curve_date, end) if last_curve_date else end
dates = [datetime.strptime(d, "%Y-%m-%d") for d, _ in curve] dates = [datetime.strptime(d, "%Y-%m-%d") for d, _ in curve]
values = [v for _, v in curve] values = [v for _, v in curve]
@ -166,14 +168,16 @@ def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR) -> str:
label=sc["label"], color=color, linewidth=1.8) label=sc["label"], color=color, linewidth=1.8)
# SPY buy-and-hold overlay — clamp to last data point of strategy curves # SPY buy-and-hold overlay — clamp to last data point of strategy curves
spy_px = prices.get("SPY", {}) spy_entry = prices.get("SPY")
if spy_px and sim_start and last_curve_date: if spy_entry and spy_entry[0] and sim_start and last_curve_date:
spy_dates = sorted(d for d in spy_px if sim_start <= d <= last_curve_date) spy_dates_all, spy_closes_all = spy_entry
if spy_dates: spy_pairs = [(d, c) for d, c in zip(spy_dates_all, spy_closes_all)
base = spy_px[spy_dates[0]] if sim_start <= d <= last_curve_date]
if spy_pairs:
base = spy_pairs[0][1]
ax.plot( ax.plot(
[datetime.strptime(d, "%Y-%m-%d") for d in spy_dates], [datetime.strptime(d, "%Y-%m-%d") for d, _ in spy_pairs],
[spy_px[d] / base * 100 for d in spy_dates], [c / base * 100 for _, c in spy_pairs],
label="SPY buy & hold", color="black", linewidth=2.2, linestyle="--", label="SPY buy & hold", color="black", linewidth=2.2, linestyle="--",
) )
@ -199,6 +203,74 @@ def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR) -> str:
return out return out
def plot_position_size(prices: dict, out_dir: str = PLOTS_DIR, signals=None, market_caps=None) -> str:
"""
Line chart: annualised return vs position size for each cap tier.
Shows whether 10% is conservative or optimal.
"""
matplotlib, plt, mdates, np = _get_matplotlib()
pos_sizes = [0.03, 0.05, 0.07, 0.10, 0.15, 0.20, 0.25]
tiers = [
("Large (~0.2% RT)", "large", 0.001, 0.001),
("Mid (~0.5% RT)", "mid", 0.0015, 0.0015),
("Small (~0.8% RT)", "small", 0.003, 0.002),
("Micro (~1.6% RT)", "micro", 0.005, 0.003),
]
colors = ["#2ecc71", "#3498db", "#e67e22", "#e74c3c"]
fig, ax = plt.subplots(figsize=(10, 6))
spy_ann = None
total = len(tiers) * len(pos_sizes)
done = 0
for (label, cap_tier, spread, slippage), color in zip(tiers, colors):
ann_returns = []
for ps in pos_sizes:
s = Strategy(
holding_days=7, buy_delay=1,
spread=spread, slippage=slippage, commission=0,
cap_tier=cap_tier, position_size=ps,
)
r = simulate(s, prices=prices, _signals=signals, _market_caps=market_caps)
perf = r.get("performance", {})
ann_returns.append(perf.get("annualized_return_pct", 0.0))
if spy_ann is None:
spy_ann = perf.get("spy_annualized_pct", 16.0)
done += 1
print(f" [{done}/{total}] {label} pos={ps:.0%} ann={ann_returns[-1]:.1f}%", flush=True)
ax.plot([p * 100 for p in pos_sizes], ann_returns,
label=label, color=color, linewidth=2, marker="o", markersize=5)
if spy_ann is not None:
ax.axhline(spy_ann, color="black", linewidth=1.8, linestyle="--",
label=f"SPY buy & hold ({spy_ann:.1f}%)")
ax.axvline(10, color="gray", linewidth=1, linestyle=":", alpha=0.7)
ax.text(10.3, ax.get_ylim()[0] + 1, "default\n(10%)", fontsize=8, color="gray")
ax.set_xlabel("Position size (% of available cash per signal)", fontsize=11)
ax.set_ylabel("Annualised return (%)", fontsize=11)
ax.set_title(
"Position size sensitivity by cap tier (7d hold, 1d delay, Alpaca costs)",
fontsize=12,
)
ax.legend(fontsize=10)
ax.grid(True, alpha=0.25)
plt.tight_layout()
os.makedirs(out_dir, exist_ok=True)
out = os.path.join(out_dir, "position_size.png")
plt.savefig(out, dpi=150, bbox_inches="tight")
plt.close()
logger.info(f"Saved {out}")
return out
def main(): def main():
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@ -211,13 +283,22 @@ def main():
logger.info("Loading price cache...") logger.info("Loading price cache...")
prices = _load_all_prices() prices = _load_all_prices()
logger.info("Generating HP heatmap (49 simulations)...") logger.info("Pre-fetching signals and market caps...")
p1 = plot_hp_heatmap(prices) signals = get_signals_for_backtest(0.0, 1)
tickers = list({s["ticker"] for s in signals})
market_caps = _fetch_market_caps(tickers)
logger.info(f" {len(signals)} signals, {len(market_caps)} market caps cached")
logger.info("Generating HP heatmap (168 simulations)...")
p1 = plot_hp_heatmap(prices, signals=signals, market_caps=market_caps)
logger.info("Generating equity curves (4 simulations)...") logger.info("Generating equity curves (4 simulations)...")
p2 = plot_equity_curves(prices) p2 = plot_equity_curves(prices, signals=signals, market_caps=market_caps)
print(f"\nPlots saved:\n {p1}\n {p2}\n") logger.info("Generating position size sensitivity (28 simulations)...")
p3 = plot_position_size(prices, signals=signals, market_caps=market_caps)
print(f"\nPlots saved:\n {p1}\n {p2}\n {p3}\n")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -45,24 +45,38 @@ CAP_TIERS = {
def _fetch_market_caps(tickers: list[str]) -> dict[str, float]: def _fetch_market_caps(tickers: list[str]) -> dict[str, float]:
"""Return market cap for each ticker, using DB cache then yfinance for misses.""" """Return market cap for each ticker, using DB cache then yfinance for misses."""
import yfinance as yf import yfinance as yf
from concurrent.futures import ThreadPoolExecutor, as_completed
cached = get_cached_market_caps(tickers) cached = get_cached_market_caps(tickers)
missing = [t for t in tickers if t not in cached] # Skip tickers with special chars that yfinance can't handle
missing = [t for t in tickers if t not in cached and "/" not in t]
if missing: if missing:
logger.info(f"Fetching market caps for {len(missing)} tickers via yfinance...") logger.info(f"Fetching market caps for {len(missing)} tickers via yfinance (parallel)...")
fetched = {} fetched = {}
for ticker in missing:
def _get_cap(ticker):
try: try:
info = yf.Ticker(ticker).fast_info cap = getattr(yf.Ticker(ticker).fast_info, "market_cap", None)
cap = getattr(info, "market_cap", None) return ticker, float(cap) if cap else None
if cap:
fetched[ticker] = float(cap)
except Exception: except Exception:
pass return ticker, None
if fetched:
upsert_market_caps(fetched) with ThreadPoolExecutor(max_workers=20) as pool:
cached.update(fetched) futures = {pool.submit(_get_cap, t): t for t in missing}
done = 0
for future in as_completed(futures):
ticker, cap = future.result()
done += 1
if cap:
fetched[ticker] = cap
if done % 50 == 0:
print(f" market caps: {done}/{len(missing)} fetched, {len(fetched)} hits", flush=True)
# Cache all results (including None = not found) to avoid re-querying
all_results = {t: fetched.get(t) for t in missing}
upsert_market_caps(all_results)
cached.update({t: v for t, v in all_results.items() if v is not None})
return cached return cached
@ -73,39 +87,47 @@ logger = logging.getLogger(__name__)
# Price loading # Price loading
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _load_all_prices() -> dict[str, dict[str, float]]: def _load_all_prices() -> dict[str, tuple[list, list]]:
"""Load entire price cache from DB into memory: {ticker: {date: close}}.""" """
Load price cache from DB into memory.
Returns {ticker: (sorted_dates, closes)} for O(log n) bisect lookups.
"""
from sqlalchemy import create_engine, text from sqlalchemy import create_engine, text
from collections import defaultdict
engine = create_engine( engine = create_engine(
f"sqlite:///{config.DB_PATH}", f"sqlite:///{config.DB_PATH}",
connect_args={"check_same_thread": False}, connect_args={"check_same_thread": False},
) )
with engine.connect() as conn: with engine.connect() as conn:
rows = conn.execute(text("SELECT ticker, date, close FROM price_cache")).fetchall() rows = conn.execute(text(
"SELECT ticker, date, close FROM price_cache ORDER BY ticker, date"
)).fetchall()
prices: dict[str, dict[str, float]] = defaultdict(dict) raw: dict[str, list] = defaultdict(list)
for ticker, date, close in rows: for ticker, date, close in rows:
prices[ticker][date] = close raw[ticker].append((date, close))
logger.info(f"Loaded prices for {len(prices)} tickers ({sum(len(v) for v in prices.values())} rows)")
return dict(prices) prices = {
ticker: ([d for d, _ in pairs], [c for _, c in pairs])
for ticker, pairs in raw.items()
}
logger.info(f"Loaded prices for {len(prices)} tickers ({sum(len(v[0]) for v in prices.values())} rows)")
return prices
def _closest_price_on_or_after(prices: dict[str, float], date_str: str) -> float | None: def _closest_price_on_or_after(prices: tuple, date_str: str) -> float | None:
for d in sorted(prices): import bisect
if d >= date_str: dates, closes = prices
return prices[d] i = bisect.bisect_left(dates, date_str)
return None return closes[i] if i < len(closes) else None
def _closest_price_on_or_before(prices: dict[str, float], date_str: str) -> float | None: def _closest_price_on_or_before(prices: tuple, date_str: str) -> float | None:
result = None import bisect
for d in sorted(prices): dates, closes = prices
if d <= date_str: i = bisect.bisect_right(dates, date_str) - 1
result = prices[d] return closes[i] if i >= 0 else None
else:
break
return result
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -152,12 +174,14 @@ class Strategy:
return self.entry_cost + self.exit_cost return self.entry_cost + self.exit_cost
def simulate(strategy: Strategy, prices: dict = None) -> dict: def simulate(strategy: Strategy, prices: dict = None,
signals = get_signals_for_backtest(strategy.min_score, strategy.min_cluster) _signals: list = None, _market_caps: dict = None) -> dict:
if _signals is None:
_signals = get_signals_for_backtest(strategy.min_score, strategy.min_cluster)
# Filter malformed dates # Filter malformed dates
valid = [] valid = []
for s in signals: for s in _signals:
try: try:
date_str = s["trigger_date"][:10] date_str = s["trigger_date"][:10]
yr = int(date_str[:4]) yr = int(date_str[:4])
@ -176,12 +200,13 @@ def simulate(strategy: Strategy, prices: dict = None) -> dict:
if tier is None: if tier is None:
raise ValueError(f"Unknown cap_tier {strategy.cap_tier!r}. Use: {list(CAP_TIERS)}") raise ValueError(f"Unknown cap_tier {strategy.cap_tier!r}. Use: {list(CAP_TIERS)}")
cap_min, cap_max = tier cap_min, cap_max = tier
tickers = list({s["ticker"] for s in signals}) if _market_caps is None:
market_caps = _fetch_market_caps(tickers) tickers = list({s["ticker"] for s in signals})
_market_caps = _fetch_market_caps(tickers)
signals = [ signals = [
s for s in signals s for s in signals
if market_caps.get(s["ticker"], 0) >= cap_min if _market_caps.get(s["ticker"], 0) >= cap_min
and (cap_max is None or market_caps.get(s["ticker"], 0) < cap_max) and (cap_max is None or _market_caps.get(s["ticker"], 0) < cap_max)
] ]
logger.info(f"Cap tier '{strategy.cap_tier}': {len(signals)} signals after filtering") logger.info(f"Cap tier '{strategy.cap_tier}': {len(signals)} signals after filtering")
if not signals: if not signals:
@ -213,7 +238,7 @@ def simulate(strategy: Strategy, prices: dict = None) -> dict:
trades_executed = 0 trades_executed = 0
trades_skipped_no_price = 0 trades_skipped_no_price = 0
spy_prices = prices.get("SPY", {}) spy_prices = prices.get("SPY", ([], []))
for date_str in all_dates: for date_str in all_dates:
# 1. Close any positions whose exit_date <= today # 1. Close any positions whose exit_date <= today
@ -221,7 +246,7 @@ def simulate(strategy: Strategy, prices: dict = None) -> dict:
for pos in open_positions: for pos in open_positions:
exit_dt_str, ticker, cost_basis, shares, notional = pos exit_dt_str, ticker, cost_basis, shares, notional = pos
if exit_dt_str <= date_str: if exit_dt_str <= date_str:
px = prices.get(ticker, {}) px = prices.get(ticker, ([], []))
exit_price = _closest_price_on_or_before(px, exit_dt_str) exit_price = _closest_price_on_or_before(px, exit_dt_str)
if exit_price is None: if exit_price is None:
exit_price = _closest_price_on_or_before(px, date_str) exit_price = _closest_price_on_or_before(px, date_str)
@ -258,7 +283,7 @@ def simulate(strategy: Strategy, prices: dict = None) -> dict:
# 2. Open new positions for today's signals # 2. Open new positions for today's signals
for ticker, exit_date_str, sig in trades_by_entry[date_str]: for ticker, exit_date_str, sig in trades_by_entry[date_str]:
px = prices.get(ticker, {}) px = prices.get(ticker, ([], []))
entry_price = _closest_price_on_or_after(px, date_str) entry_price = _closest_price_on_or_after(px, date_str)
if entry_price is None: if entry_price is None:
trades_skipped_no_price += 1 trades_skipped_no_price += 1
@ -281,7 +306,7 @@ def simulate(strategy: Strategy, prices: dict = None) -> dict:
# Close all remaining open positions at last available price # Close all remaining open positions at last available price
for exit_dt_str, ticker, cost_basis, shares, notional in open_positions: for exit_dt_str, ticker, cost_basis, shares, notional in open_positions:
px = prices.get(ticker, {}) px = prices.get(ticker, ([], []))
exit_price = _closest_price_on_or_before(px, exit_dt_str) or cost_basis exit_price = _closest_price_on_or_before(px, exit_dt_str) or cost_basis
gross_return = (exit_price - cost_basis) / cost_basis gross_return = (exit_price - cost_basis) / cost_basis
net_return = gross_return - strategy.exit_cost net_return = gross_return - strategy.exit_cost
@ -290,7 +315,7 @@ def simulate(strategy: Strategy, prices: dict = None) -> dict:
final_value = cash final_value = cash
# SPY benchmark # SPY benchmark
if equity_curve and spy_prices: if equity_curve and spy_prices[0]:
start_str = equity_curve[0][0] start_str = equity_curve[0][0]
end_str = equity_curve[-1][0] end_str = equity_curve[-1][0]
spy_start = _closest_price_on_or_after(spy_prices, start_str) spy_start = _closest_price_on_or_after(spy_prices, start_str)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 173 KiB

After

Width:  |  Height:  |  Size: 209 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 131 KiB

After

Width:  |  Height:  |  Size: 230 KiB

BIN
plots/position_size.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 104 KiB