From fc7065792f09b7367fcb4923c0f030321119822f Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Tue, 30 Dec 2025 12:47:58 -0500 Subject: [PATCH] Add NBBO persistence, API/WS streaming, and UI context --- .env.example | 8 +- apps/web/app/globals.css | 90 ++++++++ apps/web/app/page.tsx | 215 +++++++++++++++--- packages/bus/src/subjects.ts | 2 + packages/storage/src/clickhouse.ts | 74 ++++++ packages/storage/src/index.ts | 1 + packages/storage/src/option-nbbo.ts | 26 +++ packages/storage/tests/option-nbbo.test.ts | 28 +++ services/api/src/index.ts | 71 +++++- services/compute/src/index.ts | 154 ++++++++++++- .../ingest-options/src/adapters/synthetic.ts | 102 ++++++++- services/ingest-options/src/index.ts | 43 +++- 12 files changed, 768 insertions(+), 46 deletions(-) create mode 100644 packages/storage/src/option-nbbo.ts create mode 100644 packages/storage/tests/option-nbbo.test.ts diff --git a/.env.example b/.env.example index 900fd08..53f8f64 100644 --- a/.env.example +++ b/.env.example @@ -52,7 +52,9 @@ TESTING_THROTTLE_MS=200 # Compute consumer behavior COMPUTE_DELIVER_POLICY=new COMPUTE_CONSUMER_RESET=false -CLASSIFIER_SWEEP_MIN_PREMIUM=50000 +NBBO_MAX_AGE_MS=1000 +NEXT_PUBLIC_NBBO_MAX_AGE_MS=1000 +CLASSIFIER_SWEEP_MIN_PREMIUM=40000 CLASSIFIER_SWEEP_MIN_COUNT=3 -CLASSIFIER_SPIKE_MIN_PREMIUM=25000 -CLASSIFIER_SPIKE_MIN_SIZE=500 +CLASSIFIER_SPIKE_MIN_PREMIUM=20000 +CLASSIFIER_SPIKE_MIN_SIZE=400 diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index 41b3c6d..ce90040 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -426,6 +426,96 @@ h1 { background: rgba(111, 91, 57, 0.12); } +.nbbo-meta { + font-size: 0.72rem; + color: #6f5b39; +} + +.nbbo-side { + position: relative; + display: inline-flex; + align-items: center; + margin-left: 4px; +} + +.nbbo-tag { + padding: 2px 6px; + border-radius: 999px; + border: 1px solid rgba(111, 91, 57, 0.35); + font-size: 0.7rem; + letter-spacing: 0.08em; + text-transform: uppercase; +} + +.nbbo-tag-a { + border-color: rgba(47, 109, 79, 0.5); + color: #2f6d4f; + background: rgba(47, 109, 79, 0.16); +} + +.nbbo-tag-aa { + border-color: rgba(26, 87, 60, 0.6); + color: #1a573c; + background: rgba(26, 87, 60, 0.2); +} + +.nbbo-tag-b { + border-color: rgba(140, 74, 22, 0.5); + color: #8c4a16; + background: rgba(196, 111, 42, 0.18); +} + +.nbbo-tag-bb { + border-color: rgba(110, 44, 12, 0.6); + color: #6e2c0c; + background: rgba(110, 44, 12, 0.2); +} + +.nbbo-tooltip { + position: absolute; + right: 0; + bottom: 100%; + transform: translateY(-6px); + display: grid; + gap: 4px; + padding: 8px 10px; + border-radius: 10px; + border: 1px solid rgba(217, 205, 184, 0.8); + background: #fffdf7; + box-shadow: 0 12px 26px rgba(66, 45, 18, 0.18); + opacity: 0; + pointer-events: none; + transition: opacity 0.15s ease, transform 0.15s ease; + z-index: 2; + white-space: nowrap; +} + +.nbbo-tooltip-row { + display: inline-flex; + align-items: center; + gap: 6px; + font-size: 0.68rem; + color: #6f5b39; +} + +.nbbo-side:hover .nbbo-tooltip, +.nbbo-side:focus-within .nbbo-tooltip { + opacity: 1; + transform: translateY(-10px); +} + +.nbbo-missing { + border-color: rgba(136, 58, 17, 0.4); + color: #8c3a11; + background: rgba(196, 111, 42, 0.16); +} + +.nbbo-stale { + border-color: rgba(31, 74, 123, 0.4); + color: #1f4a7b; + background: rgba(31, 74, 123, 0.12); +} + .severity-high { border-color: rgba(136, 58, 17, 0.6); color: #8c3a11; diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 50be6a8..51bbfc0 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -1,16 +1,32 @@ "use client"; import { useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState } from "react"; -import type { AlertEvent, ClassifierHitEvent, EquityPrint, FlowPacket, OptionPrint } from "@islandflow/types"; +import type { + AlertEvent, + ClassifierHitEvent, + EquityPrint, + FlowPacket, + OptionNBBO, + OptionPrint +} from "@islandflow/types"; const MAX_ITEMS = 500; +const NBBO_MAX_AGE_MS = Number(process.env.NEXT_PUBLIC_NBBO_MAX_AGE_MS); +const NBBO_MAX_AGE_MS_SAFE = + Number.isFinite(NBBO_MAX_AGE_MS) && NBBO_MAX_AGE_MS > 0 ? NBBO_MAX_AGE_MS : 1000; const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); type WsStatus = "connecting" | "connected" | "disconnected"; type TapeMode = "live" | "replay"; -type MessageType = "option-print" | "equity-print" | "flow-packet" | "classifier-hit" | "alert"; +type MessageType = + | "option-print" + | "option-nbbo" + | "equity-print" + | "flow-packet" + | "classifier-hit" + | "alert"; type StreamMessage = { type: MessageType; @@ -220,6 +236,39 @@ const parseNumber = (value: unknown, fallback: number): number => { return fallback; }; +type NbboSide = "AA" | "A" | "B" | "BB"; + +const classifyNbboSide = (price: number, quote: OptionNBBO | null | undefined): NbboSide | null => { + if (!quote || !Number.isFinite(price)) { + return null; + } + + const bid = quote.bid; + const ask = quote.ask; + if (!Number.isFinite(bid) || !Number.isFinite(ask) || ask <= 0) { + return null; + } + + const spread = Math.max(0, ask - bid); + const epsilon = Math.max(0.01, spread * 0.05); + + if (price > ask + epsilon) { + return "AA"; + } + if (price >= ask - epsilon) { + return "A"; + } + if (price < bid - epsilon) { + return "BB"; + } + if (price <= bid + epsilon) { + return "B"; + } + + const mid = (bid + ask) / 2; + return price >= mid ? "A" : "B"; +}; + type ListScrollState = { listRef: React.RefObject; isAtTop: boolean; @@ -1242,6 +1291,16 @@ export default function HomePage() { onNewItems: equitiesScroll.onNewItems }); + const nbbo = useTape({ + mode, + wsPath: "/ws/options-nbbo", + replayPath: "/replay/nbbo", + latestPath: "/nbbo/options", + expectedType: "option-nbbo", + batchSize: mode === "replay" ? 120 : undefined, + pollMs: mode === "replay" ? 200 : undefined + }); + const flow = useFlowStream(mode === "live", flowScroll.onNewItems, flowAnchor.capture); const alerts = useLiveStream({ enabled: mode === "live", @@ -1288,6 +1347,21 @@ export default function HomePage() { const tickerSet = useMemo(() => new Set(activeTickers), [activeTickers]); + const nbboMap = useMemo(() => { + const map = new Map(); + for (const quote of nbbo.items) { + const existing = map.get(quote.option_contract_id); + if ( + !existing || + quote.ts > existing.ts || + (quote.ts === existing.ts && quote.seq >= existing.seq) + ) { + map.set(quote.option_contract_id, quote); + } + } + return map; + }, [nbbo.items]); + const optionPrintMap = useMemo(() => { const map = new Map(); for (const print of options.items) { @@ -1541,22 +1615,68 @@ export default function HomePage() { : "Replay queue empty. Ensure ClickHouse has data."} ) : ( - filteredOptions.map((print) => ( -
-
-
{print.option_contract_id}
-
- ${formatPrice(print.price)} - {formatSize(print.size)}x - {print.exchange} - {print.conditions?.length ? ( - {print.conditions.join(", ")} - ) : null} + filteredOptions.map((print) => { + const quote = nbboMap.get(print.option_contract_id); + const nbboAge = quote ? Math.abs(print.ts - quote.ts) : null; + const nbboStale = nbboAge !== null && nbboAge > NBBO_MAX_AGE_MS_SAFE; + const nbboMid = quote ? (quote.bid + quote.ask) / 2 : null; + const nbboSide = classifyNbboSide(print.price, quote); + + return ( +
+
+
{print.option_contract_id}
+
+ ${formatPrice(print.price)} + {formatSize(print.size)}x + {print.exchange} + {print.conditions?.length ? ( + {print.conditions.join(", ")} + ) : null} +
+ {quote ? ( +
+ Bid ${formatPrice(quote.bid)} + Ask ${formatPrice(quote.ask)} + Mid ${formatPrice(nbboMid ?? 0)} + {Math.round(nbboAge ?? 0)}ms + {nbboSide ? ( + + + {nbboSide} + + + + A + Ask + + + AA + Above Ask + + + B + Bid + + + BB + Below Bid + + + + ) : null} + {nbboStale ? Stale : null} +
+ ) : ( +
+ NBBO missing +
+ )}
+
{formatTime(print.ts)}
-
{formatTime(print.ts)}
-
- )) + ); + }) )}
@@ -1658,27 +1778,52 @@ export default function HomePage() { const features = packet.features ?? {}; const contract = String(features.option_contract_id ?? packet.id ?? "unknown"); const count = parseNumber(features.count, packet.members.length); - const totalSize = parseNumber(features.total_size, 0); - const totalPremium = parseNumber(features.total_premium, 0); - const notional = totalPremium * 100; - const startTs = parseNumber(features.start_ts, packet.source_ts); - const endTs = parseNumber(features.end_ts, startTs); - const windowMs = parseNumber(features.window_ms, 0); + const totalSize = parseNumber(features.total_size, 0); + const totalPremium = parseNumber(features.total_premium, 0); + const notional = totalPremium * 100; + const startTs = parseNumber(features.start_ts, packet.source_ts); + const endTs = parseNumber(features.end_ts, startTs); + const windowMs = parseNumber(features.window_ms, 0); + const nbboBid = parseNumber(features.nbbo_bid, Number.NaN); + const nbboAsk = parseNumber(features.nbbo_ask, Number.NaN); + const nbboMid = parseNumber(features.nbbo_mid, Number.NaN); + const nbboSpread = parseNumber(features.nbbo_spread, Number.NaN); + const nbboAge = parseNumber(packet.join_quality.nbbo_age_ms, Number.NaN); + const nbboStale = parseNumber(packet.join_quality.nbbo_stale, 0) > 0; + const nbboMissing = parseNumber(packet.join_quality.nbbo_missing, 0) > 0; - return ( -
-
-
{contract}
-
- {formatFlowMetric(count)} prints - {formatFlowMetric(totalSize)} size - Premium ${formatPrice(totalPremium)} - Notional ${formatUsd(notional)} - {windowMs > 0 ? ( - {formatFlowMetric(windowMs, "ms")} - ) : null} -
+ return ( +
+
+
{contract}
+
+ {formatFlowMetric(count)} prints + {formatFlowMetric(totalSize)} size + Premium ${formatPrice(totalPremium)} + Notional ${formatUsd(notional)} + {windowMs > 0 ? ( + {formatFlowMetric(windowMs, "ms")} + ) : null} + {Number.isFinite(nbboBid) && Number.isFinite(nbboAsk) ? ( + + NBBO ${formatPrice(nbboBid)} x ${formatPrice(nbboAsk)} + + ) : null} + {Number.isFinite(nbboMid) ? ( + Mid ${formatPrice(nbboMid)} + ) : null} + {Number.isFinite(nbboSpread) ? ( + Spread ${formatPrice(nbboSpread)} + ) : null} + {Number.isFinite(nbboAge) ? ( + {Math.round(nbboAge)}ms + ) : null} + {nbboStale ? NBBO stale : null} + {nbboMissing ? ( + NBBO missing + ) : null}
+
{formatTime(startTs)} → {formatTime(endTs)}
diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index d2c7a39..4bd675a 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -1,5 +1,7 @@ export const STREAM_OPTION_PRINTS = "OPTIONS_PRINTS"; export const SUBJECT_OPTION_PRINTS = "options.prints"; +export const STREAM_OPTION_NBBO = "OPTIONS_NBBO"; +export const SUBJECT_OPTION_NBBO = "options.nbbo"; export const STREAM_EQUITY_PRINTS = "EQUITY_PRINTS"; export const SUBJECT_EQUITY_PRINTS = "equities.prints"; export const STREAM_FLOW_PACKETS = "FLOW_PACKETS"; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index e20ff4b..f99dc16 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -4,6 +4,7 @@ import { ClassifierHitEventSchema, EquityPrintSchema, FlowPacketSchema, + OptionNBBOSchema, OptionPrintSchema } from "@islandflow/types"; import type { @@ -11,6 +12,7 @@ import type { ClassifierHitEvent, EquityPrint, FlowPacket, + OptionNBBO, OptionPrint } from "@islandflow/types"; import { @@ -18,6 +20,7 @@ import { optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "./option-prints"; +import { normalizeOptionNBBO, optionNBBOTableDDL, OPTION_NBBO_TABLE } from "./option-nbbo"; import { equityPrintsTableDDL, EQUITY_PRINTS_TABLE, @@ -69,6 +72,14 @@ export const ensureOptionPrintsTable = async ( }); }; +export const ensureOptionNBBOTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: optionNBBOTableDDL() + }); +}; + export const ensureEquityPrintsTable = async ( client: ClickHouseClient ): Promise => { @@ -111,6 +122,18 @@ export const insertOptionPrint = async ( }); }; +export const insertOptionNBBO = async ( + client: ClickHouseClient, + nbbo: OptionNBBO +): Promise => { + const record = normalizeOptionNBBO(nbbo); + await client.insert({ + table: OPTION_NBBO_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + export const insertEquityPrint = async ( client: ClickHouseClient, print: EquityPrint @@ -213,6 +236,24 @@ const normalizeOptionRow = (row: unknown): unknown => { return row; }; +const normalizeOptionNbboRow = (row: unknown): unknown => { + if (row && typeof row === "object") { + return normalizeNumericFields(row as Record, [ + "source_ts", + "ingest_ts", + "seq", + "ts", + "bid", + "ask", + "bidSize", + "askSize" + ]); + } + + return row; +}; + + const normalizeEquityRow = (row: unknown): unknown => { if (row && typeof row === "object") { const record = normalizeNumericFields(row as Record, [ @@ -307,6 +348,20 @@ export const fetchRecentOptionPrints = async ( return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow)); }; +export const fetchRecentOptionNBBO = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${OPTION_NBBO_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow)); +}; + export const fetchRecentEquityPrints = async ( client: ClickHouseClient, limit: number @@ -394,6 +449,25 @@ export const fetchOptionPrintsAfter = async ( return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow)); }; +export const fetchOptionNBBOAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow)); +}; + export const fetchEquityPrintsAfter = async ( client: ClickHouseClient, afterTs: number, diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index a1d4aed..c28d352 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -4,3 +4,4 @@ export * from "./alerts"; export * from "./flow-packets"; export * from "./equity-prints"; export * from "./option-prints"; +export * from "./option-nbbo"; diff --git a/packages/storage/src/option-nbbo.ts b/packages/storage/src/option-nbbo.ts new file mode 100644 index 0000000..a4dd9cd --- /dev/null +++ b/packages/storage/src/option-nbbo.ts @@ -0,0 +1,26 @@ +import type { OptionNBBO } from "@islandflow/types"; + +export const OPTION_NBBO_TABLE = "option_nbbo"; + +export const optionNBBOTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${OPTION_NBBO_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + ts UInt64, + option_contract_id String, + bid Float64, + ask Float64, + bidSize UInt32, + askSize UInt32 +) +ENGINE = MergeTree +ORDER BY (ts, option_contract_id) +`; +}; + +export const normalizeOptionNBBO = (nbbo: OptionNBBO): OptionNBBO => { + return nbbo; +}; diff --git a/packages/storage/tests/option-nbbo.test.ts b/packages/storage/tests/option-nbbo.test.ts new file mode 100644 index 0000000..a9c0baf --- /dev/null +++ b/packages/storage/tests/option-nbbo.test.ts @@ -0,0 +1,28 @@ +import { describe, expect, it } from "bun:test"; +import { normalizeOptionNBBO, optionNBBOTableDDL, OPTION_NBBO_TABLE } from "../src/option-nbbo"; + +const baseNbbo = { + source_ts: 100, + ingest_ts: 200, + seq: 1, + trace_id: "trace-1", + ts: 100, + option_contract_id: "SPY-2025-01-17-450-C", + bid: 1.2, + ask: 1.3, + bidSize: 10, + askSize: 12 +}; + +describe("option-nbbo storage helpers", () => { + it("keeps required fields intact", () => { + const normalized = normalizeOptionNBBO(baseNbbo); + expect(normalized).toEqual(baseNbbo); + }); + + it("includes the correct table name in the DDL", () => { + const ddl = optionNBBOTableDDL(); + expect(ddl).toContain(OPTION_NBBO_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); +}); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 2f55bcb..4c4ee37 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -5,11 +5,13 @@ import { SUBJECT_CLASSIFIER_HITS, SUBJECT_EQUITY_PRINTS, SUBJECT_FLOW_PACKETS, + SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, STREAM_ALERTS, STREAM_CLASSIFIER_HITS, STREAM_EQUITY_PRINTS, STREAM_FLOW_PACKETS, + STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, buildDurableConsumer, connectJetStreamWithRetry, @@ -22,12 +24,15 @@ import { ensureClassifierHitsTable, ensureEquityPrintsTable, ensureFlowPacketsTable, + ensureOptionNBBOTable, ensureOptionPrintsTable, fetchRecentAlerts, fetchRecentClassifierHits, fetchRecentFlowPackets, + fetchRecentOptionNBBO, fetchEquityPrintsAfter, fetchRecentEquityPrints, + fetchOptionNBBOAfter, fetchOptionPrintsAfter, fetchRecentOptionPrints } from "@islandflow/storage"; @@ -36,6 +41,7 @@ import { ClassifierHitEventSchema, EquityPrintSchema, FlowPacketSchema, + OptionNBBOSchema, OptionPrintSchema } from "@islandflow/types"; import { z } from "zod"; @@ -87,13 +93,14 @@ const replayParamsSchema = z.object({ limit: z.coerce.number().int().positive().max(1000).default(200) }); -type Channel = "options" | "equities" | "flow" | "classifier-hits" | "alerts"; +type Channel = "options" | "options-nbbo" | "equities" | "flow" | "classifier-hits" | "alerts"; type WsData = { channel: Channel; }; const optionSockets = new Set>(); +const optionNbboSockets = new Set>(); const equitySockets = new Set>(); const flowSockets = new Set>(); const classifierHitSockets = new Set>(); @@ -169,6 +176,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_OPTION_NBBO, + subjects: [SUBJECT_OPTION_NBBO], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_EQUITY_PRINTS, subjects: [SUBJECT_EQUITY_PRINTS], @@ -228,6 +248,7 @@ const run = async () => { await retry("clickhouse table init", 20, 500, async () => { await ensureOptionPrintsTable(clickhouse); + await ensureOptionNBBOTable(clickhouse); await ensureEquityPrintsTable(clickhouse); await ensureFlowPacketsTable(clickhouse); await ensureClassifierHitsTable(clickhouse); @@ -240,6 +261,12 @@ const run = async () => { buildDurableConsumer("api-option-prints") ); + const optionNbboSubscription = await subscribeJson( + js, + SUBJECT_OPTION_NBBO, + buildDurableConsumer("api-option-nbbo") + ); + const equitySubscription = await subscribeJson( js, SUBJECT_EQUITY_PRINTS, @@ -279,6 +306,21 @@ const run = async () => { } }; + const pumpOptionNbbo = async () => { + for await (const msg of optionNbboSubscription.messages) { + try { + const payload = OptionNBBOSchema.parse(optionNbboSubscription.decode(msg)); + broadcast(optionNbboSockets, { type: "option-nbbo", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process option nbbo", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + const pumpEquities = async () => { for await (const msg of equitySubscription.messages) { try { @@ -340,6 +382,7 @@ const run = async () => { }; void pumpOptions(); + void pumpOptionNbbo(); void pumpEquities(); void pumpFlow(); void pumpClassifierHits(); @@ -360,6 +403,12 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/nbbo/options") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentOptionNBBO(clickhouse, limit); + return jsonResponse({ data }); + } + if (req.method === "GET" && url.pathname === "/prints/equities") { const limit = parseLimit(url.searchParams.get("limit")); const data = await fetchRecentEquityPrints(clickhouse, limit); @@ -392,6 +441,14 @@ const run = async () => { return jsonResponse({ data, next }); } + if (req.method === "GET" && url.pathname === "/replay/nbbo") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchOptionNBBOAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + if (req.method === "GET" && url.pathname === "/replay/equities") { const { afterTs, afterSeq, limit } = parseReplayParams(url); const data = await fetchEquityPrintsAfter(clickhouse, afterTs, afterSeq, limit); @@ -408,6 +465,14 @@ const run = async () => { return jsonResponse({ error: "websocket upgrade failed" }, 400); } + if (req.method === "GET" && url.pathname === "/ws/options-nbbo") { + if (serverRef.upgrade(req, { data: { channel: "options-nbbo" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + if (req.method === "GET" && url.pathname === "/ws/equities") { if (serverRef.upgrade(req, { data: { channel: "equities" } })) { return new Response(null, { status: 101 }); @@ -446,6 +511,8 @@ const run = async () => { open: (socket) => { if (socket.data.channel === "options") { optionSockets.add(socket); + } else if (socket.data.channel === "options-nbbo") { + optionNbboSockets.add(socket); } else if (socket.data.channel === "equities") { equitySockets.add(socket); } else if (socket.data.channel === "flow") { @@ -461,6 +528,8 @@ const run = async () => { close: (socket) => { if (socket.data.channel === "options") { optionSockets.delete(socket); + } else if (socket.data.channel === "options-nbbo") { + optionNbboSockets.delete(socket); } else if (socket.data.channel === "equities") { equitySockets.delete(socket); } else if (socket.data.channel === "flow") { diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 571a25d..615edbc 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -4,10 +4,12 @@ import { SUBJECT_ALERTS, SUBJECT_CLASSIFIER_HITS, SUBJECT_FLOW_PACKETS, + SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, STREAM_ALERTS, STREAM_CLASSIFIER_HITS, STREAM_FLOW_PACKETS, + STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, buildDurableConsumer, connectJetStreamWithRetry, @@ -28,10 +30,12 @@ import { AlertEventSchema, ClassifierHitEventSchema, FlowPacketSchema, + OptionNBBOSchema, OptionPrintSchema, type AlertEvent, type ClassifierHitEvent, type FlowPacket, + type OptionNBBO, type OptionPrint } from "@islandflow/types"; import { z } from "zod"; @@ -60,6 +64,7 @@ const envSchema = z.object({ return value; }, z.boolean()) .default(false), + NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000), CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000), CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3), CLASSIFIER_SPIKE_MIN_PREMIUM: z.coerce.number().positive().default(20_000), @@ -117,6 +122,7 @@ type ClusterState = { }; const clusters = new Map(); +const nbboCache = new Map(); const applyDeliverPolicy = ( opts: ReturnType, @@ -166,12 +172,43 @@ const updateCluster = (cluster: ClusterState, print: OptionPrint): ClusterState return cluster; }; +type NbboJoin = { + nbbo: OptionNBBO | null; + ageMs: number; + stale: boolean; +}; + +const updateNbboCache = (nbbo: OptionNBBO): void => { + const existing = nbboCache.get(nbbo.option_contract_id); + if ( + !existing || + nbbo.ts > existing.ts || + (nbbo.ts === existing.ts && nbbo.seq >= existing.seq) + ) { + nbboCache.set(nbbo.option_contract_id, nbbo); + } +}; + +const selectNbbo = (contractId: string, ts: number): NbboJoin => { + const nbbo = nbboCache.get(contractId) ?? null; + if (!nbbo) { + return { nbbo: null, ageMs: env.NBBO_MAX_AGE_MS + 1, stale: true }; + } + + const ageMs = Math.abs(ts - nbbo.ts); + const stale = ageMs > env.NBBO_MAX_AGE_MS; + return { nbbo, ageMs, stale }; +}; + const flushCluster = async ( clickhouse: ReturnType, js: Awaited>["js"], cluster: ClusterState ): Promise => { - const features = { + const joinQuality: Record = {}; + const nbboJoin = selectNbbo(cluster.contractId, cluster.endTs); + + const features: Record = { option_contract_id: cluster.contractId, count: cluster.members.length, total_size: cluster.totalSize, @@ -183,6 +220,23 @@ const flushCluster = async ( window_ms: env.CLUSTER_WINDOW_MS }; + if (!nbboJoin.nbbo) { + joinQuality.nbbo_missing = 1; + } else { + joinQuality.nbbo_age_ms = nbboJoin.ageMs; + if (nbboJoin.stale) { + joinQuality.nbbo_stale = 1; + } else { + const mid = (nbboJoin.nbbo.bid + nbboJoin.nbbo.ask) / 2; + features.nbbo_bid = nbboJoin.nbbo.bid; + features.nbbo_ask = nbboJoin.nbbo.ask; + features.nbbo_mid = Number(mid.toFixed(4)); + features.nbbo_spread = Number((nbboJoin.nbbo.ask - nbboJoin.nbbo.bid).toFixed(4)); + features.nbbo_bid_size = nbboJoin.nbbo.bidSize; + features.nbbo_ask_size = nbboJoin.nbbo.askSize; + } + } + const packet: FlowPacket = { source_ts: cluster.startSourceTs, ingest_ts: cluster.endIngestTs, @@ -191,7 +245,7 @@ const flushCluster = async ( id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, members: cluster.members, features, - join_quality: {} + join_quality: joinQuality }; const validated = FlowPacketSchema.parse(packet); @@ -323,6 +377,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_OPTION_NBBO, + subjects: [SUBJECT_OPTION_NBBO], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_FLOW_PACKETS, subjects: [SUBJECT_FLOW_PACKETS], @@ -374,6 +441,7 @@ const run = async () => { }); const durableName = "compute-option-prints"; + const nbboDurableName = "compute-option-nbbo"; if (env.COMPUTE_CONSUMER_RESET) { try { @@ -404,6 +472,35 @@ const run = async () => { } } + if (env.COMPUTE_CONSUMER_RESET) { + try { + await jsm.consumers.delete(STREAM_OPTION_NBBO, nbboDurableName); + logger.warn("reset jetstream consumer", { durable: nbboDurableName }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to reset jetstream consumer", { durable: nbboDurableName, error: message }); + } + } + } else { + try { + const info = await jsm.consumers.info(STREAM_OPTION_NBBO, nbboDurableName); + if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) { + logger.warn("resetting consumer due to deliver policy change", { + durable: nbboDurableName, + current: info.config.deliver_policy, + desired: env.COMPUTE_DELIVER_POLICY + }); + await jsm.consumers.delete(STREAM_OPTION_NBBO, nbboDurableName); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to inspect jetstream consumer", { durable: nbboDurableName, error: message }); + } + } + } + const subscription = await (async () => { const opts = buildDurableConsumer(durableName); applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY); @@ -440,6 +537,59 @@ const run = async () => { } })(); + const nbboSubscription = await (async () => { + const opts = buildDurableConsumer(nbboDurableName); + applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY); + try { + return await subscribeJson(js, SUBJECT_OPTION_NBBO, opts); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const shouldReset = + message.includes("duplicate subscription") || + message.includes("durable requires") || + message.includes("subject does not match consumer"); + + if (!shouldReset) { + throw error; + } + + logger.warn("resetting jetstream consumer", { durable: nbboDurableName, error: message }); + + try { + await jsm.consumers.delete(STREAM_OPTION_NBBO, nbboDurableName); + } catch (deleteError) { + const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError); + if (!deleteMessage.includes("not found")) { + logger.warn("failed to delete jetstream consumer", { + durable: nbboDurableName, + error: deleteMessage + }); + } + } + + const resetOpts = buildDurableConsumer(nbboDurableName); + applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY); + return await subscribeJson(js, SUBJECT_OPTION_NBBO, resetOpts); + } + })(); + + const nbboLoop = async () => { + for await (const msg of nbboSubscription.messages) { + try { + const nbbo = OptionNBBOSchema.parse(nbboSubscription.decode(msg)); + updateNbboCache(nbbo); + msg.ack(); + } catch (error) { + logger.error("failed to process option nbbo", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + + void nbboLoop(); + const shutdown = async (signal: string) => { logger.info("service stopping", { signal }); diff --git a/services/ingest-options/src/adapters/synthetic.ts b/services/ingest-options/src/adapters/synthetic.ts index d0e51bc..23bf302 100644 --- a/services/ingest-options/src/adapters/synthetic.ts +++ b/services/ingest-options/src/adapters/synthetic.ts @@ -1,4 +1,4 @@ -import { SP500_SYMBOLS, type OptionPrint } from "@islandflow/types"; +import { SP500_SYMBOLS, type OptionNBBO, type OptionPrint } from "@islandflow/types"; import type { OptionIngestAdapter, OptionIngestHandlers } from "./types"; type SyntheticOptionsAdapterConfig = { @@ -13,6 +13,8 @@ type Burst = { conditions?: string[]; printCount: number; priceStep: number; + scenarioId: string; + seed: number; }; const MS_PER_DAY = 24 * 60 * 60 * 1000; @@ -21,6 +23,13 @@ const EXCHANGES = ["CBOE", "PHLX", "ISE", "ARCA", "BOX", "MIAX"]; const CONDITIONS = ["SWEEP", "ISO", "FILL", "TEST"]; const BURST_RUN_RANGE: [number, number] = [2, 4]; +type PricePlacement = "AA" | "A" | "B" | "BB"; + +type WeightedValue = { + value: T; + weight: number; +}; + type Scenario = { id: string; weight: number; @@ -75,6 +84,35 @@ const SCENARIOS: Scenario[] = [ } ]; +const PRICE_PLACEMENTS: Record[]> = { + bullish_sweep: [ + { value: "AA", weight: 25 }, + { value: "A", weight: 40 }, + { value: "B", weight: 20 }, + { value: "BB", weight: 15 } + ], + bearish_sweep: [ + { value: "AA", weight: 15 }, + { value: "A", weight: 20 }, + { value: "B", weight: 40 }, + { value: "BB", weight: 25 } + ], + contract_spike: [ + { value: "AA", weight: 25 }, + { value: "A", weight: 25 }, + { value: "B", weight: 25 }, + { value: "BB", weight: 25 } + ], + noise: [ + { value: "AA", weight: 25 }, + { value: "A", weight: 25 }, + { value: "B", weight: 25 }, + { value: "BB", weight: 25 } + ] +}; + +const PLACEMENT_PATTERN: PricePlacement[] = ["A", "AA", "B", "BB"]; + const pick = (items: T[], seed: number): T => { return items[Math.abs(seed) % items.length]; }; @@ -107,6 +145,19 @@ const pickWeighted = (items: T[], seed: number): T return items[0]; }; +const pickWeightedValue = (items: WeightedValue[], seed: number): T => { + return pickWeighted(items, seed).value; +}; + +const pickPlacement = (burst: Burst, index: number): PricePlacement => { + const placementOptions = PRICE_PLACEMENTS[burst.scenarioId] ?? PRICE_PLACEMENTS.noise; + const offset = Math.abs(burst.seed) % PLACEMENT_PATTERN.length; + if (index < PLACEMENT_PATTERN.length) { + return PLACEMENT_PATTERN[(offset + index) % PLACEMENT_PATTERN.length]; + } + return pickWeightedValue(placementOptions, burst.seed + index * 11); +}; + const hashSymbol = (value: string): number => { let hash = 0; for (let i = 0; i < value.length; i += 1) { @@ -128,7 +179,8 @@ const formatExpiry = (now: number, offsetDays: number): string => { const buildBurst = (burstIndex: number, now: number): Burst => { const symbol = SP500_SYMBOLS[burstIndex % SP500_SYMBOLS.length]; const symbolHash = hashSymbol(symbol); - const scenario = pickWeighted(SCENARIOS, symbolHash + burstIndex * 7); + const seed = symbolHash + burstIndex * 7; + const scenario = pickWeighted(SCENARIOS, seed); const baseUnderlying = 30 + (symbolHash % 470); const expiryOffset = pick(EXPIRY_OFFSETS, symbolHash + burstIndex); const expiry = formatExpiry(now, expiryOffset); @@ -166,7 +218,9 @@ const buildBurst = (burstIndex: number, now: number): Burst => { exchange, conditions, printCount, - priceStep + priceStep, + scenarioId: scenario.id, + seed }; }; @@ -177,6 +231,7 @@ export const createSyntheticOptionsAdapter = ( name: "synthetic", start: (handlers: OptionIngestHandlers) => { let seq = 0; + let nbboSeq = 0; let burstIndex = 0; let currentBurst: Burst | null = null; let remainingRuns = 0; @@ -203,6 +258,24 @@ export const createSyntheticOptionsAdapter = ( const priceJitter = ((i % 3) - 1) * 0.004; const sizeJitter = ((i % 3) - 1) * 0.08; const priceMultiplier = 1 + burst.priceStep * i + priceJitter; + const mid = Math.max(0.05, Number((burst.basePrice * priceMultiplier).toFixed(2))); + const spread = Math.max(0.02, Number((mid * 0.02).toFixed(2))); + const bid = Math.max(0.01, Number((mid - spread / 2).toFixed(2))); + const ask = Math.max(bid + 0.01, Number((mid + spread / 2).toFixed(2))); + const tick = Math.max(0.01, Number((spread * 0.25).toFixed(2))); + const placement = pickPlacement(burst, i); + let tradePrice = mid; + + if (placement === "AA") { + tradePrice = ask + tick; + } else if (placement === "A") { + tradePrice = ask; + } else if (placement === "BB") { + tradePrice = Math.max(0.01, bid - tick); + } else { + tradePrice = bid; + } + const print: OptionPrint = { source_ts: now + i * 5, ingest_ts: now + i * 5, @@ -210,13 +283,34 @@ export const createSyntheticOptionsAdapter = ( trace_id: `synthetic-options-${seq}`, ts: now + i * 5, option_contract_id: burst.contractId, - price: Math.max(0.05, Number((burst.basePrice * priceMultiplier).toFixed(2))), + price: tradePrice, size: Math.max(1, Math.round(burst.baseSize * (1 + sizeJitter))), exchange: burst.exchange, conditions: burst.conditions }; void handlers.onTrade(print); + + if (handlers.onNBBO) { + nbboSeq += 1; + const sizeBase = Math.max(1, Math.round(burst.baseSize * 0.4)); + const bidSize = Math.max(1, Math.round(sizeBase * (1 + sizeJitter))); + const askSize = Math.max(1, Math.round(sizeBase * (1 - sizeJitter))); + const nbbo: OptionNBBO = { + source_ts: print.ts, + ingest_ts: print.ingest_ts, + seq: nbboSeq, + trace_id: `synthetic-nbbo-${nbboSeq}`, + ts: print.ts, + option_contract_id: burst.contractId, + bid, + ask, + bidSize, + askSize + }; + + void handlers.onNBBO(nbbo); + } } remainingRuns -= 1; diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index a16b051..1cea78e 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -1,7 +1,9 @@ import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; import { + SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, + STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, connectJetStreamWithRetry, ensureStream, @@ -9,10 +11,12 @@ import { } from "@islandflow/bus"; import { createClickHouseClient, + ensureOptionNBBOTable, ensureOptionPrintsTable, + insertOptionNBBO, insertOptionPrint } from "@islandflow/storage"; -import { OptionPrintSchema, type OptionPrint } from "@islandflow/types"; +import { OptionNBBOSchema, OptionPrintSchema, type OptionNBBO, type OptionPrint } from "@islandflow/types"; import { createAlpacaOptionsAdapter } from "./adapters/alpaca"; import { createDatabentoOptionsAdapter } from "./adapters/databento"; import { createIbkrOptionsAdapter } from "./adapters/ibkr"; @@ -237,6 +241,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_OPTION_NBBO, + subjects: [SUBJECT_OPTION_NBBO], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + const clickhouse = createClickHouseClient({ url: env.CLICKHOUSE_URL, database: env.CLICKHOUSE_DATABASE @@ -244,11 +261,13 @@ const run = async () => { await retry("clickhouse table init", 20, 500, async () => { await ensureOptionPrintsTable(clickhouse); + await ensureOptionNBBOTable(clickhouse); }); const adapter = selectAdapter(env.OPTIONS_INGEST_ADAPTER); logger.info("ingest adapter selected", { adapter: adapter.name }); const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS); + const allowNbboPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS); const stopAdapter: StopHandler = await adapter.start({ onTrade: async (candidate: OptionPrint) => { @@ -277,6 +296,28 @@ const run = async () => { trace_id: print.trace_id }); } + }, + onNBBO: async (candidate: OptionNBBO) => { + if (state.shuttingDown) { + return; + } + + const now = Date.now(); + if (!allowNbboPublish(now)) { + return; + } + + const nbbo = OptionNBBOSchema.parse(candidate); + + try { + await insertOptionNBBO(clickhouse, nbbo); + await publishJson(js, SUBJECT_OPTION_NBBO, nbbo); + } catch (error) { + logger.error("failed to publish option nbbo", { + error: error instanceof Error ? error.message : String(error), + trace_id: nbbo.trace_id + }); + } } });