Add NBBO persistence, API/WS streaming, and UI context

This commit is contained in:
dirtydishes 2025-12-30 12:47:58 -05:00
parent 15fce370ef
commit fc7065792f
12 changed files with 768 additions and 46 deletions

View file

@ -5,11 +5,13 @@ import {
SUBJECT_CLASSIFIER_HITS,
SUBJECT_EQUITY_PRINTS,
SUBJECT_FLOW_PACKETS,
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS,
STREAM_ALERTS,
STREAM_CLASSIFIER_HITS,
STREAM_EQUITY_PRINTS,
STREAM_FLOW_PACKETS,
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
buildDurableConsumer,
connectJetStreamWithRetry,
@ -22,12 +24,15 @@ import {
ensureClassifierHitsTable,
ensureEquityPrintsTable,
ensureFlowPacketsTable,
ensureOptionNBBOTable,
ensureOptionPrintsTable,
fetchRecentAlerts,
fetchRecentClassifierHits,
fetchRecentFlowPackets,
fetchRecentOptionNBBO,
fetchEquityPrintsAfter,
fetchRecentEquityPrints,
fetchOptionNBBOAfter,
fetchOptionPrintsAfter,
fetchRecentOptionPrints
} from "@islandflow/storage";
@ -36,6 +41,7 @@ import {
ClassifierHitEventSchema,
EquityPrintSchema,
FlowPacketSchema,
OptionNBBOSchema,
OptionPrintSchema
} from "@islandflow/types";
import { z } from "zod";
@ -87,13 +93,14 @@ const replayParamsSchema = z.object({
limit: z.coerce.number().int().positive().max(1000).default(200)
});
type Channel = "options" | "equities" | "flow" | "classifier-hits" | "alerts";
type Channel = "options" | "options-nbbo" | "equities" | "flow" | "classifier-hits" | "alerts";
type WsData = {
channel: Channel;
};
const optionSockets = new Set<WebSocket<WsData>>();
const optionNbboSockets = new Set<WebSocket<WsData>>();
const equitySockets = new Set<WebSocket<WsData>>();
const flowSockets = new Set<WebSocket<WsData>>();
const classifierHitSockets = new Set<WebSocket<WsData>>();
@ -169,6 +176,19 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_OPTION_NBBO,
subjects: [SUBJECT_OPTION_NBBO],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_EQUITY_PRINTS,
subjects: [SUBJECT_EQUITY_PRINTS],
@ -228,6 +248,7 @@ const run = async () => {
await retry("clickhouse table init", 20, 500, async () => {
await ensureOptionPrintsTable(clickhouse);
await ensureOptionNBBOTable(clickhouse);
await ensureEquityPrintsTable(clickhouse);
await ensureFlowPacketsTable(clickhouse);
await ensureClassifierHitsTable(clickhouse);
@ -240,6 +261,12 @@ const run = async () => {
buildDurableConsumer("api-option-prints")
);
const optionNbboSubscription = await subscribeJson(
js,
SUBJECT_OPTION_NBBO,
buildDurableConsumer("api-option-nbbo")
);
const equitySubscription = await subscribeJson(
js,
SUBJECT_EQUITY_PRINTS,
@ -279,6 +306,21 @@ const run = async () => {
}
};
const pumpOptionNbbo = async () => {
for await (const msg of optionNbboSubscription.messages) {
try {
const payload = OptionNBBOSchema.parse(optionNbboSubscription.decode(msg));
broadcast(optionNbboSockets, { type: "option-nbbo", payload });
msg.ack();
} catch (error) {
logger.error("failed to process option nbbo", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
const pumpEquities = async () => {
for await (const msg of equitySubscription.messages) {
try {
@ -340,6 +382,7 @@ const run = async () => {
};
void pumpOptions();
void pumpOptionNbbo();
void pumpEquities();
void pumpFlow();
void pumpClassifierHits();
@ -360,6 +403,12 @@ const run = async () => {
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/nbbo/options") {
const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentOptionNBBO(clickhouse, limit);
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/prints/equities") {
const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentEquityPrints(clickhouse, limit);
@ -392,6 +441,14 @@ const run = async () => {
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/replay/nbbo") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchOptionNBBOAfter(clickhouse, afterTs, afterSeq, limit);
const last = data.at(-1);
const next = last ? { ts: last.ts, seq: last.seq } : null;
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/replay/equities") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchEquityPrintsAfter(clickhouse, afterTs, afterSeq, limit);
@ -408,6 +465,14 @@ const run = async () => {
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
if (req.method === "GET" && url.pathname === "/ws/options-nbbo") {
if (serverRef.upgrade(req, { data: { channel: "options-nbbo" } })) {
return new Response(null, { status: 101 });
}
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
if (req.method === "GET" && url.pathname === "/ws/equities") {
if (serverRef.upgrade(req, { data: { channel: "equities" } })) {
return new Response(null, { status: 101 });
@ -446,6 +511,8 @@ const run = async () => {
open: (socket) => {
if (socket.data.channel === "options") {
optionSockets.add(socket);
} else if (socket.data.channel === "options-nbbo") {
optionNbboSockets.add(socket);
} else if (socket.data.channel === "equities") {
equitySockets.add(socket);
} else if (socket.data.channel === "flow") {
@ -461,6 +528,8 @@ const run = async () => {
close: (socket) => {
if (socket.data.channel === "options") {
optionSockets.delete(socket);
} else if (socket.data.channel === "options-nbbo") {
optionNbboSockets.delete(socket);
} else if (socket.data.channel === "equities") {
equitySockets.delete(socket);
} else if (socket.data.channel === "flow") {

View file

@ -4,10 +4,12 @@ import {
SUBJECT_ALERTS,
SUBJECT_CLASSIFIER_HITS,
SUBJECT_FLOW_PACKETS,
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS,
STREAM_ALERTS,
STREAM_CLASSIFIER_HITS,
STREAM_FLOW_PACKETS,
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
buildDurableConsumer,
connectJetStreamWithRetry,
@ -28,10 +30,12 @@ import {
AlertEventSchema,
ClassifierHitEventSchema,
FlowPacketSchema,
OptionNBBOSchema,
OptionPrintSchema,
type AlertEvent,
type ClassifierHitEvent,
type FlowPacket,
type OptionNBBO,
type OptionPrint
} from "@islandflow/types";
import { z } from "zod";
@ -60,6 +64,7 @@ const envSchema = z.object({
return value;
}, z.boolean())
.default(false),
NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000),
CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000),
CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3),
CLASSIFIER_SPIKE_MIN_PREMIUM: z.coerce.number().positive().default(20_000),
@ -117,6 +122,7 @@ type ClusterState = {
};
const clusters = new Map<string, ClusterState>();
const nbboCache = new Map<string, OptionNBBO>();
const applyDeliverPolicy = (
opts: ReturnType<typeof buildDurableConsumer>,
@ -166,12 +172,43 @@ const updateCluster = (cluster: ClusterState, print: OptionPrint): ClusterState
return cluster;
};
type NbboJoin = {
nbbo: OptionNBBO | null;
ageMs: number;
stale: boolean;
};
const updateNbboCache = (nbbo: OptionNBBO): void => {
const existing = nbboCache.get(nbbo.option_contract_id);
if (
!existing ||
nbbo.ts > existing.ts ||
(nbbo.ts === existing.ts && nbbo.seq >= existing.seq)
) {
nbboCache.set(nbbo.option_contract_id, nbbo);
}
};
const selectNbbo = (contractId: string, ts: number): NbboJoin => {
const nbbo = nbboCache.get(contractId) ?? null;
if (!nbbo) {
return { nbbo: null, ageMs: env.NBBO_MAX_AGE_MS + 1, stale: true };
}
const ageMs = Math.abs(ts - nbbo.ts);
const stale = ageMs > env.NBBO_MAX_AGE_MS;
return { nbbo, ageMs, stale };
};
const flushCluster = async (
clickhouse: ReturnType<typeof createClickHouseClient>,
js: Awaited<ReturnType<typeof connectJetStreamWithRetry>>["js"],
cluster: ClusterState
): Promise<void> => {
const features = {
const joinQuality: Record<string, number> = {};
const nbboJoin = selectNbbo(cluster.contractId, cluster.endTs);
const features: Record<string, string | number | boolean> = {
option_contract_id: cluster.contractId,
count: cluster.members.length,
total_size: cluster.totalSize,
@ -183,6 +220,23 @@ const flushCluster = async (
window_ms: env.CLUSTER_WINDOW_MS
};
if (!nbboJoin.nbbo) {
joinQuality.nbbo_missing = 1;
} else {
joinQuality.nbbo_age_ms = nbboJoin.ageMs;
if (nbboJoin.stale) {
joinQuality.nbbo_stale = 1;
} else {
const mid = (nbboJoin.nbbo.bid + nbboJoin.nbbo.ask) / 2;
features.nbbo_bid = nbboJoin.nbbo.bid;
features.nbbo_ask = nbboJoin.nbbo.ask;
features.nbbo_mid = Number(mid.toFixed(4));
features.nbbo_spread = Number((nbboJoin.nbbo.ask - nbboJoin.nbbo.bid).toFixed(4));
features.nbbo_bid_size = nbboJoin.nbbo.bidSize;
features.nbbo_ask_size = nbboJoin.nbbo.askSize;
}
}
const packet: FlowPacket = {
source_ts: cluster.startSourceTs,
ingest_ts: cluster.endIngestTs,
@ -191,7 +245,7 @@ const flushCluster = async (
id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`,
members: cluster.members,
features,
join_quality: {}
join_quality: joinQuality
};
const validated = FlowPacketSchema.parse(packet);
@ -323,6 +377,19 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_OPTION_NBBO,
subjects: [SUBJECT_OPTION_NBBO],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_FLOW_PACKETS,
subjects: [SUBJECT_FLOW_PACKETS],
@ -374,6 +441,7 @@ const run = async () => {
});
const durableName = "compute-option-prints";
const nbboDurableName = "compute-option-nbbo";
if (env.COMPUTE_CONSUMER_RESET) {
try {
@ -404,6 +472,35 @@ const run = async () => {
}
}
if (env.COMPUTE_CONSUMER_RESET) {
try {
await jsm.consumers.delete(STREAM_OPTION_NBBO, nbboDurableName);
logger.warn("reset jetstream consumer", { durable: nbboDurableName });
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!message.includes("not found")) {
logger.warn("failed to reset jetstream consumer", { durable: nbboDurableName, error: message });
}
}
} else {
try {
const info = await jsm.consumers.info(STREAM_OPTION_NBBO, nbboDurableName);
if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) {
logger.warn("resetting consumer due to deliver policy change", {
durable: nbboDurableName,
current: info.config.deliver_policy,
desired: env.COMPUTE_DELIVER_POLICY
});
await jsm.consumers.delete(STREAM_OPTION_NBBO, nbboDurableName);
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!message.includes("not found")) {
logger.warn("failed to inspect jetstream consumer", { durable: nbboDurableName, error: message });
}
}
}
const subscription = await (async () => {
const opts = buildDurableConsumer(durableName);
applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY);
@ -440,6 +537,59 @@ const run = async () => {
}
})();
const nbboSubscription = await (async () => {
const opts = buildDurableConsumer(nbboDurableName);
applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY);
try {
return await subscribeJson(js, SUBJECT_OPTION_NBBO, opts);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
const shouldReset =
message.includes("duplicate subscription") ||
message.includes("durable requires") ||
message.includes("subject does not match consumer");
if (!shouldReset) {
throw error;
}
logger.warn("resetting jetstream consumer", { durable: nbboDurableName, error: message });
try {
await jsm.consumers.delete(STREAM_OPTION_NBBO, nbboDurableName);
} catch (deleteError) {
const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError);
if (!deleteMessage.includes("not found")) {
logger.warn("failed to delete jetstream consumer", {
durable: nbboDurableName,
error: deleteMessage
});
}
}
const resetOpts = buildDurableConsumer(nbboDurableName);
applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY);
return await subscribeJson(js, SUBJECT_OPTION_NBBO, resetOpts);
}
})();
const nbboLoop = async () => {
for await (const msg of nbboSubscription.messages) {
try {
const nbbo = OptionNBBOSchema.parse(nbboSubscription.decode(msg));
updateNbboCache(nbbo);
msg.ack();
} catch (error) {
logger.error("failed to process option nbbo", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
void nbboLoop();
const shutdown = async (signal: string) => {
logger.info("service stopping", { signal });

View file

@ -1,4 +1,4 @@
import { SP500_SYMBOLS, type OptionPrint } from "@islandflow/types";
import { SP500_SYMBOLS, type OptionNBBO, type OptionPrint } from "@islandflow/types";
import type { OptionIngestAdapter, OptionIngestHandlers } from "./types";
type SyntheticOptionsAdapterConfig = {
@ -13,6 +13,8 @@ type Burst = {
conditions?: string[];
printCount: number;
priceStep: number;
scenarioId: string;
seed: number;
};
const MS_PER_DAY = 24 * 60 * 60 * 1000;
@ -21,6 +23,13 @@ const EXCHANGES = ["CBOE", "PHLX", "ISE", "ARCA", "BOX", "MIAX"];
const CONDITIONS = ["SWEEP", "ISO", "FILL", "TEST"];
const BURST_RUN_RANGE: [number, number] = [2, 4];
type PricePlacement = "AA" | "A" | "B" | "BB";
type WeightedValue<T> = {
value: T;
weight: number;
};
type Scenario = {
id: string;
weight: number;
@ -75,6 +84,35 @@ const SCENARIOS: Scenario[] = [
}
];
const PRICE_PLACEMENTS: Record<string, WeightedValue<PricePlacement>[]> = {
bullish_sweep: [
{ value: "AA", weight: 25 },
{ value: "A", weight: 40 },
{ value: "B", weight: 20 },
{ value: "BB", weight: 15 }
],
bearish_sweep: [
{ value: "AA", weight: 15 },
{ value: "A", weight: 20 },
{ value: "B", weight: 40 },
{ value: "BB", weight: 25 }
],
contract_spike: [
{ value: "AA", weight: 25 },
{ value: "A", weight: 25 },
{ value: "B", weight: 25 },
{ value: "BB", weight: 25 }
],
noise: [
{ value: "AA", weight: 25 },
{ value: "A", weight: 25 },
{ value: "B", weight: 25 },
{ value: "BB", weight: 25 }
]
};
const PLACEMENT_PATTERN: PricePlacement[] = ["A", "AA", "B", "BB"];
const pick = <T,>(items: T[], seed: number): T => {
return items[Math.abs(seed) % items.length];
};
@ -107,6 +145,19 @@ const pickWeighted = <T extends { weight: number }>(items: T[], seed: number): T
return items[0];
};
const pickWeightedValue = <T>(items: WeightedValue<T>[], seed: number): T => {
return pickWeighted(items, seed).value;
};
const pickPlacement = (burst: Burst, index: number): PricePlacement => {
const placementOptions = PRICE_PLACEMENTS[burst.scenarioId] ?? PRICE_PLACEMENTS.noise;
const offset = Math.abs(burst.seed) % PLACEMENT_PATTERN.length;
if (index < PLACEMENT_PATTERN.length) {
return PLACEMENT_PATTERN[(offset + index) % PLACEMENT_PATTERN.length];
}
return pickWeightedValue(placementOptions, burst.seed + index * 11);
};
const hashSymbol = (value: string): number => {
let hash = 0;
for (let i = 0; i < value.length; i += 1) {
@ -128,7 +179,8 @@ const formatExpiry = (now: number, offsetDays: number): string => {
const buildBurst = (burstIndex: number, now: number): Burst => {
const symbol = SP500_SYMBOLS[burstIndex % SP500_SYMBOLS.length];
const symbolHash = hashSymbol(symbol);
const scenario = pickWeighted(SCENARIOS, symbolHash + burstIndex * 7);
const seed = symbolHash + burstIndex * 7;
const scenario = pickWeighted(SCENARIOS, seed);
const baseUnderlying = 30 + (symbolHash % 470);
const expiryOffset = pick(EXPIRY_OFFSETS, symbolHash + burstIndex);
const expiry = formatExpiry(now, expiryOffset);
@ -166,7 +218,9 @@ const buildBurst = (burstIndex: number, now: number): Burst => {
exchange,
conditions,
printCount,
priceStep
priceStep,
scenarioId: scenario.id,
seed
};
};
@ -177,6 +231,7 @@ export const createSyntheticOptionsAdapter = (
name: "synthetic",
start: (handlers: OptionIngestHandlers) => {
let seq = 0;
let nbboSeq = 0;
let burstIndex = 0;
let currentBurst: Burst | null = null;
let remainingRuns = 0;
@ -203,6 +258,24 @@ export const createSyntheticOptionsAdapter = (
const priceJitter = ((i % 3) - 1) * 0.004;
const sizeJitter = ((i % 3) - 1) * 0.08;
const priceMultiplier = 1 + burst.priceStep * i + priceJitter;
const mid = Math.max(0.05, Number((burst.basePrice * priceMultiplier).toFixed(2)));
const spread = Math.max(0.02, Number((mid * 0.02).toFixed(2)));
const bid = Math.max(0.01, Number((mid - spread / 2).toFixed(2)));
const ask = Math.max(bid + 0.01, Number((mid + spread / 2).toFixed(2)));
const tick = Math.max(0.01, Number((spread * 0.25).toFixed(2)));
const placement = pickPlacement(burst, i);
let tradePrice = mid;
if (placement === "AA") {
tradePrice = ask + tick;
} else if (placement === "A") {
tradePrice = ask;
} else if (placement === "BB") {
tradePrice = Math.max(0.01, bid - tick);
} else {
tradePrice = bid;
}
const print: OptionPrint = {
source_ts: now + i * 5,
ingest_ts: now + i * 5,
@ -210,13 +283,34 @@ export const createSyntheticOptionsAdapter = (
trace_id: `synthetic-options-${seq}`,
ts: now + i * 5,
option_contract_id: burst.contractId,
price: Math.max(0.05, Number((burst.basePrice * priceMultiplier).toFixed(2))),
price: tradePrice,
size: Math.max(1, Math.round(burst.baseSize * (1 + sizeJitter))),
exchange: burst.exchange,
conditions: burst.conditions
};
void handlers.onTrade(print);
if (handlers.onNBBO) {
nbboSeq += 1;
const sizeBase = Math.max(1, Math.round(burst.baseSize * 0.4));
const bidSize = Math.max(1, Math.round(sizeBase * (1 + sizeJitter)));
const askSize = Math.max(1, Math.round(sizeBase * (1 - sizeJitter)));
const nbbo: OptionNBBO = {
source_ts: print.ts,
ingest_ts: print.ingest_ts,
seq: nbboSeq,
trace_id: `synthetic-nbbo-${nbboSeq}`,
ts: print.ts,
option_contract_id: burst.contractId,
bid,
ask,
bidSize,
askSize
};
void handlers.onNBBO(nbbo);
}
}
remainingRuns -= 1;

View file

@ -1,7 +1,9 @@
import { readEnv } from "@islandflow/config";
import { createLogger } from "@islandflow/observability";
import {
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS,
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
connectJetStreamWithRetry,
ensureStream,
@ -9,10 +11,12 @@ import {
} from "@islandflow/bus";
import {
createClickHouseClient,
ensureOptionNBBOTable,
ensureOptionPrintsTable,
insertOptionNBBO,
insertOptionPrint
} from "@islandflow/storage";
import { OptionPrintSchema, type OptionPrint } from "@islandflow/types";
import { OptionNBBOSchema, OptionPrintSchema, type OptionNBBO, type OptionPrint } from "@islandflow/types";
import { createAlpacaOptionsAdapter } from "./adapters/alpaca";
import { createDatabentoOptionsAdapter } from "./adapters/databento";
import { createIbkrOptionsAdapter } from "./adapters/ibkr";
@ -237,6 +241,19 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_OPTION_NBBO,
subjects: [SUBJECT_OPTION_NBBO],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,
database: env.CLICKHOUSE_DATABASE
@ -244,11 +261,13 @@ const run = async () => {
await retry("clickhouse table init", 20, 500, async () => {
await ensureOptionPrintsTable(clickhouse);
await ensureOptionNBBOTable(clickhouse);
});
const adapter = selectAdapter(env.OPTIONS_INGEST_ADAPTER);
logger.info("ingest adapter selected", { adapter: adapter.name });
const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS);
const allowNbboPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS);
const stopAdapter: StopHandler = await adapter.start({
onTrade: async (candidate: OptionPrint) => {
@ -277,6 +296,28 @@ const run = async () => {
trace_id: print.trace_id
});
}
},
onNBBO: async (candidate: OptionNBBO) => {
if (state.shuttingDown) {
return;
}
const now = Date.now();
if (!allowNbboPublish(now)) {
return;
}
const nbbo = OptionNBBOSchema.parse(candidate);
try {
await insertOptionNBBO(clickhouse, nbbo);
await publishJson(js, SUBJECT_OPTION_NBBO, nbbo);
} catch (error) {
logger.error("failed to publish option nbbo", {
error: error instanceof Error ? error.message : String(error),
trace_id: nbbo.trace_id
});
}
}
});