feat: cap-tier filtering, Alpaca cost model, README cleanup

- simulate.py: --cap-tier large|mid|small|micro; yfinance market cap fetch
  with DB cache (ticker_meta table); argv fix for main.py dispatch
- plot.py: equity curves now show cap tiers with Alpaca costs (zero commission);
  HP sweep uses Alpaca cost decomposition; SPY line clamped to last strategy date
- db/models.py: TickerMeta table
- db/db.py: get_cached_market_caps, upsert_market_caps
- README: add --cap-tier to simulate docs; backfill note (~3 days for 2 years
  at SEC 10 req/s limit); remove duplicate setup block; remove em-dashes in prose;
  results table tilde estimates to be updated once cap-tier sims complete

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dominik Moritz Roth 2026-05-26 18:10:09 +02:00
parent 56ec0b4a81
commit d0e98b9cb7
6 changed files with 127 additions and 381 deletions

340
PLAN.md
View File

@ -1,340 +0,0 @@
# Insider Copytrade System -- Implementation Plan
## Description
A personal system that monitors SEC EDGAR Form 4 filings in real-time, filters for high-quality insider buying signals, alerts via Slack, and optionally executes trades automatically through Alpaca's paper or live trading API.
The system is fully self-hosted, uses only free/public data sources, and requires no third-party data subscriptions.
---
## Background
Company insiders (executives, directors, >10% shareholders) must file SEC Form 4 within 2 business days of any trade. This is public data via SEC EDGAR. The signal value of insider *buying* is academically documented -- executives buying their own stock with personal capital is a meaningful vote of confidence, particularly when:
- Multiple insiders buy simultaneously (cluster signal)
- The trade is unplanned (not a 10b5-1 scheduled plan)
- The company is small/mid-cap (less institutional arbitrage)
The edge vs. political trade copying: 2-day disclosure lag vs. 45 days, and the signal is company-specific rather than sector-level.
**Key risk:** This signal is publicly known and tracked. The edge is in filtering quality and execution speed, not data exclusivity. Large-cap Form 4 signals are arbitraged quickly. Focus on small/mid-cap, clustered, unplanned buys.
---
## System Outline
```
SEC EDGAR RSS Feed (poll every 10 min)
|
[Ingestion Layer]
|
Parse Form 4 XML
|
[Filter Engine]
- Buy only (flag = A)
- Exclude 10b5-1 plans
- Min transaction size
- Role weighting
- Cluster detection
|
SQLite Database
|
┌────────────┬──────────────┐
| | |
[Backtester] [Slack Alert] [Alpaca API]
(manual) (paper/live)
```
---
## Actionables
### Phase 1 -- Data Ingestion
**Goal:** Reliably pull and parse Form 4 filings as they appear.
**Tasks:**
1. Set up project structure
```
insider-copytrade/
ingestion/
edgar_poller.py # polls EDGAR RSS
form4_parser.py # parses XML -> structured dict
db/
schema.sql
db.py # SQLite interface
signals/
filter_engine.py # applies signal filters
cluster_detector.py
alerts/
slack_alert.py
broker/
alpaca_client.py
backtest/
backtest.py
config.py
main.py
```
2. Poll EDGAR RSS for Form 4 filings every 10 minutes:
```
https://www.sec.gov/cgi-bin/browse-edgar?action=getcurrent&type=4&dateb=&owner=include&count=40&search_text=&action=getcurrent
```
SEC also provides a structured latest filings feed:
```
https://efts.sec.gov/LATEST/search-index?q=&forms=4
```
3. For each new filing, fetch and parse the XML document. Key fields to extract:
- `issuerTradingSymbol` (ticker)
- `rptOwnerName`, `officerTitle` (insider name + role)
- `transactionDate`
- `transactionAcquiredDisposedCode` (A = buy, D = sell)
- `transactionShares`, `transactionPricePerShare`
- `transactionTotalValue` (compute if not present)
- `footnotes` (check for "10b5-1" mention)
- `sharesOwnedFollowingTransaction`
4. Store raw filing XML + parsed fields. Track `accessionNumber` as dedup key.
**SQLite schema:**
```sql
CREATE TABLE filings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT UNIQUE,
ticker TEXT,
cik TEXT,
insider_name TEXT,
role TEXT,
transaction_date TEXT,
filed_date TEXT,
shares REAL,
price REAL,
total_value REAL,
flag TEXT, -- A or D
is_10b51 INTEGER, -- 0 or 1
post_tx_shares REAL,
created_at TEXT
);
CREATE TABLE signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT,
trigger_date TEXT,
cluster_size INTEGER,
total_cluster_value REAL,
score REAL,
alerted INTEGER DEFAULT 0,
executed INTEGER DEFAULT 0,
created_at TEXT
);
```
---
### Phase 2 -- Filter Engine
**Goal:** Reduce noise to actionable signals only.
**Filters to apply (in order):**
| Filter | Logic |
|---|---|
| Buy only | `flag == 'A'` |
| Exclude 10b5-1 | Scan footnotes for "10b5-1", "Rule 10b5", "adopted a plan" |
| Min transaction value | `total_value >= 50000` (configurable) |
| Exclude derivative transactions | Options exercises are weaker signal than open market purchases |
| Role weighting | CEO/CFO/President = high; Director = medium; 10% owner = context-dependent |
| Cluster detection | 2+ insiders buying same ticker within 30 days = elevated signal |
**Scoring formula (simple v1):**
```python
score = base_role_weight * log(total_value) * cluster_multiplier
# cluster_multiplier = 1.0 + (0.5 * (cluster_size - 1))
```
Expose all thresholds in `config.py` for easy tuning during backtesting.
---
### Phase 3 -- SQLite Storage
SQLite is sufficient for this workload (low write volume, single process). Use WAL mode for concurrent reads during backtesting:
```python
conn = sqlite3.connect('insider.db')
conn.execute('PRAGMA journal_mode=WAL')
```
Keep raw filing XML in a `/data/filings/` directory keyed by accession number. Parse on ingest, re-parse never needed.
---
### Phase 4 -- Slack Alerts
**Goal:** Get notified immediately when a signal fires, with enough context to decide manually.
1. Create a Slack app, get a webhook URL (takes 5 minutes)
2. Alert format:
```
INSIDER BUY SIGNAL
Ticker: $ACME
Insider: John Smith (CEO)
Date: 2025-05-01
Shares: 10,000 @ $14.50 = $145,000
Cluster: 3 insiders in last 14 days
Score: 8.4
10b5-1: No
EDGAR: https://www.sec.gov/cgi-bin/browse-edgar?...
```
3. Alert only on signals above configurable score threshold
4. Mark `alerted = 1` in DB after sending to avoid duplicates on re-poll
```python
import requests
def send_slack_alert(webhook_url, signal):
requests.post(webhook_url, json={"text": format_signal(signal)})
```
---
### Phase 5 -- Backtesting
**Goal:** Validate filter parameters on historical data before going live.
**Data:**
- Historical Form 4 filings: download bulk XML from `https://www.sec.gov/dera/data/form-4-data`
- Price data: `yfinance` (free, sufficient for backtesting)
**Backtest logic:**
```python
# For each signal in historical data:
# - Entry: next market open after filed_date
# - Exit: N days later (configurable: 30/60/90/180)
# - Calculate return vs SPY over same period
# - Aggregate by role, cluster_size, market_cap bucket
```
**Use `vectorbt` for performance:**
```python
import vectorbt as vbt
# Build entry/exit signal matrices aligned to price data
# Run portfolio simulation with configurable position sizing
```
**Output metrics:**
- Annualized return vs SPY benchmark
- Win rate
- Avg return by holding period
- Avg return by role / cluster size
- Max drawdown
- Sharpe ratio
**Critical:** Test on post-2022 data specifically. Pre-2022 results are likely inflated -- the signal became widely tracked after Autopilot/media coverage.
**Parameter grid to test:**
```python
MIN_VALUE = [25_000, 50_000, 100_000]
HOLDING_DAYS = [30, 60, 90, 180]
CLUSTER_WINDOW = [14, 30]
MIN_CLUSTER_SIZE = [1, 2, 3]
ROLES = ['all', 'c-suite-only']
```
---
### Phase 6 -- Alpaca Integration
**Goal:** Optionally auto-execute signals. Start with paper trading.
**Paper trading base URL:** `https://paper-api.alpaca.markets`
**Live trading base URL:** `https://api.alpaca.markets`
Swap via config flag -- never hardcode.
```python
from alpaca_trade_api import REST
api = REST(
key_id=config.ALPACA_KEY,
secret_key=config.ALPACA_SECRET,
base_url=config.ALPACA_BASE_URL # paper or live
)
def execute_signal(ticker, portfolio_value, signal_score):
# Fixed fractional sizing: 2% of portfolio per signal
price = api.get_latest_trade(ticker).price
allocation = portfolio_value * 0.02
qty = int(allocation / price)
if qty < 1:
return
api.submit_order(
symbol=ticker,
qty=qty,
side='buy',
type='market',
time_in_force='day'
)
```
Position sizing: start at 2% per signal, max 10% in any single ticker. Add a max open positions limit (e.g. 20) to cap exposure.
Exit logic (v1): time-based only (close after N days). Add trailing stop later.
---
## Build Order
| Step | Deliverable | Est. Time |
|---|---|---|
| 1 | EDGAR poller + Form 4 XML parser + SQLite storage | 1 day |
| 2 | Filter engine + cluster detector | 0.5 day |
| 3 | Slack alert | 1 hour |
| 4 | Historical data download + backtest | 1-2 days |
| 5 | Alpaca paper trading integration | 0.5 day |
| 6 | Run paper trading 4-8 weeks, monitor | -- |
| 7 | Switch to live with small capital | -- |
Do not proceed to Step 7 without meaningful paper trading history.
---
## Dependencies
```
requests
lxml
sqlite3 (stdlib)
yfinance
vectorbt
alpaca-trade-api
python-dotenv
```
All free. No paid APIs required.
---
## Config Template
```python
# config.py
EDGAR_POLL_INTERVAL = 600 # seconds
MIN_TRANSACTION_VALUE = 50_000
MIN_CLUSTER_SIZE = 1 # raise to 2 for higher quality
CLUSTER_WINDOW_DAYS = 30
HOLDING_PERIOD_DAYS = 90
POSITION_SIZE_PCT = 0.02 # 2% per signal
MAX_POSITIONS = 20
SCORE_ALERT_THRESHOLD = 5.0
SLACK_WEBHOOK_URL = ""
ALPACA_KEY = ""
ALPACA_SECRET = ""
ALPACA_BASE_URL = "https://paper-api.alpaca.markets" # switch for live
```

View File

@ -44,7 +44,7 @@ cp .env.example .env # fill in credentials
# Live polling (every 10 min)
python main.py run
# Bulk-ingest historical filings
# Bulk-ingest historical filings (2 years took ~3 days at SEC's 10 req/s rate limit)
python main.py backfill --years 2023 2024
python main.py backfill --year 2024 --quarter 1
@ -67,9 +67,10 @@ Strategy:
--position-size F Fraction of available cash per trade (default: 0.10)
--min-score F Minimum signal score (default: 0.0)
--min-cluster N Minimum cluster size (default: 1)
--cap-tier large|mid|small|micro Filter by market cap tier (default: all)
--capital F Initial capital (default: 100000)
Transaction costs:
Transaction costs (Alpaca has zero commission, set --commission 0):
--spread F One-way bid-ask half-spread at entry and exit (default: 0.003)
--slippage F Entry slippage / market impact (default: 0.002)
--commission F Per-trade commission as fraction of notional (default: 0.001)
@ -77,12 +78,10 @@ Transaction costs:
Round-trip = spread x 2 + slippage + commission x 2.
## Setup
Cap tiers: large >$10B, mid $2-10B, small $300M-2B, micro <$300M.
Market caps are fetched from yfinance on first use and cached in the DB.
```bash
cp .env.example .env
pip install -r requirements.txt
```
## Setup
| Variable | Default | Description |
|---|---|---|
@ -147,7 +146,7 @@ Alpaca charges $0 commission on US equities. Real costs are spread + slippage on
SPY annualised over the same period: ~+16%.
Break-even is roughly 0.3-0.5% round-trip. On Alpaca that means large-cap stocks only -- but most insider buying happens in small and mid-cap names, so filtering aggressively kills signal count.
Break-even is roughly 0.3-0.5% round-trip. On Alpaca that means large-cap stocks only. Most insider buying happens in small and mid-cap names, so filtering aggressively kills signal count.
### Is insidercopytrading.com a scam?
@ -174,14 +173,14 @@ Alpaca integration exists in the codebase (`broker/alpaca_client.py`) but is not
| `ingestion/edgar_poller.py` | EDGAR Atom feed polling |
| `ingestion/sec_bulk_ingest.py` | Bulk historical ingest via form.idx |
| `ingestion/form4_parser.py` | Form 4 XML parser; 10b5-1 detection |
| `db/models.py` | SQLAlchemy ORM models |
| `db/models.py` | SQLAlchemy ORM models (Filing, Signal, PriceCache, TickerMeta) |
| `db/db.py` | DB access layer |
| `signals/filter_engine.py` | Filing to signal pipeline |
| `signals/cluster_detector.py` | Cluster detection |
| `alerts/slack_alert.py` | Slack webhook |
| `broker/alpaca_client.py` | Alpaca order execution |
| `backtest/backtest.py` | Per-signal backtest |
| `backtest/simulate.py` | Portfolio simulator |
| `backtest/simulate.py` | Portfolio simulator with cap-tier filtering |
| `backtest/plot.py` | Plot generator |
| `main.py` | CLI: `run / backfill / backtest / simulate / plot` |

View File

@ -41,15 +41,11 @@ def plot_hp_heatmap(prices: dict, out_dir: str = PLOTS_DIR) -> str:
hold_days = [3, 5, 7, 10, 14, 21, 30]
rt_pcts = [0.3, 0.5, 0.7, 1.0, 1.2, 1.5, 2.0]
# decompose round-trip into (spread, slippage, commission) that sum correctly:
# roundtrip = 2*spread + slippage + 2*commission
# allocate 40% spread, 40% slippage, 20% commission (all relative to RT)
# => spread = RT*0.4/2 = RT*0.2 (one-way)
# => slippage = RT*0.4
# => commission = RT*0.2/2 = RT*0.1 (one-way)
# verify: 2*0.2 + 0.4 + 2*0.1 = 0.4+0.4+0.2 = 1.0 * RT ✓
# Alpaca: zero commission. Decompose RT into spread + slippage only (50/50).
# roundtrip = 2*spread + slippage => spread = RT*0.25, slippage = RT*0.5
# verify: 2*0.25 + 0.5 = 1.0 * RT ✓
def _costs(rt):
return dict(spread=rt * 0.2, slippage=rt * 0.4, commission=rt * 0.1)
return dict(spread=rt * 0.25, slippage=rt * 0.5, commission=0)
rows_excess = []
rows_ann = []
@ -116,7 +112,7 @@ def plot_hp_heatmap(prices: dict, out_dir: str = PLOTS_DIR) -> str:
ax.text(j, i, txt, ha="center", va="center", fontsize=7.5, color=color)
fig.suptitle(
"HP sweep: 1-day entry delay, 10% position size, buy filter only",
"HP sweep: Alpaca (zero commission), 1-day entry delay, 10% position size, all cap tiers",
fontsize=12,
)
plt.tight_layout()
@ -135,22 +131,25 @@ def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR) -> str:
"""
matplotlib, plt, mdates, np = _get_matplotlib()
# Alpaca zero-commission costs by cap tier (spread + slippage only)
scenarios = [
{"label": "0% RT cost (theoretical)", "spread": 0, "slippage": 0, "commission": 0},
{"label": "0.67% RT (best case)", "spread": 0.0014, "slippage": 0.0027, "commission": 0.0007},
{"label": "1.0% RT (mid)", "spread": 0.002, "slippage": 0.004, "commission": 0.001},
{"label": "1.5% RT (realistic small-cap)","spread": 0.003, "slippage": 0.006, "commission": 0.0015},
{"label": "Large cap (~0.2% RT)", "cap_tier": "large", "spread": 0.001, "slippage": 0.001},
{"label": "Mid cap (~0.5% RT)", "cap_tier": "mid", "spread": 0.0025, "slippage": 0.0025},
{"label": "Small cap (~0.8% RT)", "cap_tier": "small", "spread": 0.004, "slippage": 0.004},
{"label": "All tickers (0% RT)", "cap_tier": None, "spread": 0, "slippage": 0},
]
fig, ax = plt.subplots(figsize=(13, 7))
colors = ["#2ecc71", "#3498db", "#e67e22", "#e74c3c"]
sim_start = sim_end = None
colors = ["#2ecc71", "#3498db", "#e67e22", "#aaaaaa"]
sim_start = None
last_curve_date = None
for sc, color in zip(scenarios, colors):
s = Strategy(
holding_days=7, buy_delay=1,
spread=sc["spread"], slippage=sc["slippage"], commission=sc["commission"],
spread=sc["spread"], slippage=sc["slippage"], commission=0,
cap_tier=sc["cap_tier"],
)
r = simulate(s, prices=prices)
curve = r.get("equity_curve", [])
@ -158,7 +157,7 @@ def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR) -> str:
continue
sim_start = sim_start or r["period"]["start"]
sim_end = r["period"]["end"]
last_curve_date = curve[-1][0] # actual last signal date in this curve
dates = [datetime.strptime(d, "%Y-%m-%d") for d, _ in curve]
values = [v for _, v in curve]
@ -166,10 +165,10 @@ def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR) -> str:
ax.plot(dates, [v / base * 100 for v in values],
label=sc["label"], color=color, linewidth=1.8)
# SPY buy-and-hold overlay
# SPY buy-and-hold overlay — clamp to last data point of strategy curves
spy_px = prices.get("SPY", {})
if spy_px and sim_start and sim_end:
spy_dates = sorted(d for d in spy_px if sim_start <= d <= sim_end)
if spy_px and sim_start and last_curve_date:
spy_dates = sorted(d for d in spy_px if sim_start <= d <= last_curve_date)
if spy_dates:
base = spy_px[spy_dates[0]]
ax.plot(
@ -182,7 +181,7 @@ def plot_equity_curves(prices: dict, out_dir: str = PLOTS_DIR) -> str:
ax.set_xlabel("Date", fontsize=11)
ax.set_ylabel("Portfolio value (indexed to 100)", fontsize=11)
ax.set_title(
"Insider Copytrade: equity curves vs SPY (7d hold, 1d delay, 10% position size)",
"Insider Copytrade: equity curves by cap tier, Alpaca costs (7d hold, 1d delay, 10% position size)",
fontsize=12,
)
ax.legend(fontsize=10)

View File

@ -32,7 +32,39 @@ from datetime import datetime, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import config
from db.db import get_signals_for_backtest
from db.db import get_signals_for_backtest, get_cached_market_caps, upsert_market_caps
CAP_TIERS = {
"large": (10_000_000_000, None),
"mid": (2_000_000_000, 10_000_000_000),
"small": (300_000_000, 2_000_000_000),
"micro": (0, 300_000_000),
}
def _fetch_market_caps(tickers: list[str]) -> dict[str, float]:
"""Return market cap for each ticker, using DB cache then yfinance for misses."""
import yfinance as yf
cached = get_cached_market_caps(tickers)
missing = [t for t in tickers if t not in cached]
if missing:
logger.info(f"Fetching market caps for {len(missing)} tickers via yfinance...")
fetched = {}
for ticker in missing:
try:
info = yf.Ticker(ticker).fast_info
cap = getattr(info, "market_cap", None)
if cap:
fetched[ticker] = float(cap)
except Exception:
pass
if fetched:
upsert_market_caps(fetched)
cached.update(fetched)
return cached
logger = logging.getLogger(__name__)
@ -92,6 +124,7 @@ class Strategy:
spread: float = 0.003,
slippage: float = 0.002,
commission: float = 0.001,
cap_tier: str = None,
):
self.holding_days = holding_days
self.buy_delay = buy_delay
@ -102,6 +135,7 @@ class Strategy:
self.spread = spread
self.slippage = slippage
self.commission = commission
self.cap_tier = cap_tier # "large" | "mid" | "small" | "micro" | None
# cost applied at entry: half-spread + slippage + commission
@property
@ -137,6 +171,22 @@ def simulate(strategy: Strategy, prices: dict = None) -> dict:
if not signals:
return {"error": "No signals after filtering"}
if strategy.cap_tier:
tier = CAP_TIERS.get(strategy.cap_tier)
if tier is None:
raise ValueError(f"Unknown cap_tier {strategy.cap_tier!r}. Use: {list(CAP_TIERS)}")
cap_min, cap_max = tier
tickers = list({s["ticker"] for s in signals})
market_caps = _fetch_market_caps(tickers)
signals = [
s for s in signals
if market_caps.get(s["ticker"], 0) >= cap_min
and (cap_max is None or market_caps.get(s["ticker"], 0) < cap_max)
]
logger.info(f"Cap tier '{strategy.cap_tier}': {len(signals)} signals after filtering")
if not signals:
return {"error": f"No signals after cap_tier={strategy.cap_tier} filter"}
if prices is None:
prices = _load_all_prices()
@ -291,6 +341,7 @@ def simulate(strategy: Strategy, prices: dict = None) -> dict:
"min_score": strategy.min_score,
"min_cluster": strategy.min_cluster,
"roundtrip_cost_pct": round(strategy.roundtrip_cost * 100, 3),
"cap_tier": strategy.cap_tier or "all",
},
"period": {
"start": equity_curve[0][0] if equity_curve else "n/a",
@ -338,7 +389,7 @@ def _print_results(r: dict):
print(f"{'=' * w}")
print(f" Strategy")
print(f" Hold: {s['holding_days']}d | Delay: {s['buy_delay']}d | Size: {s['position_size']*100:.0f}% of cash")
print(f" Score ≥ {s['min_score']} | Cluster ≥ {s['min_cluster']}")
print(f" Score ≥ {s['min_score']} | Cluster ≥ {s['min_cluster']} | Cap: {s['cap_tier']}")
print(f" Round-trip cost: {s['roundtrip_cost_pct']:.2f}%")
print(f" Period: {period['start']}{period['end']} ({period['years']}y)")
print(f"{'' * w}")
@ -373,6 +424,8 @@ def main():
help="Fraction of available cash per trade (0.10 = 10%%)")
parser.add_argument("--min-score", type=float, default=0.0)
parser.add_argument("--min-cluster", type=int, default=1)
parser.add_argument("--cap-tier", choices=["large", "mid", "small", "micro"],
default=None, help="Filter by market cap tier")
parser.add_argument("--capital", type=float, default=100_000.0)
# Costs
parser.add_argument("--spread", type=float, default=0.003,
@ -382,7 +435,11 @@ def main():
parser.add_argument("--commission", type=float, default=0.001,
help="Per-trade commission as fraction of notional")
args = parser.parse_args()
# When invoked via `python main.py simulate ...`, argv[1] is 'simulate' -- skip it
raw = sys.argv[1:]
if raw and raw[0] == "simulate":
raw = raw[1:]
args = parser.parse_args(raw)
from db.db import init_db
init_db()
@ -397,6 +454,7 @@ def main():
spread=args.spread,
slippage=args.slippage,
commission=args.commission,
cap_tier=args.cap_tier,
)
result = simulate(strategy)

View File

@ -6,7 +6,7 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
import config
from db.models import Base, Filing, PriceCache, Signal
from db.models import Base, Filing, PriceCache, Signal, TickerMeta
def _engine():
@ -219,6 +219,28 @@ def get_signals_for_backtest(min_score: float, min_cluster_size: int) -> list[di
return [_signal_to_dict(r) for r in rows]
def get_cached_market_caps(tickers: list[str]) -> dict[str, float]:
if not tickers:
return {}
with _session() as session:
rows = session.scalars(
select(TickerMeta).where(TickerMeta.ticker.in_(tickers))
).all()
return {r.ticker: r.market_cap for r in rows if r.market_cap is not None}
def upsert_market_caps(caps: dict[str, float]) -> None:
with _session() as session:
for ticker, cap in caps.items():
existing = session.get(TickerMeta, ticker)
if existing:
existing.market_cap = cap
existing.fetched_at = datetime.utcnow()
else:
session.add(TickerMeta(ticker=ticker, market_cap=cap))
session.commit()
def get_cached_prices(ticker: str, start_date: str, end_date: str) -> dict[str, float]:
with _session() as session:
rows = session.scalars(

View File

@ -66,6 +66,14 @@ class Signal(Base):
)
class TickerMeta(Base):
__tablename__ = "ticker_meta"
ticker = Column(String, primary_key=True)
market_cap = Column(Float, nullable=True)
fetched_at = Column(DateTime, default=datetime.utcnow)
class PriceCache(Base):
__tablename__ = "price_cache"