smaug/db/db.py
Claude 2e2be3e9c7 fix: address sanity-check issues + rebrand to Smaug
Co-authored-by: dodox <dodox@users.noreply.local>
2026-05-04 16:32:00 +00:00

147 lines
3.9 KiB
Python

import os
import sqlite3
from datetime import datetime, timezone

import config
def get_connection() -> sqlite3.Connection:
    """Open a SQLite connection to ``config.DB_PATH`` with module defaults.

    The connection is configured to:
      * return rows as ``sqlite3.Row`` (so callers can do ``dict(row)``),
      * use write-ahead logging (``PRAGMA journal_mode=WAL``),
      * enforce foreign keys (``PRAGMA foreign_keys=ON``).

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        sqlite3.Error: If connecting or applying either PRAGMA fails. When a
            PRAGMA fails, the connection is closed before the error propagates.
    """
    conn = sqlite3.connect(config.DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
    except sqlite3.Error:
        # Nobody else holds a reference yet, so close the handle here rather
        # than leaking it when configuration fails (e.g. database is locked).
        conn.close()
        raise
    return conn
def init_db() -> None:
    """Create the database schema by executing ``schema.sql``.

    The script is read from the directory that contains this module and run
    against a fresh connection obtained from ``get_connection()``.

    Raises:
        OSError: If ``schema.sql`` cannot be opened or read.
        sqlite3.Error: If connecting or executing the script fails.
    """
    schema_path = os.path.join(os.path.dirname(__file__), "schema.sql")
    # Explicit encoding so the read does not depend on the platform locale.
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = f.read()

    conn = get_connection()
    try:
        conn.executescript(schema)
        conn.commit()
    finally:
        # Close even when the script fails, matching the other helpers here.
        conn.close()
# Named-parameter INSERT for one filing row; OR IGNORE turns a constraint
# conflict into a silent no-op instead of an error.
_INSERT_FILING_SQL = """
INSERT OR IGNORE INTO filings
(accession_number, ticker, cik, insider_name, role,
transaction_date, filed_date, shares, price, total_value,
flag, is_10b51, post_tx_shares)
VALUES
(:accession_number, :ticker, :cik, :insider_name, :role,
:transaction_date, :filed_date, :shares, :price, :total_value,
:flag, :is_10b51, :post_tx_shares)
"""


def insert_filing(filing: dict) -> bool:
    """Insert one filing and report whether a row was actually written.

    Args:
        filing: Mapping that supplies every named parameter used by the
            INSERT statement (``accession_number``, ``ticker``, ``cik``, ...).

    Returns:
        True if the row was inserted; False if ``INSERT OR IGNORE`` skipped
        it (presumably a duplicate accession number -- the constraint lives
        in the schema, which is not visible here).
    """
    db = get_connection()
    try:
        db.execute(_INSERT_FILING_SQL, filing)
        # changes() is the row count of the INSERT that just ran on this
        # connection; zero means the statement was ignored.
        rows_changed = db.execute("SELECT changes()").fetchone()[0]
        was_inserted = rows_changed > 0
        db.commit()
        return was_inserted
    finally:
        db.close()
# Named-parameter INSERT for one signal row.
_INSERT_SIGNAL_SQL = """
INSERT INTO signals
(ticker, trigger_date, cluster_size, total_cluster_value, score)
VALUES
(:ticker, :trigger_date, :cluster_size, :total_cluster_value, :score)
"""


def insert_signal(signal: dict) -> int:
    """Store a new signal and return the id of the inserted row.

    Args:
        signal: Mapping with the keys ``ticker``, ``trigger_date``,
            ``cluster_size``, ``total_cluster_value`` and ``score``.

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    db = get_connection()
    try:
        new_id = db.execute(_INSERT_SIGNAL_SQL, signal).lastrowid
        db.commit()
        return new_id
    finally:
        db.close()
def mark_signal_alerted(signal_id: int):
    """Set ``alerted=1`` on the signal whose ``id`` equals *signal_id*.

    No check is made that a row matched; an unknown id is a silent no-op.
    """
    update_sql = "UPDATE signals SET alerted=1 WHERE id=?"
    db = get_connection()
    try:
        db.execute(update_sql, (signal_id,))
        db.commit()
    finally:
        db.close()
def mark_signal_executed(signal_id: int) -> None:
    """Flag a signal as executed and stamp it with the current UTC time.

    Sets ``executed=1`` and writes ``executed_at`` as an ISO-8601 string of
    the form ``YYYY-MM-DDTHH:MM:SSZ`` on the row whose ``id`` equals
    *signal_id*. No check is made that a row matched.

    Args:
        signal_id: Primary key of the signal to update.
    """
    # datetime.utcnow() is deprecated (Python 3.12+); an aware UTC "now"
    # formats to exactly the same string.
    executed_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    conn = get_connection()
    try:
        conn.execute(
            "UPDATE signals SET executed=1, executed_at=? WHERE id=?",
            (executed_at, signal_id),
        )
        conn.commit()
    finally:
        conn.close()
def mark_signal_closed(signal_id: int):
    """Set ``closed=1`` on the signal whose ``id`` equals *signal_id*.

    No check is made that a row matched; an unknown id is a silent no-op.
    """
    update_sql = "UPDATE signals SET closed=1 WHERE id=?"
    db = get_connection()
    try:
        db.execute(update_sql, (signal_id,))
        db.commit()
    finally:
        db.close()
def get_unalerted_signals() -> list[dict]:
    """Return all signals with ``alerted=0``, ordered by ``created_at`` ascending.

    Returns:
        One plain ``dict`` per row, keyed by column name.
    """
    query = "SELECT * FROM signals WHERE alerted=0 ORDER BY created_at ASC"
    db = get_connection()
    try:
        pending = db.execute(query).fetchall()
        # Materialise the rows as plain dicts before the connection closes.
        return list(map(dict, pending))
    finally:
        db.close()
def get_executed_unclosed_signals() -> list[dict]:
    """Return signals that are executed, not closed, and have ``executed_at`` set.

    Returns:
        One plain ``dict`` per row, keyed by column name, in whatever order
        SQLite yields them (the query has no ORDER BY).
    """
    query = "SELECT * FROM signals WHERE executed=1 AND closed=0 AND executed_at IS NOT NULL"
    db = get_connection()
    try:
        open_rows = db.execute(query).fetchall()
        return list(map(dict, open_rows))
    finally:
        db.close()
# Filings for one ticker with flag 'A' and is_10b51 = 0 whose transaction_date
# falls on or after "now minus N days"; the second parameter is the string
# "-N", which the query concatenates with ' days' to form the date modifier.
_RECENT_BUYS_SQL = """
SELECT * FROM filings
WHERE ticker=?
AND flag='A'
AND is_10b51=0
AND transaction_date >= date('now', ? || ' days')
ORDER BY transaction_date DESC
"""


def get_recent_buys_for_ticker(ticker: str, window_days: int) -> list:
    """Return recent qualifying filings for *ticker*, newest first.

    A filing qualifies when ``flag='A'``, ``is_10b51=0`` and its
    ``transaction_date`` is no older than *window_days* days before SQLite's
    ``date('now')``.
    NOTE(review): ``flag='A'`` presumably marks acquisitions (buys) -- confirm
    against whatever populates the filings table.

    Args:
        ticker: Ticker symbol to match exactly.
        window_days: Size of the look-back window in days.

    Returns:
        One plain ``dict`` per row, ordered by ``transaction_date`` descending.
    """
    lookback = f"-{window_days}"
    db = get_connection()
    try:
        matches = db.execute(_RECENT_BUYS_SQL, (ticker, lookback)).fetchall()
        return list(map(dict, matches))
    finally:
        db.close()
def accession_exists(accession_number: str) -> bool:
    """Report whether a filing with *accession_number* is already stored.

    Returns:
        True if at least one row in ``filings`` has that accession number.
    """
    lookup_sql = "SELECT 1 FROM filings WHERE accession_number=?"
    db = get_connection()
    try:
        hit = db.execute(lookup_sql, (accession_number,)).fetchone()
        return hit is not None
    finally:
        db.close()