Add Databento NBBO replay ingestion

This commit is contained in:
dirtydishes 2026-01-11 12:11:13 -05:00
parent c8c8094594
commit 4f743437d1
3 changed files with 200 additions and 76 deletions

View file

@ -122,17 +122,32 @@ class SymbolResolver:
return len(self._pending)
def _first_attr(record, names: list[str]):
for name in names:
if not name:
continue
value = getattr(record, name, None)
if value is not None:
return value
return None
def _to_int(value, default: int = 0) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def build_payload(record, symbol_override: str | None = None) -> dict | None:
ts_event = getattr(record, "ts_event", None)
price = getattr(record, "price", None)
size = getattr(record, "size", None)
symbol = symbol_override or (
getattr(record, "symbol", None)
or getattr(record, "raw_symbol", None)
or getattr(record, "instrument_id", None)
)
if ts_event is None or price is None or size is None or symbol is None:
if ts_event is None or symbol is None:
return None
ts_ms = normalize_ts(ts_event)
@ -144,21 +159,49 @@ def build_payload(record, symbol_override: str | None = None) -> dict | None:
or getattr(record, "publisher_id", None)
or getattr(record, "exchange_id", None)
)
conditions = getattr(record, "conditions", None) or getattr(record, "condition", None)
if isinstance(conditions, str):
conditions = [conditions]
price = getattr(record, "price", None)
size = getattr(record, "size", None)
if price is not None and size is not None:
conditions = getattr(record, "conditions", None) or getattr(record, "condition", None)
if isinstance(conditions, str):
conditions = [conditions]
payload = {
"type": "trade",
"ts": ts_ms,
"price": float(price),
"size": int(size),
"symbol": stringify(symbol),
}
if exchange is not None:
payload["exchange"] = stringify(exchange)
if conditions:
payload["conditions"] = conditions
return payload
bid = _first_attr(record, ["bid_px", "bid_price", "bid"])
ask = _first_attr(record, ["ask_px", "ask_price", "ask"])
if bid is None or ask is None:
return None
bid_size = _first_attr(record, ["bid_sz", "bid_size", "bid_qty", "bid_q"])
ask_size = _first_attr(record, ["ask_sz", "ask_size", "ask_qty", "ask_q"])
payload = {
"type": "nbbo",
"ts": ts_ms,
"price": float(price),
"size": int(size),
"bid": float(bid),
"ask": float(ask),
"bidSize": int(bid_size) if bid_size is not None else 0,
"askSize": int(ask_size) if ask_size is not None else 0,
"symbol": stringify(symbol),
}
if exchange is not None:
payload["exchange"] = stringify(exchange)
if conditions:
payload["conditions"] = conditions
return payload
@ -239,7 +282,10 @@ def main() -> int:
date = dt.datetime.utcfromtimestamp(ts_ms / 1000).date()
if is_numeric_symbol(symbol):
instrument_id = int(symbol)
instrument_id = _to_int(symbol, default=-1)
if instrument_id < 0:
return
mapped = resolver.lookup(instrument_id, date)
if mapped:
emit_payload(build_payload(record, mapped))

View file

@ -5,6 +5,7 @@ type DatabentoOptionsAdapterConfig = {
apiKey: string;
dataset: string;
schema: string;
nbboSchema: string;
start: string;
end?: string;
symbols: string;
@ -16,6 +17,7 @@ type DatabentoOptionsAdapterConfig = {
};
type DatabentoTradeMessage = {
type: "trade";
ts: number;
price: number;
size: number;
@ -24,6 +26,19 @@ type DatabentoTradeMessage = {
conditions?: string[] | string;
};
type DatabentoNbboMessage = {
type: "nbbo";
ts: number;
bid: number;
ask: number;
bidSize?: number;
askSize?: number;
symbol: string;
exchange?: string;
};
type DatabentoReplayMessage = DatabentoTradeMessage | DatabentoNbboMessage;
type OptionContract = {
root: string;
expiry: string;
@ -139,45 +154,39 @@ export const createDatabentoOptionsAdapter = (
}
const scriptPath = new URL("../../py/databento_replay.py", import.meta.url).pathname;
const args = [
config.pythonBin,
scriptPath,
"--dataset",
config.dataset,
"--schema",
config.schema,
"--start",
config.start,
"--symbols",
config.symbols,
"--stype-in",
config.stypeIn,
"--stype-out",
config.stypeOut
];
if (config.end) {
args.push("--end", config.end);
}
const buildArgs = (schema: string): string[] => {
const args = [
config.pythonBin,
scriptPath,
"--dataset",
config.dataset,
"--schema",
schema,
"--start",
config.start,
"--symbols",
config.symbols,
"--stype-in",
config.stypeIn,
"--stype-out",
config.stypeOut
];
if (config.limit > 0) {
args.push("--limit", String(config.limit));
}
const child = Bun.spawn(args, {
stdout: "pipe",
stderr: "inherit",
env: {
...Bun.env,
DATABENTO_API_KEY: config.apiKey
if (config.end) {
args.push("--end", config.end);
}
});
if (!child.stdout) {
throw new Error("Databento adapter failed to attach stdout.");
}
if (config.limit > 0) {
args.push("--limit", String(config.limit));
}
let seq = 0;
return args;
};
const children: Bun.Subprocess[] = [];
let tradeSeq = 0;
let nbboSeq = 0;
const contractIdCache = new Map<string, string>();
const warnedSymbols = new Set<string>();
@ -201,55 +210,122 @@ export const createDatabentoOptionsAdapter = (
const handleLine = (line: string) => {
try {
const payload = JSON.parse(line) as DatabentoTradeMessage;
if (!payload) {
const payload = JSON.parse(line) as DatabentoReplayMessage;
if (!payload || typeof payload !== "object") {
return;
}
const price = Number(payload.price);
const size = Number(payload.size);
if (!Number.isFinite(price) || !Number.isFinite(size)) {
return;
}
const symbol = String(payload.symbol ?? "").trim();
const symbol = String((payload as { symbol?: unknown }).symbol ?? "").trim();
if (!symbol) {
return;
}
const sourceTs = normalizeTimestamp(Number(payload.ts));
const sourceTs = normalizeTimestamp(Number((payload as { ts?: unknown }).ts));
if (!Number.isFinite(sourceTs)) {
return;
}
const ingestTs = Date.now();
seq += 1;
const contractId = resolveContractId(symbol);
const scaledPrice = config.priceScale === 1 ? price : price / config.priceScale;
if (payload.type === "trade") {
const price = Number(payload.price);
const size = Number(payload.size);
if (!Number.isFinite(price) || !Number.isFinite(size)) {
return;
}
const conditions = Array.isArray(payload.conditions)
? payload.conditions.map((entry) => String(entry))
: typeof payload.conditions === "string"
? [payload.conditions]
: undefined;
const scaledPrice =
config.priceScale === 1 ? price : price / config.priceScale;
void handlers.onTrade({
source_ts: sourceTs,
ingest_ts: ingestTs,
seq,
trace_id: `databento-${seq}`,
ts: sourceTs,
option_contract_id: resolveContractId(symbol),
price: scaledPrice,
size,
exchange: payload.exchange ? String(payload.exchange) : "OPRA",
conditions
});
const conditions = Array.isArray(payload.conditions)
? payload.conditions.map((entry) => String(entry))
: typeof payload.conditions === "string"
? [payload.conditions]
: undefined;
tradeSeq += 1;
void handlers.onTrade({
source_ts: sourceTs,
ingest_ts: ingestTs,
seq: tradeSeq,
trace_id: `databento-${tradeSeq}`,
ts: sourceTs,
option_contract_id: contractId,
price: scaledPrice,
size,
exchange: payload.exchange ? String(payload.exchange) : "OPRA",
conditions
});
return;
}
if (payload.type === "nbbo") {
if (!handlers.onNBBO) {
return;
}
const bid = Number(payload.bid);
const ask = Number(payload.ask);
if (!Number.isFinite(bid) || !Number.isFinite(ask)) {
return;
}
const scaledBid = config.priceScale === 1 ? bid : bid / config.priceScale;
const scaledAsk = config.priceScale === 1 ? ask : ask / config.priceScale;
const bidSize = Math.max(0, Math.floor(Number(payload.bidSize ?? 0)));
const askSize = Math.max(0, Math.floor(Number(payload.askSize ?? 0)));
nbboSeq += 1;
void handlers.onNBBO({
source_ts: sourceTs,
ingest_ts: ingestTs,
seq: nbboSeq,
trace_id: `databento-${nbboSeq}`,
ts: sourceTs,
option_contract_id: contractId,
bid: scaledBid,
ask: scaledAsk,
bidSize,
askSize
});
}
} catch {
// Ignore malformed lines to keep replay streaming.
}
};
void readLines(child.stdout, handleLine);
const spawnStream = (schema: string): void => {
const trimmed = schema.trim();
if (!trimmed) {
return;
}
const child = Bun.spawn(buildArgs(trimmed), {
stdout: "pipe",
stderr: "inherit",
env: {
...Bun.env,
DATABENTO_API_KEY: config.apiKey
}
});
if (!child.stdout) {
throw new Error("Databento adapter failed to attach stdout.");
}
children.push(child);
void readLines(child.stdout, handleLine);
};
spawnStream(config.schema);
spawnStream(config.nbboSchema);
return () => {
child.kill();
for (const child of children) {
child.kill();
}
};
}
};

View file

@ -46,6 +46,7 @@ const envSchema = z.object({
DATABENTO_API_KEY: z.string().default(""),
DATABENTO_DATASET: z.string().default("OPRA.PILLAR"),
DATABENTO_SCHEMA: z.string().default("trades"),
DATABENTO_NBBO_SCHEMA: z.string().default("tbbo"),
DATABENTO_START: z.string().default(""),
DATABENTO_END: z.string().default(""),
DATABENTO_SYMBOLS: z.string().default("ALL"),
@ -188,6 +189,7 @@ const selectAdapter = (name: string): OptionIngestAdapter => {
apiKey: env.DATABENTO_API_KEY,
dataset: env.DATABENTO_DATASET,
schema: env.DATABENTO_SCHEMA,
nbboSchema: env.DATABENTO_NBBO_SCHEMA,
start: env.DATABENTO_START,
end: env.DATABENTO_END || undefined,
symbols: env.DATABENTO_SYMBOLS,