Add Databento historical replay adapter + symbol mapping; speed up replay UI + completion state

- add Databento options adapter (TS) with Python sidecar and env wiring
- add  to stream historical trades and resolve instrument_id -> raw_symbol via symbology
- include Databento + typing_extensions in ingest-options Python requirements
- expose Databento env settings in ingest-options index (dataset/schema/start/end/stype/limit/price scale/python bin)
- update README with Databento replay usage and env docs
- speed up UI replay polling/drain, add per-card replay time display
- stop replay at end and prevent fallback to synthetic by pinning replay to initial trace source
This commit is contained in:
dirtydishes 2025-12-28 21:30:24 -05:00
parent 6dc279099f
commit baaadcf105
6 changed files with 799 additions and 33 deletions

View file

@ -0,0 +1,278 @@
#!/usr/bin/env python3
import argparse
import datetime as dt
import inspect
import json
import os
import sys
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Replay Databento option trades as JSON lines.")
parser.add_argument("--dataset", required=True)
parser.add_argument("--schema", default="trades")
parser.add_argument("--start", required=True)
parser.add_argument("--end", default="")
parser.add_argument("--symbols", default="ALL")
parser.add_argument("--stype-in", dest="stype_in", default="raw_symbol")
parser.add_argument("--stype-out", dest="stype_out", default="raw_symbol")
parser.add_argument("--limit", type=int, default=0)
parser.add_argument("--api-key", dest="api_key", default="")
return parser.parse_args()
def resolve_client(db_module, api_key: str):
try:
return db_module.Historical(api_key)
except TypeError:
return db_module.Historical(key=api_key)
def normalize_symbols(value: str):
if not value or value.strip().upper() == "ALL":
return None
return [symbol.strip() for symbol in value.split(",") if symbol.strip()]
def parse_date(value: str | None) -> dt.date | None:
if not value:
return None
try:
parsed = dt.datetime.fromisoformat(value)
return parsed.date()
except ValueError:
try:
return dt.date.fromisoformat(value)
except ValueError:
return None
def normalize_ts(ts_value):
if ts_value is None:
return None
if isinstance(ts_value, dt.datetime):
return int(ts_value.timestamp() * 1000)
if isinstance(ts_value, dt.date):
return int(dt.datetime.combine(ts_value, dt.time()).timestamp() * 1000)
if isinstance(ts_value, (int, float)):
if ts_value > 1_000_000_000_000_000:
return int(ts_value / 1_000_000)
return int(ts_value)
return None
def stringify(value):
if value is None:
return None
if isinstance(value, bytes):
return value.decode("utf-8", errors="ignore")
return str(value)
def is_numeric_symbol(value: object) -> bool:
if isinstance(value, int):
return True
if isinstance(value, str):
return value.isdigit()
return False
class SymbolResolver:
def __init__(self, client, dataset: str, start_date: dt.date | None, end_date: dt.date | None):
from databento.common.symbology import InstrumentMap
self._client = client
self._dataset = dataset
self._start_date = start_date
self._end_date = end_date
self._map = InstrumentMap()
self._pending: list[int] = []
self._pending_set: set[int] = set()
def queue(self, instrument_id: int) -> None:
if instrument_id in self._pending_set:
return
self._pending_set.add(instrument_id)
self._pending.append(instrument_id)
def resolve_pending(self) -> None:
if not self._pending:
return
pending = self._pending
self._pending = []
self._pending_set.clear()
for i in range(0, len(pending), 2000):
chunk = pending[i : i + 2000]
response = self._client.symbology.resolve(
dataset=self._dataset,
symbols=chunk,
stype_in="instrument_id",
stype_out="raw_symbol",
start_date=self._start_date or dt.date.today(),
end_date=self._end_date,
)
self._map.insert_json(response)
def lookup(self, instrument_id: int, date: dt.date) -> str | None:
if instrument_id is None:
return None
return self._map.resolve(instrument_id, date)
def pending_count(self) -> int:
return len(self._pending)
def build_payload(record, symbol_override: str | None = None) -> dict | None:
ts_event = getattr(record, "ts_event", None)
price = getattr(record, "price", None)
size = getattr(record, "size", None)
symbol = symbol_override or (
getattr(record, "symbol", None)
or getattr(record, "raw_symbol", None)
or getattr(record, "instrument_id", None)
)
if ts_event is None or price is None or size is None or symbol is None:
return None
ts_ms = normalize_ts(ts_event)
if ts_ms is None:
return None
exchange = (
getattr(record, "exchange", None)
or getattr(record, "publisher_id", None)
or getattr(record, "exchange_id", None)
)
conditions = getattr(record, "conditions", None) or getattr(record, "condition", None)
if isinstance(conditions, str):
conditions = [conditions]
payload = {
"ts": ts_ms,
"price": float(price),
"size": int(size),
"symbol": stringify(symbol),
}
if exchange is not None:
payload["exchange"] = stringify(exchange)
if conditions:
payload["conditions"] = conditions
return payload
def emit_payload(payload: dict | None) -> None:
if payload is None:
return
print(json.dumps(payload), flush=True)
def main() -> int:
args = parse_args()
api_key = args.api_key or os.getenv("DATABENTO_API_KEY")
if not api_key:
sys.stderr.write("DATABENTO_API_KEY is required.\n")
return 1
try:
import databento as db
except ImportError:
sys.stderr.write("Missing Python package 'databento'. Install with pip.\n")
return 1
client = resolve_client(db, api_key)
start_date = parse_date(args.start)
end_date = parse_date(args.end) if args.end else None
resolver = SymbolResolver(client, args.dataset, start_date, end_date)
buffered: list[tuple[object, int, dt.date]] = []
kwargs = {
"dataset": args.dataset,
"schema": args.schema,
"start": args.start,
"end": args.end or None,
"symbols": normalize_symbols(args.symbols),
"stype_in": args.stype_in,
"stype_out": args.stype_out,
"limit": args.limit or None,
}
signature = inspect.signature(client.timeseries.get_range)
filtered_kwargs = {
key: value for key, value in kwargs.items() if key in signature.parameters and value is not None
}
data = client.timeseries.get_range(**filtered_kwargs)
def flush_buffer(force: bool = False) -> None:
if not buffered:
return
resolver.resolve_pending()
remaining: list[tuple[object, int, dt.date]] = []
for record, instrument_id, date in buffered:
mapped = resolver.lookup(instrument_id, date)
if mapped:
emit_payload(build_payload(record, mapped))
elif force:
emit_payload(build_payload(record, str(instrument_id)))
else:
remaining.append((record, instrument_id, date))
buffered[:] = remaining
def handle_record(record) -> None:
symbol = (
getattr(record, "symbol", None)
or getattr(record, "raw_symbol", None)
or getattr(record, "instrument_id", None)
)
ts_event = getattr(record, "ts_event", None)
ts_ms = normalize_ts(ts_event)
if ts_ms is None:
return
date = dt.datetime.utcfromtimestamp(ts_ms / 1000).date()
if is_numeric_symbol(symbol):
instrument_id = int(symbol)
mapped = resolver.lookup(instrument_id, date)
if mapped:
emit_payload(build_payload(record, mapped))
return
resolver.queue(instrument_id)
buffered.append((record, instrument_id, date))
if resolver.pending_count() >= 200:
flush_buffer()
return
emit_payload(build_payload(record))
if hasattr(data, "replay"):
try:
data.replay(callback=handle_record)
except TypeError:
data.replay(handle_record)
flush_buffer(force=True)
return 0
if hasattr(data, "__iter__"):
for record in data:
handle_record(record)
if len(buffered) >= 2000:
flush_buffer()
flush_buffer(force=True)
return 0
sys.stderr.write("Unsupported Databento response type.\n")
return 1
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1 +1,3 @@
ib_insync>=0.9.83
databento
typing_extensions

View file

@ -0,0 +1,256 @@
import { createLogger } from "@islandflow/observability";
import type { OptionIngestAdapter, OptionIngestHandlers } from "./types";
type DatabentoOptionsAdapterConfig = {
apiKey: string;
dataset: string;
schema: string;
start: string;
end?: string;
symbols: string;
stypeIn: string;
stypeOut: string;
limit: number;
priceScale: number;
pythonBin: string;
};
type DatabentoTradeMessage = {
ts: number;
price: number;
size: number;
symbol: string;
exchange?: string;
conditions?: string[] | string;
};
type OptionContract = {
root: string;
expiry: string;
strike: number;
right: "C" | "P";
};
const logger = createLogger({ service: "ingest-options" });
const formatDate = (date: Date): string => date.toISOString().slice(0, 10);
const parseOccSymbol = (symbol: string): OptionContract | null => {
if (symbol.length < 15) {
return null;
}
const tail = symbol.slice(-15);
const rootRaw = symbol.slice(0, -15).trim();
const expiryRaw = tail.slice(0, 6);
const right = tail.slice(6, 7);
const strikeRaw = tail.slice(7);
if (!/^\d{6}$/.test(expiryRaw) || !/^\d{8}$/.test(strikeRaw)) {
return null;
}
if (right !== "C" && right !== "P") {
return null;
}
const year = 2000 + Number(expiryRaw.slice(0, 2));
const month = Number(expiryRaw.slice(2, 4)) - 1;
const day = Number(expiryRaw.slice(4, 6));
const expiryDate = new Date(Date.UTC(year, month, day));
const expiry = formatDate(expiryDate);
const strike = Number(strikeRaw) / 1000;
if (!rootRaw || !Number.isFinite(strike)) {
return null;
}
return {
root: rootRaw,
expiry,
strike,
right
};
};
const formatStrike = (strike: number): string => {
const fixed = strike.toFixed(3);
return fixed.replace(/\.?0+$/, "");
};
const formatContractId = (contract: OptionContract): string =>
`${contract.root}-${contract.expiry}-${formatStrike(contract.strike)}-${contract.right}`;
const normalizeTimestamp = (value: number): number => {
if (!Number.isFinite(value)) {
return Date.now();
}
if (value > 1_000_000_000_000_000) {
return Math.floor(value / 1_000_000);
}
return value;
};
const readLines = async (
stream: ReadableStream<Uint8Array>,
onLine: (line: string) => void
): Promise<void> => {
const reader = stream.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
const trimmed = line.trim();
if (trimmed.length > 0) {
onLine(trimmed);
}
}
}
if (buffer.trim().length > 0) {
onLine(buffer.trim());
}
};
export const createDatabentoOptionsAdapter = (
config: DatabentoOptionsAdapterConfig
): OptionIngestAdapter => {
return {
name: "databento",
start: (handlers: OptionIngestHandlers) => {
if (!config.apiKey) {
throw new Error("DATABENTO_API_KEY is required for the Databento adapter.");
}
if (!config.start) {
throw new Error("DATABENTO_START is required for the Databento adapter.");
}
const scriptPath = new URL("../../py/databento_replay.py", import.meta.url).pathname;
const args = [
config.pythonBin,
scriptPath,
"--dataset",
config.dataset,
"--schema",
config.schema,
"--start",
config.start,
"--symbols",
config.symbols,
"--stype-in",
config.stypeIn,
"--stype-out",
config.stypeOut
];
if (config.end) {
args.push("--end", config.end);
}
if (config.limit > 0) {
args.push("--limit", String(config.limit));
}
const child = Bun.spawn(args, {
stdout: "pipe",
stderr: "inherit",
env: {
...Bun.env,
DATABENTO_API_KEY: config.apiKey
}
});
if (!child.stdout) {
throw new Error("Databento adapter failed to attach stdout.");
}
let seq = 0;
const contractIdCache = new Map<string, string>();
const warnedSymbols = new Set<string>();
const resolveContractId = (symbol: string): string => {
const cached = contractIdCache.get(symbol);
if (cached) {
return cached;
}
const parsed = parseOccSymbol(symbol);
const contractId = parsed ? formatContractId(parsed) : symbol.trim() || symbol;
contractIdCache.set(symbol, contractId);
if (!parsed && !warnedSymbols.has(symbol)) {
warnedSymbols.add(symbol);
logger.warn("databento symbol parse failed; using raw symbol", { symbol });
}
return contractId;
};
const handleLine = (line: string) => {
try {
const payload = JSON.parse(line) as DatabentoTradeMessage;
if (!payload) {
return;
}
const price = Number(payload.price);
const size = Number(payload.size);
if (!Number.isFinite(price) || !Number.isFinite(size)) {
return;
}
const symbol = String(payload.symbol ?? "").trim();
if (!symbol) {
return;
}
const sourceTs = normalizeTimestamp(Number(payload.ts));
const ingestTs = Date.now();
seq += 1;
const scaledPrice = config.priceScale === 1 ? price : price / config.priceScale;
const conditions = Array.isArray(payload.conditions)
? payload.conditions.map((entry) => String(entry))
: typeof payload.conditions === "string"
? [payload.conditions]
: undefined;
void handlers.onTrade({
source_ts: sourceTs,
ingest_ts: ingestTs,
seq,
trace_id: `databento-${seq}`,
ts: sourceTs,
option_contract_id: resolveContractId(symbol),
price: scaledPrice,
size,
exchange: payload.exchange ? String(payload.exchange) : "OPRA",
conditions
});
} catch {
// Ignore malformed lines to keep replay streaming.
}
};
void readLines(child.stdout, handleLine);
return () => {
child.kill();
};
}
};
};

View file

@ -14,6 +14,7 @@ import {
} from "@islandflow/storage";
import { OptionPrintSchema, type OptionPrint } from "@islandflow/types";
import { createAlpacaOptionsAdapter } from "./adapters/alpaca";
import { createDatabentoOptionsAdapter } from "./adapters/databento";
import { createIbkrOptionsAdapter } from "./adapters/ibkr";
import { createSyntheticOptionsAdapter } from "./adapters/synthetic";
import type { OptionIngestAdapter, StopHandler } from "./adapters/types";
@ -38,6 +39,17 @@ const envSchema = z.object({
ALPACA_MONEYNESS_PCT: z.coerce.number().positive().default(0.06),
ALPACA_MONEYNESS_FALLBACK_PCT: z.coerce.number().positive().default(0.1),
ALPACA_MAX_QUOTES: z.coerce.number().int().positive().default(200),
DATABENTO_API_KEY: z.string().default(""),
DATABENTO_DATASET: z.string().default("OPRA.PILLAR"),
DATABENTO_SCHEMA: z.string().default("trades"),
DATABENTO_START: z.string().default(""),
DATABENTO_END: z.string().default(""),
DATABENTO_SYMBOLS: z.string().default("ALL"),
DATABENTO_STYPE_IN: z.string().default("raw_symbol"),
DATABENTO_STYPE_OUT: z.string().default("raw_symbol"),
DATABENTO_LIMIT: z.coerce.number().int().nonnegative().default(0),
DATABENTO_PRICE_SCALE: z.coerce.number().positive().default(1),
DATABENTO_PYTHON_BIN: z.string().default("python3"),
IBKR_HOST: z.string().default("127.0.0.1"),
IBKR_PORT: z.coerce.number().int().positive().default(7497),
IBKR_CLIENT_ID: z.coerce.number().int().nonnegative().default(0),
@ -113,6 +125,30 @@ const selectAdapter = (name: string): OptionIngestAdapter => {
});
}
if (name === "databento") {
if (!env.DATABENTO_API_KEY) {
throw new Error("DATABENTO_API_KEY is required for the databento adapter.");
}
if (!env.DATABENTO_START) {
throw new Error("DATABENTO_START is required for the databento adapter.");
}
return createDatabentoOptionsAdapter({
apiKey: env.DATABENTO_API_KEY,
dataset: env.DATABENTO_DATASET,
schema: env.DATABENTO_SCHEMA,
start: env.DATABENTO_START,
end: env.DATABENTO_END || undefined,
symbols: env.DATABENTO_SYMBOLS,
stypeIn: env.DATABENTO_STYPE_IN,
stypeOut: env.DATABENTO_STYPE_OUT,
limit: env.DATABENTO_LIMIT,
priceScale: env.DATABENTO_PRICE_SCALE,
pythonBin: env.DATABENTO_PYTHON_BIN
});
}
if (name === "ibkr") {
return createIbkrOptionsAdapter({
host: env.IBKR_HOST,