diff --git a/README.md b/README.md index 80080b2..722a536 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Done now (in repo): - UI: live tapes for options/equities/flow + replay toggle + pause controls In progress / blocked: -- Real data adapters (requires licensed data source) +- Live data adapters (requires licensed data source) - Rolling stats and advanced clustering Not started: @@ -36,12 +36,13 @@ Not started: ## Current Capabilities - Synthetic options/equity prints with deterministic sequencing -- Ingest adapter seam (env-selected, default `synthetic`) for options and equities +- Ingest adapter seam (env-selected; options default `alpaca`, equities default `synthetic`) - Raw event persistence in ClickHouse + streaming via NATS JetStream - Deterministic option FlowPacket clustering (time-window) - API gateway with REST, WS, and replay endpoints - UI tapes for options/equities/flow packets with live/replay toggle and pause controls - Alpaca options adapter (dev-only) with bounded contract selection +- Databento historical replay adapter (options, Python sidecar) ## Planned Capabilities (from PLAN.md) @@ -113,6 +114,15 @@ Alpaca selection policy (dev-only, deterministic): - For each expiry, select 8 strikes per side closest to ATM within ±6% (fallback to ±10% if needed) - Subscriptions are built once at startup to keep the stream bounded and repeatable +Databento historical replay adapter (options, via Python `databento`): +- Install Python deps: `python3 -m pip install -r services/ingest-options/py/requirements.txt` +- Set `INGEST_ADAPTER=databento` and configure: + - `DATABENTO_API_KEY`, `DATABENTO_START` (ISO date/time) + - Optional: `DATABENTO_END`, `DATABENTO_DATASET` (default `OPRA.PILLAR`), `DATABENTO_SCHEMA` (default `trades`) + - Optional: `DATABENTO_SYMBOLS` (`ALL` or comma list), `DATABENTO_STYPE_IN`/`DATABENTO_STYPE_OUT` (default `raw_symbol`) + - Optional: `DATABENTO_LIMIT` (record cap), `DATABENTO_PRICE_SCALE` (divide raw price), `DATABENTO_PYTHON_BIN` +- This adapter replays historical data only; live capture will be added later. + Run tests: - `bun test` diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index cba6fec..c96a75c 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -27,10 +27,25 @@ type ReplayResponse = { next: ReplayCursor | null; }; +const inferTracePrefix = (traceId: string): string => { + const match = traceId.match(/^(.*)-\d+$/); + return match ? match[1] : traceId; +}; + +const extractTracePrefix = (item: T): string | null => { + const traceId = (item as { trace_id?: string }).trace_id; + if (!traceId) { + return null; + } + return inferTracePrefix(traceId); +}; + type TapeState = { status: WsStatus; items: T[]; lastUpdate: number | null; + replayTime: number | null; + replayComplete: boolean; paused: boolean; dropped: number; togglePause: () => void; @@ -128,6 +143,7 @@ type TapeConfig = { mode: TapeMode; wsPath: string; replayPath: string; + latestPath?: string; expectedType: MessageType; batchSize?: number; pollMs?: number; @@ -136,17 +152,23 @@ type TapeConfig = { const useTape = ( config: TapeConfig ): TapeState => { - const { mode, wsPath, replayPath, expectedType } = config; + const { mode, wsPath, replayPath, expectedType, latestPath } = config; const batchSize = config.batchSize ?? 40; const pollMs = config.pollMs ?? 1000; const [status, setStatus] = useState("connecting"); const [items, setItems] = useState([]); const [lastUpdate, setLastUpdate] = useState(null); + const [replayTime, setReplayTime] = useState(null); + const [replayComplete, setReplayComplete] = useState(false); const [paused, setPaused] = useState(false); const [dropped, setDropped] = useState(0); const reconnectRef = useRef(null); const socketRef = useRef(null); const cursorRef = useRef({ ts: 0, seq: 0 }); + const replayEndRef = useRef(null); + const replayCompleteRef = useRef(false); + const replaySourceRef = useRef(null); + const emptyPollsRef = useRef(0); const pausedRef = useRef(paused); useEffect(() => { @@ -166,11 +188,53 @@ const useTape = ( useEffect(() => { setItems([]); setLastUpdate(null); + setReplayTime(null); + setReplayComplete(false); + replayCompleteRef.current = false; + replaySourceRef.current = null; + emptyPollsRef.current = 0; setDropped(0); setStatus("connecting"); cursorRef.current = { ts: 0, seq: 0 }; }, [mode]); + useEffect(() => { + if (mode !== "replay" || !latestPath) { + replayEndRef.current = null; + return; + } + + let active = true; + replayEndRef.current = null; + setReplayComplete(false); + replayCompleteRef.current = false; + + const fetchReplayEnd = async () => { + try { + const url = new URL(buildApiUrl(latestPath)); + url.searchParams.set("limit", "1"); + const response = await fetch(url.toString()); + if (!response.ok) { + throw new Error(`Replay baseline failed with ${response.status}`); + } + + const payload = (await response.json()) as { data?: T[] }; + const latest = payload.data?.[0]; + if (active && latest) { + replayEndRef.current = latest.ts; + } + } catch (error) { + console.warn("Failed to load replay end cursor", error); + } + }; + + void fetchReplayEnd(); + + return () => { + active = false; + }; + }, [mode, latestPath]); + useEffect(() => { if (mode !== "live") { return; @@ -268,33 +332,104 @@ const useTape = ( return; } + if (replayCompleteRef.current) { + return; + } + try { - const cursor = cursorRef.current; - const url = new URL(buildApiUrl(replayPath)); - url.searchParams.set("after_ts", cursor.ts.toString()); - url.searchParams.set("after_seq", cursor.seq.toString()); - url.searchParams.set("limit", batchSize.toString()); + let keepPolling = true; - const response = await fetch(url.toString()); - if (!response.ok) { - throw new Error(`Replay request failed with ${response.status}`); + while (keepPolling && active && !pausedRef.current) { + const replayEnd = replayEndRef.current; + const cursor = cursorRef.current; + + if (replayEnd !== null && cursor.ts >= replayEnd) { + replayCompleteRef.current = true; + setReplayComplete(true); + setStatus("disconnected"); + return; + } + + const url = new URL(buildApiUrl(replayPath)); + url.searchParams.set("after_ts", cursor.ts.toString()); + url.searchParams.set("after_seq", cursor.seq.toString()); + url.searchParams.set("limit", batchSize.toString()); + + const response = await fetch(url.toString()); + if (!response.ok) { + throw new Error(`Replay request failed with ${response.status}`); + } + + const payload = (await response.json()) as ReplayResponse; + + let sourcePrefix = replaySourceRef.current; + if (!sourcePrefix) { + const firstWithTrace = payload.data.find((item) => extractTracePrefix(item)); + if (firstWithTrace) { + sourcePrefix = extractTracePrefix(firstWithTrace); + replaySourceRef.current = sourcePrefix ?? null; + } + } + + const filtered = sourcePrefix + ? payload.data.filter((item) => extractTracePrefix(item) === sourcePrefix) + : payload.data; + + const hasForeign = + sourcePrefix && + payload.data.some((item) => { + const prefix = extractTracePrefix(item); + return prefix !== null && prefix !== sourcePrefix; + }); + + if (filtered.length > 0) { + const nextItems = [...filtered].reverse(); + setItems((prev) => { + const next = [...nextItems, ...prev]; + return next.slice(0, MAX_ITEMS); + }); + setLastUpdate(Date.now()); + const last = filtered.at(-1); + if (last) { + setReplayTime(last.ts); + if (replayEnd !== null && last.ts >= replayEnd) { + cursorRef.current = { ts: last.ts, seq: last.seq }; + replayCompleteRef.current = true; + setReplayComplete(true); + setStatus("disconnected"); + return; + } + } + emptyPollsRef.current = 0; + } else if (sourcePrefix) { + emptyPollsRef.current += 1; + } + + if (payload.next) { + cursorRef.current = payload.next; + } + + setStatus("connected"); + keepPolling = filtered.length === batchSize; + + if (keepPolling) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + + if (hasForeign) { + replayCompleteRef.current = true; + setReplayComplete(true); + setStatus("disconnected"); + return; + } + + if (sourcePrefix && emptyPollsRef.current >= 3) { + replayCompleteRef.current = true; + setReplayComplete(true); + setStatus("disconnected"); + return; + } } - - const payload = (await response.json()) as ReplayResponse; - if (payload.data.length > 0) { - const nextItems = [...payload.data].reverse(); - setItems((prev) => { - const next = [...nextItems, ...prev]; - return next.slice(0, MAX_ITEMS); - }); - setLastUpdate(Date.now()); - } - - if (payload.next) { - cursorRef.current = payload.next; - } - - setStatus("connected"); } catch (error) { console.warn("Replay poll failed", error); setStatus("disconnected"); @@ -310,13 +445,24 @@ const useTape = ( }; }, [mode, replayPath, batchSize, pollMs]); - return { status, items, lastUpdate, paused, dropped, togglePause }; + return { + status, + items, + lastUpdate, + replayTime, + replayComplete, + paused, + dropped, + togglePause + }; }; const useFlowStream = (enabled: boolean): TapeState => { const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); const [items, setItems] = useState([]); const [lastUpdate, setLastUpdate] = useState(null); + const [replayTime] = useState(null); + const [replayComplete] = useState(false); const [paused, setPaused] = useState(false); const [dropped, setDropped] = useState(0); const reconnectRef = useRef(null); @@ -423,26 +569,47 @@ const useFlowStream = (enabled: boolean): TapeState => { }; }, [enabled]); - return { status, items, lastUpdate, paused, dropped, togglePause }; + return { + status, + items, + lastUpdate, + replayTime, + replayComplete, + paused, + dropped, + togglePause + }; }; type TapeStatusProps = { status: WsStatus; lastUpdate: number | null; + replayTime: number | null; + replayComplete: boolean; paused: boolean; dropped: number; mode: TapeMode; onTogglePause: () => void; }; -const TapeStatus = ({ status, lastUpdate, paused, dropped, mode, onTogglePause }: TapeStatusProps) => { +const TapeStatus = ({ + status, + lastUpdate, + replayTime, + replayComplete, + paused, + dropped, + mode, + onTogglePause +}: TapeStatusProps) => { const replayClass = mode === "replay" ? "status-replay" : ""; const pausedClass = paused ? "status-paused" : ""; + const label = replayComplete ? "Replay Complete" : statusLabel(status, paused, mode); return (
- {statusLabel(status, paused, mode)} + {label} {lastUpdate ? ( Updated {formatTime(lastUpdate)} ) : ( @@ -451,6 +618,11 @@ const TapeStatus = ({ status, lastUpdate, paused, dropped, mode, onTogglePause } {paused && dropped > 0 ? ( {dropped} new while paused ) : null} + {mode === "replay" ? ( + + Replay time {replayTime ? formatTime(replayTime) : "—"} + + ) : null} @@ -473,14 +645,20 @@ export default function HomePage() { mode, wsPath: "/ws/options", replayPath: "/replay/options", - expectedType: "option-print" + latestPath: "/prints/options", + expectedType: "option-print", + batchSize: mode === "replay" ? 120 : undefined, + pollMs: mode === "replay" ? 200 : undefined }); const equities = useTape({ mode, wsPath: "/ws/equities", replayPath: "/replay/equities", - expectedType: "equity-print" + latestPath: "/prints/equities", + expectedType: "equity-print", + batchSize: mode === "replay" ? 120 : undefined, + pollMs: mode === "replay" ? 200 : undefined }); const flow = useFlowStream(mode === "live"); @@ -526,6 +704,8 @@ export default function HomePage() { argparse.Namespace: + parser = argparse.ArgumentParser(description="Replay Databento option trades as JSON lines.") + parser.add_argument("--dataset", required=True) + parser.add_argument("--schema", default="trades") + parser.add_argument("--start", required=True) + parser.add_argument("--end", default="") + parser.add_argument("--symbols", default="ALL") + parser.add_argument("--stype-in", dest="stype_in", default="raw_symbol") + parser.add_argument("--stype-out", dest="stype_out", default="raw_symbol") + parser.add_argument("--limit", type=int, default=0) + parser.add_argument("--api-key", dest="api_key", default="") + return parser.parse_args() + + +def resolve_client(db_module, api_key: str): + try: + return db_module.Historical(api_key) + except TypeError: + return db_module.Historical(key=api_key) + + +def normalize_symbols(value: str): + if not value or value.strip().upper() == "ALL": + return None + return [symbol.strip() for symbol in value.split(",") if symbol.strip()] + + +def parse_date(value: str | None) -> dt.date | None: + if not value: + return None + try: + parsed = dt.datetime.fromisoformat(value) + return parsed.date() + except ValueError: + try: + return dt.date.fromisoformat(value) + except ValueError: + return None + + +def normalize_ts(ts_value): + if ts_value is None: + return None + if isinstance(ts_value, dt.datetime): + return int(ts_value.timestamp() * 1000) + if isinstance(ts_value, dt.date): + return int(dt.datetime.combine(ts_value, dt.time()).timestamp() * 1000) + if isinstance(ts_value, (int, float)): + if ts_value > 1_000_000_000_000_000: + return int(ts_value / 1_000_000) + return int(ts_value) + return None + + +def stringify(value): + if value is None: + return None + if isinstance(value, bytes): + return value.decode("utf-8", errors="ignore") + return str(value) + +def is_numeric_symbol(value: object) -> bool: + if isinstance(value, int): + return True + if isinstance(value, str): + return value.isdigit() + return False + + +class SymbolResolver: + def __init__(self, client, dataset: str, start_date: dt.date | None, end_date: dt.date | None): + from databento.common.symbology import InstrumentMap + + self._client = client + self._dataset = dataset + self._start_date = start_date + self._end_date = end_date + self._map = InstrumentMap() + self._pending: list[int] = [] + self._pending_set: set[int] = set() + + def queue(self, instrument_id: int) -> None: + if instrument_id in self._pending_set: + return + self._pending_set.add(instrument_id) + self._pending.append(instrument_id) + + def resolve_pending(self) -> None: + if not self._pending: + return + pending = self._pending + self._pending = [] + self._pending_set.clear() + + for i in range(0, len(pending), 2000): + chunk = pending[i : i + 2000] + response = self._client.symbology.resolve( + dataset=self._dataset, + symbols=chunk, + stype_in="instrument_id", + stype_out="raw_symbol", + start_date=self._start_date or dt.date.today(), + end_date=self._end_date, + ) + self._map.insert_json(response) + + def lookup(self, instrument_id: int, date: dt.date) -> str | None: + if instrument_id is None: + return None + return self._map.resolve(instrument_id, date) + + def pending_count(self) -> int: + return len(self._pending) + + +def build_payload(record, symbol_override: str | None = None) -> dict | None: + ts_event = getattr(record, "ts_event", None) + price = getattr(record, "price", None) + size = getattr(record, "size", None) + symbol = symbol_override or ( + getattr(record, "symbol", None) + or getattr(record, "raw_symbol", None) + or getattr(record, "instrument_id", None) + ) + + if ts_event is None or price is None or size is None or symbol is None: + return None + + ts_ms = normalize_ts(ts_event) + if ts_ms is None: + return None + + exchange = ( + getattr(record, "exchange", None) + or getattr(record, "publisher_id", None) + or getattr(record, "exchange_id", None) + ) + conditions = getattr(record, "conditions", None) or getattr(record, "condition", None) + if isinstance(conditions, str): + conditions = [conditions] + + payload = { + "ts": ts_ms, + "price": float(price), + "size": int(size), + "symbol": stringify(symbol), + } + + if exchange is not None: + payload["exchange"] = stringify(exchange) + if conditions: + payload["conditions"] = conditions + + return payload + + +def emit_payload(payload: dict | None) -> None: + if payload is None: + return + print(json.dumps(payload), flush=True) + + +def main() -> int: + args = parse_args() + + api_key = args.api_key or os.getenv("DATABENTO_API_KEY") + if not api_key: + sys.stderr.write("DATABENTO_API_KEY is required.\n") + return 1 + + try: + import databento as db + except ImportError: + sys.stderr.write("Missing Python package 'databento'. Install with pip.\n") + return 1 + + client = resolve_client(db, api_key) + + start_date = parse_date(args.start) + end_date = parse_date(args.end) if args.end else None + resolver = SymbolResolver(client, args.dataset, start_date, end_date) + buffered: list[tuple[object, int, dt.date]] = [] + + kwargs = { + "dataset": args.dataset, + "schema": args.schema, + "start": args.start, + "end": args.end or None, + "symbols": normalize_symbols(args.symbols), + "stype_in": args.stype_in, + "stype_out": args.stype_out, + "limit": args.limit or None, + } + + signature = inspect.signature(client.timeseries.get_range) + filtered_kwargs = { + key: value for key, value in kwargs.items() if key in signature.parameters and value is not None + } + + data = client.timeseries.get_range(**filtered_kwargs) + + def flush_buffer(force: bool = False) -> None: + if not buffered: + return + + resolver.resolve_pending() + remaining: list[tuple[object, int, dt.date]] = [] + for record, instrument_id, date in buffered: + mapped = resolver.lookup(instrument_id, date) + if mapped: + emit_payload(build_payload(record, mapped)) + elif force: + emit_payload(build_payload(record, str(instrument_id))) + else: + remaining.append((record, instrument_id, date)) + buffered[:] = remaining + + def handle_record(record) -> None: + symbol = ( + getattr(record, "symbol", None) + or getattr(record, "raw_symbol", None) + or getattr(record, "instrument_id", None) + ) + + ts_event = getattr(record, "ts_event", None) + ts_ms = normalize_ts(ts_event) + if ts_ms is None: + return + + date = dt.datetime.utcfromtimestamp(ts_ms / 1000).date() + + if is_numeric_symbol(symbol): + instrument_id = int(symbol) + mapped = resolver.lookup(instrument_id, date) + if mapped: + emit_payload(build_payload(record, mapped)) + return + + resolver.queue(instrument_id) + buffered.append((record, instrument_id, date)) + + if resolver.pending_count() >= 200: + flush_buffer() + return + + emit_payload(build_payload(record)) + + if hasattr(data, "replay"): + try: + data.replay(callback=handle_record) + except TypeError: + data.replay(handle_record) + flush_buffer(force=True) + return 0 + + if hasattr(data, "__iter__"): + for record in data: + handle_record(record) + if len(buffered) >= 2000: + flush_buffer() + flush_buffer(force=True) + return 0 + + sys.stderr.write("Unsupported Databento response type.\n") + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/services/ingest-options/py/requirements.txt b/services/ingest-options/py/requirements.txt index f7ca71d..4fc8457 100644 --- a/services/ingest-options/py/requirements.txt +++ b/services/ingest-options/py/requirements.txt @@ -1 +1,3 @@ ib_insync>=0.9.83 +databento +typing_extensions diff --git a/services/ingest-options/src/adapters/databento.ts b/services/ingest-options/src/adapters/databento.ts new file mode 100644 index 0000000..d1f5925 --- /dev/null +++ b/services/ingest-options/src/adapters/databento.ts @@ -0,0 +1,256 @@ +import { createLogger } from "@islandflow/observability"; +import type { OptionIngestAdapter, OptionIngestHandlers } from "./types"; + +type DatabentoOptionsAdapterConfig = { + apiKey: string; + dataset: string; + schema: string; + start: string; + end?: string; + symbols: string; + stypeIn: string; + stypeOut: string; + limit: number; + priceScale: number; + pythonBin: string; +}; + +type DatabentoTradeMessage = { + ts: number; + price: number; + size: number; + symbol: string; + exchange?: string; + conditions?: string[] | string; +}; + +type OptionContract = { + root: string; + expiry: string; + strike: number; + right: "C" | "P"; +}; + +const logger = createLogger({ service: "ingest-options" }); + +const formatDate = (date: Date): string => date.toISOString().slice(0, 10); + +const parseOccSymbol = (symbol: string): OptionContract | null => { + if (symbol.length < 15) { + return null; + } + + const tail = symbol.slice(-15); + const rootRaw = symbol.slice(0, -15).trim(); + const expiryRaw = tail.slice(0, 6); + const right = tail.slice(6, 7); + const strikeRaw = tail.slice(7); + + if (!/^\d{6}$/.test(expiryRaw) || !/^\d{8}$/.test(strikeRaw)) { + return null; + } + + if (right !== "C" && right !== "P") { + return null; + } + + const year = 2000 + Number(expiryRaw.slice(0, 2)); + const month = Number(expiryRaw.slice(2, 4)) - 1; + const day = Number(expiryRaw.slice(4, 6)); + const expiryDate = new Date(Date.UTC(year, month, day)); + const expiry = formatDate(expiryDate); + const strike = Number(strikeRaw) / 1000; + + if (!rootRaw || !Number.isFinite(strike)) { + return null; + } + + return { + root: rootRaw, + expiry, + strike, + right + }; +}; + +const formatStrike = (strike: number): string => { + const fixed = strike.toFixed(3); + return fixed.replace(/\.?0+$/, ""); +}; + +const formatContractId = (contract: OptionContract): string => + `${contract.root}-${contract.expiry}-${formatStrike(contract.strike)}-${contract.right}`; + +const normalizeTimestamp = (value: number): number => { + if (!Number.isFinite(value)) { + return Date.now(); + } + + if (value > 1_000_000_000_000_000) { + return Math.floor(value / 1_000_000); + } + + return value; +}; + +const readLines = async ( + stream: ReadableStream, + onLine: (line: string) => void +): Promise => { + const reader = stream.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed.length > 0) { + onLine(trimmed); + } + } + } + + if (buffer.trim().length > 0) { + onLine(buffer.trim()); + } +}; + +export const createDatabentoOptionsAdapter = ( + config: DatabentoOptionsAdapterConfig +): OptionIngestAdapter => { + return { + name: "databento", + start: (handlers: OptionIngestHandlers) => { + if (!config.apiKey) { + throw new Error("DATABENTO_API_KEY is required for the Databento adapter."); + } + + if (!config.start) { + throw new Error("DATABENTO_START is required for the Databento adapter."); + } + + const scriptPath = new URL("../../py/databento_replay.py", import.meta.url).pathname; + const args = [ + config.pythonBin, + scriptPath, + "--dataset", + config.dataset, + "--schema", + config.schema, + "--start", + config.start, + "--symbols", + config.symbols, + "--stype-in", + config.stypeIn, + "--stype-out", + config.stypeOut + ]; + + if (config.end) { + args.push("--end", config.end); + } + + if (config.limit > 0) { + args.push("--limit", String(config.limit)); + } + + const child = Bun.spawn(args, { + stdout: "pipe", + stderr: "inherit", + env: { + ...Bun.env, + DATABENTO_API_KEY: config.apiKey + } + }); + + if (!child.stdout) { + throw new Error("Databento adapter failed to attach stdout."); + } + + let seq = 0; + const contractIdCache = new Map(); + const warnedSymbols = new Set(); + + const resolveContractId = (symbol: string): string => { + const cached = contractIdCache.get(symbol); + if (cached) { + return cached; + } + + const parsed = parseOccSymbol(symbol); + const contractId = parsed ? formatContractId(parsed) : symbol.trim() || symbol; + contractIdCache.set(symbol, contractId); + + if (!parsed && !warnedSymbols.has(symbol)) { + warnedSymbols.add(symbol); + logger.warn("databento symbol parse failed; using raw symbol", { symbol }); + } + + return contractId; + }; + + const handleLine = (line: string) => { + try { + const payload = JSON.parse(line) as DatabentoTradeMessage; + if (!payload) { + return; + } + + const price = Number(payload.price); + const size = Number(payload.size); + if (!Number.isFinite(price) || !Number.isFinite(size)) { + return; + } + + const symbol = String(payload.symbol ?? "").trim(); + if (!symbol) { + return; + } + + const sourceTs = normalizeTimestamp(Number(payload.ts)); + const ingestTs = Date.now(); + seq += 1; + + const scaledPrice = config.priceScale === 1 ? price : price / config.priceScale; + + const conditions = Array.isArray(payload.conditions) + ? payload.conditions.map((entry) => String(entry)) + : typeof payload.conditions === "string" + ? [payload.conditions] + : undefined; + + void handlers.onTrade({ + source_ts: sourceTs, + ingest_ts: ingestTs, + seq, + trace_id: `databento-${seq}`, + ts: sourceTs, + option_contract_id: resolveContractId(symbol), + price: scaledPrice, + size, + exchange: payload.exchange ? String(payload.exchange) : "OPRA", + conditions + }); + } catch { + // Ignore malformed lines to keep replay streaming. + } + }; + + void readLines(child.stdout, handleLine); + + return () => { + child.kill(); + }; + } + }; +}; diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index d141178..82cce12 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -14,6 +14,7 @@ import { } from "@islandflow/storage"; import { OptionPrintSchema, type OptionPrint } from "@islandflow/types"; import { createAlpacaOptionsAdapter } from "./adapters/alpaca"; +import { createDatabentoOptionsAdapter } from "./adapters/databento"; import { createIbkrOptionsAdapter } from "./adapters/ibkr"; import { createSyntheticOptionsAdapter } from "./adapters/synthetic"; import type { OptionIngestAdapter, StopHandler } from "./adapters/types"; @@ -38,6 +39,17 @@ const envSchema = z.object({ ALPACA_MONEYNESS_PCT: z.coerce.number().positive().default(0.06), ALPACA_MONEYNESS_FALLBACK_PCT: z.coerce.number().positive().default(0.1), ALPACA_MAX_QUOTES: z.coerce.number().int().positive().default(200), + DATABENTO_API_KEY: z.string().default(""), + DATABENTO_DATASET: z.string().default("OPRA.PILLAR"), + DATABENTO_SCHEMA: z.string().default("trades"), + DATABENTO_START: z.string().default(""), + DATABENTO_END: z.string().default(""), + DATABENTO_SYMBOLS: z.string().default("ALL"), + DATABENTO_STYPE_IN: z.string().default("raw_symbol"), + DATABENTO_STYPE_OUT: z.string().default("raw_symbol"), + DATABENTO_LIMIT: z.coerce.number().int().nonnegative().default(0), + DATABENTO_PRICE_SCALE: z.coerce.number().positive().default(1), + DATABENTO_PYTHON_BIN: z.string().default("python3"), IBKR_HOST: z.string().default("127.0.0.1"), IBKR_PORT: z.coerce.number().int().positive().default(7497), IBKR_CLIENT_ID: z.coerce.number().int().nonnegative().default(0), @@ -113,6 +125,30 @@ const selectAdapter = (name: string): OptionIngestAdapter => { }); } + if (name === "databento") { + if (!env.DATABENTO_API_KEY) { + throw new Error("DATABENTO_API_KEY is required for the databento adapter."); + } + + if (!env.DATABENTO_START) { + throw new Error("DATABENTO_START is required for the databento adapter."); + } + + return createDatabentoOptionsAdapter({ + apiKey: env.DATABENTO_API_KEY, + dataset: env.DATABENTO_DATASET, + schema: env.DATABENTO_SCHEMA, + start: env.DATABENTO_START, + end: env.DATABENTO_END || undefined, + symbols: env.DATABENTO_SYMBOLS, + stypeIn: env.DATABENTO_STYPE_IN, + stypeOut: env.DATABENTO_STYPE_OUT, + limit: env.DATABENTO_LIMIT, + priceScale: env.DATABENTO_PRICE_SCALE, + pythonBin: env.DATABENTO_PYTHON_BIN + }); + } + if (name === "ibkr") { return createIbkrOptionsAdapter({ host: env.IBKR_HOST,