From 4f743437d170d6b2e7b6d817f99cdd86495ad18e Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sun, 11 Jan 2026 12:11:13 -0500 Subject: [PATCH] Add Databento NBBO replay ingestion --- .../ingest-options/py/databento_replay.py | 68 +++++- .../ingest-options/src/adapters/databento.ts | 206 ++++++++++++------ services/ingest-options/src/index.ts | 2 + 3 files changed, 200 insertions(+), 76 deletions(-) diff --git a/services/ingest-options/py/databento_replay.py b/services/ingest-options/py/databento_replay.py index cd15c4a..ea98b29 100644 --- a/services/ingest-options/py/databento_replay.py +++ b/services/ingest-options/py/databento_replay.py @@ -122,17 +122,32 @@ class SymbolResolver: return len(self._pending) +def _first_attr(record, names: list[str]): + for name in names: + if not name: + continue + value = getattr(record, name, None) + if value is not None: + return value + return None + + +def _to_int(value, default: int = 0) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default + + def build_payload(record, symbol_override: str | None = None) -> dict | None: ts_event = getattr(record, "ts_event", None) - price = getattr(record, "price", None) - size = getattr(record, "size", None) symbol = symbol_override or ( getattr(record, "symbol", None) or getattr(record, "raw_symbol", None) or getattr(record, "instrument_id", None) ) - if ts_event is None or price is None or size is None or symbol is None: + if ts_event is None or symbol is None: return None ts_ms = normalize_ts(ts_event) @@ -144,21 +159,49 @@ def build_payload(record, symbol_override: str | None = None) -> dict | None: or getattr(record, "publisher_id", None) or getattr(record, "exchange_id", None) ) - conditions = getattr(record, "conditions", None) or getattr(record, "condition", None) - if isinstance(conditions, str): - conditions = [conditions] + + price = getattr(record, "price", None) + size = getattr(record, "size", None) + if price is not None and size is not None: + conditions = getattr(record, "conditions", None) or getattr(record, "condition", None) + if isinstance(conditions, str): + conditions = [conditions] + + payload = { + "type": "trade", + "ts": ts_ms, + "price": float(price), + "size": int(size), + "symbol": stringify(symbol), + } + + if exchange is not None: + payload["exchange"] = stringify(exchange) + if conditions: + payload["conditions"] = conditions + + return payload + + bid = _first_attr(record, ["bid_px", "bid_price", "bid"]) + ask = _first_attr(record, ["ask_px", "ask_price", "ask"]) + if bid is None or ask is None: + return None + + bid_size = _first_attr(record, ["bid_sz", "bid_size", "bid_qty", "bid_q"]) + ask_size = _first_attr(record, ["ask_sz", "ask_size", "ask_qty", "ask_q"]) payload = { + "type": "nbbo", "ts": ts_ms, - "price": float(price), - "size": int(size), + "bid": float(bid), + "ask": float(ask), + "bidSize": int(bid_size) if bid_size is not None else 0, + "askSize": int(ask_size) if ask_size is not None else 0, "symbol": stringify(symbol), } if exchange is not None: payload["exchange"] = stringify(exchange) - if conditions: - payload["conditions"] = conditions return payload @@ -239,7 +282,10 @@ def main() -> int: date = dt.datetime.utcfromtimestamp(ts_ms / 1000).date() if is_numeric_symbol(symbol): - instrument_id = int(symbol) + instrument_id = _to_int(symbol, default=-1) + if instrument_id < 0: + return + mapped = resolver.lookup(instrument_id, date) if mapped: emit_payload(build_payload(record, mapped)) diff --git a/services/ingest-options/src/adapters/databento.ts b/services/ingest-options/src/adapters/databento.ts index d1f5925..6f174d8 100644 --- a/services/ingest-options/src/adapters/databento.ts +++ b/services/ingest-options/src/adapters/databento.ts @@ -5,6 +5,7 @@ type DatabentoOptionsAdapterConfig = { apiKey: string; dataset: string; schema: string; + nbboSchema: string; start: string; end?: string; symbols: string; @@ -16,6 +17,7 @@ type DatabentoOptionsAdapterConfig = { }; type DatabentoTradeMessage = { + type: "trade"; ts: number; price: number; size: number; @@ -24,6 +26,19 @@ type DatabentoTradeMessage = { conditions?: string[] | string; }; +type DatabentoNbboMessage = { + type: "nbbo"; + ts: number; + bid: number; + ask: number; + bidSize?: number; + askSize?: number; + symbol: string; + exchange?: string; +}; + +type DatabentoReplayMessage = DatabentoTradeMessage | DatabentoNbboMessage; + type OptionContract = { root: string; expiry: string; @@ -139,45 +154,39 @@ export const createDatabentoOptionsAdapter = ( } const scriptPath = new URL("../../py/databento_replay.py", import.meta.url).pathname; - const args = [ - config.pythonBin, - scriptPath, - "--dataset", - config.dataset, - "--schema", - config.schema, - "--start", - config.start, - "--symbols", - config.symbols, - "--stype-in", - config.stypeIn, - "--stype-out", - config.stypeOut - ]; - if (config.end) { - args.push("--end", config.end); - } + const buildArgs = (schema: string): string[] => { + const args = [ + config.pythonBin, + scriptPath, + "--dataset", + config.dataset, + "--schema", + schema, + "--start", + config.start, + "--symbols", + config.symbols, + "--stype-in", + config.stypeIn, + "--stype-out", + config.stypeOut + ]; - if (config.limit > 0) { - args.push("--limit", String(config.limit)); - } - - const child = Bun.spawn(args, { - stdout: "pipe", - stderr: "inherit", - env: { - ...Bun.env, - DATABENTO_API_KEY: config.apiKey + if (config.end) { + args.push("--end", config.end); } - }); - if (!child.stdout) { - throw new Error("Databento adapter failed to attach stdout."); - } + if (config.limit > 0) { + args.push("--limit", String(config.limit)); + } - let seq = 0; + return args; + }; + + const children: Bun.Subprocess[] = []; + let tradeSeq = 0; + let nbboSeq = 0; const contractIdCache = new Map(); const warnedSymbols = new Set(); @@ -201,55 +210,122 @@ export const createDatabentoOptionsAdapter = ( const handleLine = (line: string) => { try { - const payload = JSON.parse(line) as DatabentoTradeMessage; - if (!payload) { + const payload = JSON.parse(line) as DatabentoReplayMessage; + if (!payload || typeof payload !== "object") { return; } - const price = Number(payload.price); - const size = Number(payload.size); - if (!Number.isFinite(price) || !Number.isFinite(size)) { - return; - } - - const symbol = String(payload.symbol ?? "").trim(); + const symbol = String((payload as { symbol?: unknown }).symbol ?? "").trim(); if (!symbol) { return; } - const sourceTs = normalizeTimestamp(Number(payload.ts)); + const sourceTs = normalizeTimestamp(Number((payload as { ts?: unknown }).ts)); + if (!Number.isFinite(sourceTs)) { + return; + } + const ingestTs = Date.now(); - seq += 1; + const contractId = resolveContractId(symbol); - const scaledPrice = config.priceScale === 1 ? price : price / config.priceScale; + if (payload.type === "trade") { + const price = Number(payload.price); + const size = Number(payload.size); + if (!Number.isFinite(price) || !Number.isFinite(size)) { + return; + } - const conditions = Array.isArray(payload.conditions) - ? payload.conditions.map((entry) => String(entry)) - : typeof payload.conditions === "string" - ? [payload.conditions] - : undefined; + const scaledPrice = + config.priceScale === 1 ? price : price / config.priceScale; - void handlers.onTrade({ - source_ts: sourceTs, - ingest_ts: ingestTs, - seq, - trace_id: `databento-${seq}`, - ts: sourceTs, - option_contract_id: resolveContractId(symbol), - price: scaledPrice, - size, - exchange: payload.exchange ? String(payload.exchange) : "OPRA", - conditions - }); + const conditions = Array.isArray(payload.conditions) + ? payload.conditions.map((entry) => String(entry)) + : typeof payload.conditions === "string" + ? [payload.conditions] + : undefined; + + tradeSeq += 1; + void handlers.onTrade({ + source_ts: sourceTs, + ingest_ts: ingestTs, + seq: tradeSeq, + trace_id: `databento-${tradeSeq}`, + ts: sourceTs, + option_contract_id: contractId, + price: scaledPrice, + size, + exchange: payload.exchange ? String(payload.exchange) : "OPRA", + conditions + }); + return; + } + + if (payload.type === "nbbo") { + if (!handlers.onNBBO) { + return; + } + + const bid = Number(payload.bid); + const ask = Number(payload.ask); + if (!Number.isFinite(bid) || !Number.isFinite(ask)) { + return; + } + + const scaledBid = config.priceScale === 1 ? bid : bid / config.priceScale; + const scaledAsk = config.priceScale === 1 ? ask : ask / config.priceScale; + + const bidSize = Math.max(0, Math.floor(Number(payload.bidSize ?? 0))); + const askSize = Math.max(0, Math.floor(Number(payload.askSize ?? 0))); + + nbboSeq += 1; + void handlers.onNBBO({ + source_ts: sourceTs, + ingest_ts: ingestTs, + seq: nbboSeq, + trace_id: `databento-${nbboSeq}`, + ts: sourceTs, + option_contract_id: contractId, + bid: scaledBid, + ask: scaledAsk, + bidSize, + askSize + }); + } } catch { // Ignore malformed lines to keep replay streaming. } }; - void readLines(child.stdout, handleLine); + const spawnStream = (schema: string): void => { + const trimmed = schema.trim(); + if (!trimmed) { + return; + } + + const child = Bun.spawn(buildArgs(trimmed), { + stdout: "pipe", + stderr: "inherit", + env: { + ...Bun.env, + DATABENTO_API_KEY: config.apiKey + } + }); + + if (!child.stdout) { + throw new Error("Databento adapter failed to attach stdout."); + } + + children.push(child); + void readLines(child.stdout, handleLine); + }; + + spawnStream(config.schema); + spawnStream(config.nbboSchema); return () => { - child.kill(); + for (const child of children) { + child.kill(); + } }; } }; diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index 3fc61ba..5d51678 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -46,6 +46,7 @@ const envSchema = z.object({ DATABENTO_API_KEY: z.string().default(""), DATABENTO_DATASET: z.string().default("OPRA.PILLAR"), DATABENTO_SCHEMA: z.string().default("trades"), + DATABENTO_NBBO_SCHEMA: z.string().default("tbbo"), DATABENTO_START: z.string().default(""), DATABENTO_END: z.string().default(""), DATABENTO_SYMBOLS: z.string().default("ALL"), @@ -188,6 +189,7 @@ const selectAdapter = (name: string): OptionIngestAdapter => { apiKey: env.DATABENTO_API_KEY, dataset: env.DATABENTO_DATASET, schema: env.DATABENTO_SCHEMA, + nbboSchema: env.DATABENTO_NBBO_SCHEMA, start: env.DATABENTO_START, end: env.DATABENTO_END || undefined, symbols: env.DATABENTO_SYMBOLS,