Unify live session streaming and evidence fetching
- Route live terminal data through a shared live session socket - Fetch missing evidence for alerts and classifier hits - Add live type definitions and storage/API tests
This commit is contained in:
parent
824b7f2fa0
commit
d30513119a
10 changed files with 1923 additions and 258 deletions
|
|
@ -39,8 +39,12 @@ import {
|
|||
ensureOptionNBBOTable,
|
||||
ensureOptionPrintsTable,
|
||||
fetchAlertsAfter,
|
||||
fetchAlertsBefore,
|
||||
fetchClassifierHitsAfter,
|
||||
fetchClassifierHitsBefore,
|
||||
fetchFlowPacketsAfter,
|
||||
fetchFlowPacketById,
|
||||
fetchFlowPacketsBefore,
|
||||
fetchRecentAlerts,
|
||||
fetchRecentClassifierHits,
|
||||
fetchRecentEquityPrintJoins,
|
||||
|
|
@ -49,31 +53,46 @@ import {
|
|||
fetchRecentEquityQuotes,
|
||||
fetchEquityCandlesAfter,
|
||||
fetchEquityCandlesRange,
|
||||
fetchEquityPrintJoinsByIds,
|
||||
fetchEquityPrintJoinsBefore,
|
||||
fetchRecentOptionNBBO,
|
||||
fetchEquityPrintsAfter,
|
||||
fetchEquityPrintsBefore,
|
||||
fetchEquityPrintsRange,
|
||||
fetchEquityPrintJoinsAfter,
|
||||
fetchEquityQuotesAfter,
|
||||
fetchInferredDarkBefore,
|
||||
fetchInferredDarkAfter,
|
||||
fetchRecentEquityPrints,
|
||||
fetchOptionNBBOBefore,
|
||||
fetchOptionNBBOAfter,
|
||||
fetchOptionPrintsBefore,
|
||||
fetchOptionPrintsAfter,
|
||||
fetchOptionPrintsByTraceIds,
|
||||
fetchRecentOptionPrints
|
||||
} from "@islandflow/storage";
|
||||
import {
|
||||
AlertEventSchema,
|
||||
ClassifierHitEventSchema,
|
||||
Cursor,
|
||||
EquityCandleSchema,
|
||||
EquityPrintSchema,
|
||||
EquityPrintJoinSchema,
|
||||
EquityQuoteSchema,
|
||||
FeedSnapshot,
|
||||
InferredDarkEventSchema,
|
||||
LiveClientMessageSchema,
|
||||
LiveServerMessage,
|
||||
LiveSubscription,
|
||||
LiveSubscriptionSchema,
|
||||
FlowPacketSchema,
|
||||
OptionNBBOSchema,
|
||||
OptionPrintSchema
|
||||
OptionPrintSchema,
|
||||
getSubscriptionKey
|
||||
} from "@islandflow/types";
|
||||
import { createClient } from "redis";
|
||||
import { z } from "zod";
|
||||
import { LiveStateManager } from "./live";
|
||||
|
||||
const service = "api";
|
||||
const logger = createLogger({ service });
|
||||
|
|
@ -148,6 +167,11 @@ const replayParamsSchema = z.object({
|
|||
after_seq: z.coerce.number().int().nonnegative().default(0),
|
||||
limit: z.coerce.number().int().positive().max(1000).default(200)
|
||||
});
|
||||
const beforeParamsSchema = z.object({
|
||||
before_ts: z.coerce.number().int().nonnegative(),
|
||||
before_seq: z.coerce.number().int().nonnegative(),
|
||||
limit: z.coerce.number().int().positive().max(1000).default(200)
|
||||
});
|
||||
|
||||
const replaySourceSchema = z
|
||||
.string()
|
||||
|
|
@ -192,16 +216,26 @@ type WsData = {
|
|||
channel: Channel;
|
||||
};
|
||||
|
||||
const optionSockets = new Set<WebSocket<WsData>>();
|
||||
const optionNbboSockets = new Set<WebSocket<WsData>>();
|
||||
const equitySockets = new Set<WebSocket<WsData>>();
|
||||
const equityCandleSockets = new Set<WebSocket<WsData>>();
|
||||
const equityQuoteSockets = new Set<WebSocket<WsData>>();
|
||||
const equityJoinSockets = new Set<WebSocket<WsData>>();
|
||||
const inferredDarkSockets = new Set<WebSocket<WsData>>();
|
||||
const flowSockets = new Set<WebSocket<WsData>>();
|
||||
const classifierHitSockets = new Set<WebSocket<WsData>>();
|
||||
const alertSockets = new Set<WebSocket<WsData>>();
|
||||
type LiveWsData = {
|
||||
channel: "live";
|
||||
};
|
||||
|
||||
type LegacySocket = any;
|
||||
type LiveSocket = any;
|
||||
|
||||
const optionSockets = new Set<LegacySocket>();
|
||||
const optionNbboSockets = new Set<LegacySocket>();
|
||||
const equitySockets = new Set<LegacySocket>();
|
||||
const equityCandleSockets = new Set<LegacySocket>();
|
||||
const equityQuoteSockets = new Set<LegacySocket>();
|
||||
const equityJoinSockets = new Set<LegacySocket>();
|
||||
const inferredDarkSockets = new Set<LegacySocket>();
|
||||
const flowSockets = new Set<LegacySocket>();
|
||||
const classifierHitSockets = new Set<LegacySocket>();
|
||||
const alertSockets = new Set<LegacySocket>();
|
||||
const liveSocketSubscriptions = new Map<LiveSocket, Set<string>>();
|
||||
const subscriptionSockets = new Map<string, Set<LiveSocket>>();
|
||||
const liveHeartbeats = new Map<LiveSocket, ReturnType<typeof setInterval>>();
|
||||
|
||||
const jsonResponse = (body: unknown, status = 200): Response => {
|
||||
return new Response(JSON.stringify(body), {
|
||||
|
|
@ -234,6 +268,20 @@ const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit
|
|||
};
|
||||
};
|
||||
|
||||
const parseBeforeParams = (url: URL): { beforeTs: number; beforeSeq: number; limit: number } => {
|
||||
const params = beforeParamsSchema.parse({
|
||||
before_ts: url.searchParams.get("before_ts") ?? undefined,
|
||||
before_seq: url.searchParams.get("before_seq") ?? undefined,
|
||||
limit: url.searchParams.get("limit") ?? undefined
|
||||
});
|
||||
|
||||
return {
|
||||
beforeTs: params.before_ts,
|
||||
beforeSeq: params.before_seq,
|
||||
limit: params.limit
|
||||
};
|
||||
};
|
||||
|
||||
const parseReplaySource = (url: URL): string | null => {
|
||||
const raw = url.searchParams.get("source");
|
||||
if (!raw) {
|
||||
|
|
@ -330,7 +378,7 @@ const parseCandleReplayParams = (
|
|||
};
|
||||
};
|
||||
|
||||
const broadcast = (sockets: Set<WebSocket<WsData>>, payload: unknown): void => {
|
||||
const broadcast = (sockets: Set<LegacySocket>, payload: unknown): void => {
|
||||
const message = JSON.stringify(payload);
|
||||
|
||||
for (const socket of sockets) {
|
||||
|
|
@ -345,6 +393,71 @@ const broadcast = (sockets: Set<WebSocket<WsData>>, payload: unknown): void => {
|
|||
}
|
||||
};
|
||||
|
||||
const sendLiveMessage = (socket: LiveSocket, payload: LiveServerMessage): void => {
|
||||
try {
|
||||
socket.send(JSON.stringify(payload));
|
||||
} catch (error) {
|
||||
logger.warn("failed to send live websocket message", {
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const subscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): void => {
|
||||
const key = getSubscriptionKey(subscription);
|
||||
const keys = liveSocketSubscriptions.get(socket) ?? new Set<string>();
|
||||
keys.add(key);
|
||||
liveSocketSubscriptions.set(socket, keys);
|
||||
|
||||
const sockets = subscriptionSockets.get(key) ?? new Set<LiveSocket>();
|
||||
sockets.add(socket);
|
||||
subscriptionSockets.set(key, sockets);
|
||||
};
|
||||
|
||||
const unsubscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): void => {
|
||||
const key = getSubscriptionKey(subscription);
|
||||
liveSocketSubscriptions.get(socket)?.delete(key);
|
||||
|
||||
const sockets = subscriptionSockets.get(key);
|
||||
if (!sockets) {
|
||||
return;
|
||||
}
|
||||
sockets.delete(socket);
|
||||
if (sockets.size === 0) {
|
||||
subscriptionSockets.delete(key);
|
||||
}
|
||||
};
|
||||
|
||||
const cleanupLiveSocket = (socket: LiveSocket): void => {
|
||||
const keys = liveSocketSubscriptions.get(socket);
|
||||
if (keys) {
|
||||
for (const key of keys) {
|
||||
const sockets = subscriptionSockets.get(key);
|
||||
sockets?.delete(socket);
|
||||
if (sockets && sockets.size === 0) {
|
||||
subscriptionSockets.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
liveSocketSubscriptions.delete(socket);
|
||||
const heartbeat = liveHeartbeats.get(socket);
|
||||
if (heartbeat) {
|
||||
clearInterval(heartbeat);
|
||||
liveHeartbeats.delete(socket);
|
||||
}
|
||||
};
|
||||
|
||||
const buildHistoryResponse = <T extends { seq: number }>(
|
||||
items: T[],
|
||||
cursorOf: (item: T) => Cursor
|
||||
): { data: T[]; next_before: Cursor | null } => {
|
||||
const last = items.at(-1);
|
||||
return {
|
||||
data: items,
|
||||
next_before: last ? cursorOf(last) : null
|
||||
};
|
||||
};
|
||||
|
||||
const buildCandleCacheKey = (underlyingId: string, intervalMs: number): string => {
|
||||
return `candles:equity:${intervalMs}:${underlyingId}`;
|
||||
};
|
||||
|
|
@ -563,6 +676,9 @@ const run = async () => {
|
|||
redis = null;
|
||||
}
|
||||
|
||||
const liveState = new LiveStateManager(clickhouse, redis);
|
||||
await liveState.hydrate();
|
||||
|
||||
const subscribeWithReset = async <T>(
|
||||
subject: string,
|
||||
stream: string,
|
||||
|
|
@ -661,11 +777,34 @@ const run = async () => {
|
|||
"api-alerts"
|
||||
);
|
||||
|
||||
const fanoutLive = async (
|
||||
subscription: LiveSubscription,
|
||||
item: unknown,
|
||||
ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark"
|
||||
) => {
|
||||
const key = getSubscriptionKey(subscription);
|
||||
const sockets = subscriptionSockets.get(key);
|
||||
const watermark = await liveState.ingest(ingestChannel, item);
|
||||
if (!sockets || sockets.size === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const socket of sockets) {
|
||||
sendLiveMessage(socket, {
|
||||
op: "event",
|
||||
subscription,
|
||||
item,
|
||||
watermark
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const pumpOptions = async () => {
|
||||
for await (const msg of optionSubscription.messages) {
|
||||
try {
|
||||
const payload = OptionPrintSchema.parse(optionSubscription.decode(msg));
|
||||
broadcast(optionSockets, { type: "option-print", payload });
|
||||
await fanoutLive({ channel: "options" }, payload, "options");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process option print", {
|
||||
|
|
@ -681,6 +820,7 @@ const run = async () => {
|
|||
try {
|
||||
const payload = OptionNBBOSchema.parse(optionNbboSubscription.decode(msg));
|
||||
broadcast(optionNbboSockets, { type: "option-nbbo", payload });
|
||||
await fanoutLive({ channel: "nbbo" }, payload, "nbbo");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process option nbbo", {
|
||||
|
|
@ -696,6 +836,12 @@ const run = async () => {
|
|||
try {
|
||||
const payload = EquityPrintSchema.parse(equitySubscription.decode(msg));
|
||||
broadcast(equitySockets, { type: "equity-print", payload });
|
||||
await fanoutLive({ channel: "equities" }, payload, "equities");
|
||||
await fanoutLive(
|
||||
{ channel: "equity-overlay", underlying_id: payload.underlying_id },
|
||||
payload,
|
||||
"equity-overlay"
|
||||
);
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process equity print", {
|
||||
|
|
@ -726,6 +872,15 @@ const run = async () => {
|
|||
try {
|
||||
const payload = EquityCandleSchema.parse(equityCandleSubscription.decode(msg));
|
||||
broadcast(equityCandleSockets, { type: "equity-candle", payload });
|
||||
await fanoutLive(
|
||||
{
|
||||
channel: "equity-candles",
|
||||
underlying_id: payload.underlying_id,
|
||||
interval_ms: payload.interval_ms
|
||||
},
|
||||
payload,
|
||||
"equity-candles"
|
||||
);
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process equity candle", {
|
||||
|
|
@ -741,6 +896,7 @@ const run = async () => {
|
|||
try {
|
||||
const payload = EquityPrintJoinSchema.parse(equityJoinSubscription.decode(msg));
|
||||
broadcast(equityJoinSockets, { type: "equity-join", payload });
|
||||
await fanoutLive({ channel: "equity-joins" }, payload, "equity-joins");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process equity join", {
|
||||
|
|
@ -756,6 +912,7 @@ const run = async () => {
|
|||
try {
|
||||
const payload = InferredDarkEventSchema.parse(inferredDarkSubscription.decode(msg));
|
||||
broadcast(inferredDarkSockets, { type: "inferred-dark", payload });
|
||||
await fanoutLive({ channel: "inferred-dark" }, payload, "inferred-dark");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process inferred dark event", {
|
||||
|
|
@ -771,6 +928,7 @@ const run = async () => {
|
|||
try {
|
||||
const payload = FlowPacketSchema.parse(flowSubscription.decode(msg));
|
||||
broadcast(flowSockets, { type: "flow-packet", payload });
|
||||
await fanoutLive({ channel: "flow" }, payload, "flow");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process flow packet", {
|
||||
|
|
@ -786,6 +944,7 @@ const run = async () => {
|
|||
try {
|
||||
const payload = ClassifierHitEventSchema.parse(classifierHitSubscription.decode(msg));
|
||||
broadcast(classifierHitSockets, { type: "classifier-hit", payload });
|
||||
await fanoutLive({ channel: "classifier-hits" }, payload, "classifier-hits");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process classifier hit", {
|
||||
|
|
@ -801,6 +960,7 @@ const run = async () => {
|
|||
try {
|
||||
const payload = AlertEventSchema.parse(alertSubscription.decode(msg));
|
||||
broadcast(alertSockets, { type: "alert", payload });
|
||||
await fanoutLive({ channel: "alerts" }, payload, "alerts");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process alert", {
|
||||
|
|
@ -822,9 +982,9 @@ const run = async () => {
|
|||
void pumpClassifierHits();
|
||||
void pumpAlerts();
|
||||
|
||||
const server = Bun.serve<WsData>({
|
||||
const server = Bun.serve<WsData | LiveWsData>({
|
||||
port: env.API_PORT,
|
||||
fetch: async (req, serverRef) => {
|
||||
fetch: async (req: Request, serverRef: any) => {
|
||||
const url = new URL(req.url);
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/health") {
|
||||
|
|
@ -940,6 +1100,84 @@ const run = async () => {
|
|||
return jsonResponse({ data });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/options") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const source = parseReplaySource(url) ?? undefined;
|
||||
const data = await fetchOptionPrintsBefore(clickhouse, beforeTs, beforeSeq, limit, source);
|
||||
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/nbbo") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const source = parseReplaySource(url) ?? undefined;
|
||||
const data = await fetchOptionNBBOBefore(clickhouse, beforeTs, beforeSeq, limit, source);
|
||||
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/equities") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchEquityPrintsBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/equity-joins") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchEquityPrintJoinsBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
return jsonResponse(
|
||||
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
|
||||
);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/flow") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchFlowPacketsBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
return jsonResponse(
|
||||
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
|
||||
);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/classifier-hits") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchClassifierHitsBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
return jsonResponse(
|
||||
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
|
||||
);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/alerts") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchAlertsBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
return jsonResponse(
|
||||
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
|
||||
);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/inferred-dark") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchInferredDarkBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
return jsonResponse(
|
||||
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
|
||||
);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && /^\/flow\/packets\/[^/]+$/.test(url.pathname)) {
|
||||
const id = decodeURIComponent(url.pathname.slice("/flow/packets/".length));
|
||||
const data = await fetchFlowPacketById(clickhouse, id);
|
||||
return jsonResponse({ data });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/option-prints/by-trace") {
|
||||
const traceIds = url.searchParams.getAll("trace_id");
|
||||
const data = await fetchOptionPrintsByTraceIds(clickhouse, traceIds);
|
||||
return jsonResponse({ data });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/equity-joins/by-id") {
|
||||
const ids = url.searchParams.getAll("id");
|
||||
const data = await fetchEquityPrintJoinsByIds(clickhouse, ids);
|
||||
return jsonResponse({ data });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/replay/options") {
|
||||
const { afterTs, afterSeq, limit } = parseReplayParams(url);
|
||||
const source = parseReplaySource(url) ?? undefined;
|
||||
|
|
@ -1120,11 +1358,25 @@ const run = async () => {
|
|||
return jsonResponse({ error: "websocket upgrade failed" }, 400);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/ws/live") {
|
||||
if (serverRef.upgrade(req, { data: { channel: "live" } })) {
|
||||
return new Response(null, { status: 101 });
|
||||
}
|
||||
|
||||
return jsonResponse({ error: "websocket upgrade failed" }, 400);
|
||||
}
|
||||
|
||||
return jsonResponse({ error: "not found" }, 404);
|
||||
},
|
||||
websocket: {
|
||||
open: (socket) => {
|
||||
if (socket.data.channel === "options") {
|
||||
open: (socket: any) => {
|
||||
if (socket.data.channel === "live") {
|
||||
sendLiveMessage(socket, { op: "ready" });
|
||||
const heartbeat = setInterval(() => {
|
||||
sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() });
|
||||
}, 15000);
|
||||
liveHeartbeats.set(socket, heartbeat);
|
||||
} else if (socket.data.channel === "options") {
|
||||
optionSockets.add(socket);
|
||||
} else if (socket.data.channel === "options-nbbo") {
|
||||
optionNbboSockets.add(socket);
|
||||
|
|
@ -1148,8 +1400,44 @@ const run = async () => {
|
|||
|
||||
logger.info("websocket connected", { channel: socket.data.channel });
|
||||
},
|
||||
close: (socket) => {
|
||||
if (socket.data.channel === "options") {
|
||||
message: async (socket: any, message: string | ArrayBuffer | Uint8Array) => {
|
||||
if (socket.data.channel !== "live") {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const payload =
|
||||
typeof message === "string"
|
||||
? message
|
||||
: new TextDecoder().decode(message instanceof Uint8Array ? message : new Uint8Array(message));
|
||||
const parsed = LiveClientMessageSchema.parse(JSON.parse(payload));
|
||||
if (parsed.op === "ping") {
|
||||
sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() });
|
||||
return;
|
||||
}
|
||||
|
||||
for (const subscription of parsed.subscriptions) {
|
||||
LiveSubscriptionSchema.parse(subscription);
|
||||
if (parsed.op === "unsubscribe") {
|
||||
unsubscribeSocket(socket, subscription);
|
||||
continue;
|
||||
}
|
||||
|
||||
subscribeSocket(socket, subscription);
|
||||
const snapshot = await liveState.getSnapshot(subscription);
|
||||
sendLiveMessage(socket, { op: "snapshot", snapshot });
|
||||
}
|
||||
} catch (error) {
|
||||
sendLiveMessage(socket, {
|
||||
op: "error",
|
||||
message: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
},
|
||||
close: (socket: any) => {
|
||||
if (socket.data.channel === "live") {
|
||||
cleanupLiveSocket(socket);
|
||||
} else if (socket.data.channel === "options") {
|
||||
optionSockets.delete(socket);
|
||||
} else if (socket.data.channel === "options-nbbo") {
|
||||
optionNbboSockets.delete(socket);
|
||||
|
|
|
|||
370
services/api/src/live.ts
Normal file
370
services/api/src/live.ts
Normal file
|
|
@ -0,0 +1,370 @@
|
|||
import {
|
||||
fetchRecentAlerts,
|
||||
fetchRecentClassifierHits,
|
||||
fetchRecentEquityCandles,
|
||||
fetchRecentEquityPrintJoins,
|
||||
fetchRecentEquityPrints,
|
||||
fetchRecentFlowPackets,
|
||||
fetchRecentInferredDark,
|
||||
fetchRecentOptionNBBO,
|
||||
fetchRecentOptionPrints,
|
||||
type ClickHouseClient
|
||||
} from "@islandflow/storage";
|
||||
import {
|
||||
AlertEventSchema,
|
||||
ClassifierHitEventSchema,
|
||||
CursorSchema,
|
||||
EquityCandleSchema,
|
||||
EquityPrintJoinSchema,
|
||||
EquityPrintSchema,
|
||||
FeedSnapshot,
|
||||
FlowPacketSchema,
|
||||
InferredDarkEventSchema,
|
||||
LiveGenericChannel,
|
||||
LiveSubscription,
|
||||
OptionNBBOSchema,
|
||||
OptionPrintSchema,
|
||||
type Cursor,
|
||||
type EquityCandle,
|
||||
type EquityPrint,
|
||||
type LiveChannel
|
||||
} from "@islandflow/types";
|
||||
import type { RedisClientType } from "redis";
|
||||
|
||||
const CURSOR_HASH_KEY = "live:cursors";
|
||||
|
||||
const GENERIC_LIMITS = {
|
||||
options: 500,
|
||||
nbbo: 500,
|
||||
equities: 500,
|
||||
"equity-joins": 500,
|
||||
flow: 500,
|
||||
"classifier-hits": 500,
|
||||
alerts: 500,
|
||||
"inferred-dark": 500
|
||||
} as const;
|
||||
|
||||
const CHART_LIMITS = {
|
||||
candles: 500,
|
||||
overlay: 1500
|
||||
} as const;
|
||||
|
||||
type GenericFeedConfig = {
|
||||
redisKey: string;
|
||||
cursorField: string;
|
||||
limit: number;
|
||||
parse: (value: unknown) => any;
|
||||
cursor: (item: any) => Cursor;
|
||||
fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise<any[]>;
|
||||
};
|
||||
|
||||
type RedisLike = Pick<
|
||||
RedisClientType,
|
||||
"isOpen" | "lRange" | "lPush" | "lTrim" | "hGet" | "hSet"
|
||||
>;
|
||||
|
||||
const parseCursor = (value: string | null): Cursor | null => {
|
||||
if (!value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return CursorSchema.parse(JSON.parse(value));
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
const getGenericConfig = (): {
|
||||
[K in LiveGenericChannel]: GenericFeedConfig;
|
||||
} => ({
|
||||
options: {
|
||||
redisKey: "live:options",
|
||||
cursorField: "options",
|
||||
limit: GENERIC_LIMITS.options,
|
||||
parse: (value) => OptionPrintSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentOptionPrints
|
||||
},
|
||||
nbbo: {
|
||||
redisKey: "live:nbbo",
|
||||
cursorField: "nbbo",
|
||||
limit: GENERIC_LIMITS.nbbo,
|
||||
parse: (value) => OptionNBBOSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentOptionNBBO
|
||||
},
|
||||
equities: {
|
||||
redisKey: "live:equities",
|
||||
cursorField: "equities",
|
||||
limit: GENERIC_LIMITS.equities,
|
||||
parse: (value) => EquityPrintSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentEquityPrints
|
||||
},
|
||||
"equity-joins": {
|
||||
redisKey: "live:equity-joins",
|
||||
cursorField: "equity-joins",
|
||||
limit: GENERIC_LIMITS["equity-joins"],
|
||||
parse: (value) => EquityPrintJoinSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentEquityPrintJoins
|
||||
},
|
||||
flow: {
|
||||
redisKey: "live:flow",
|
||||
cursorField: "flow",
|
||||
limit: GENERIC_LIMITS.flow,
|
||||
parse: (value) => FlowPacketSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentFlowPackets
|
||||
},
|
||||
"classifier-hits": {
|
||||
redisKey: "live:classifier-hits",
|
||||
cursorField: "classifier-hits",
|
||||
limit: GENERIC_LIMITS["classifier-hits"],
|
||||
parse: (value) => ClassifierHitEventSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentClassifierHits
|
||||
},
|
||||
alerts: {
|
||||
redisKey: "live:alerts",
|
||||
cursorField: "alerts",
|
||||
limit: GENERIC_LIMITS.alerts,
|
||||
parse: (value) => AlertEventSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentAlerts
|
||||
},
|
||||
"inferred-dark": {
|
||||
redisKey: "live:inferred-dark",
|
||||
cursorField: "inferred-dark",
|
||||
limit: GENERIC_LIMITS["inferred-dark"],
|
||||
parse: (value) => InferredDarkEventSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentInferredDark
|
||||
}
|
||||
});
|
||||
|
||||
const parseJsonList = <T>(payloads: string[], parse: (value: unknown) => T): T[] => {
|
||||
const items: T[] = [];
|
||||
for (const payload of payloads) {
|
||||
try {
|
||||
items.push(parse(JSON.parse(payload)));
|
||||
} catch {
|
||||
// ignore bad cache entries
|
||||
}
|
||||
}
|
||||
return items;
|
||||
};
|
||||
|
||||
const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Cursor | null => {
|
||||
const last = items.at(-1);
|
||||
return last ? cursorOf(last) : null;
|
||||
};
|
||||
|
||||
const candleRedisKey = (underlyingId: string, intervalMs: number): string =>
|
||||
`live:equity-candles:${underlyingId}:${intervalMs}`;
|
||||
|
||||
const candleCursorField = (underlyingId: string, intervalMs: number): string =>
|
||||
`equity-candles:${underlyingId}:${intervalMs}`;
|
||||
|
||||
const overlayRedisKey = (underlyingId: string): string => `live:equity-overlay:${underlyingId}`;
|
||||
const overlayCursorField = (underlyingId: string): string => `equities:${underlyingId}`;
|
||||
|
||||
export class LiveStateManager {
|
||||
private readonly generic = getGenericConfig();
|
||||
private readonly genericItems = new Map<LiveGenericChannel, any[]>();
|
||||
private readonly genericCursors = new Map<string, Cursor | null>();
|
||||
private readonly candleItems = new Map<string, EquityCandle[]>();
|
||||
private readonly candleCursors = new Map<string, Cursor | null>();
|
||||
private readonly overlayItems = new Map<string, EquityPrint[]>();
|
||||
private readonly overlayCursors = new Map<string, Cursor | null>();
|
||||
|
||||
constructor(
|
||||
private readonly clickhouse: ClickHouseClient,
|
||||
private readonly redis: RedisLike | null
|
||||
) {}
|
||||
|
||||
async hydrate(): Promise<void> {
|
||||
const channels = Object.keys(this.generic) as LiveGenericChannel[];
|
||||
await Promise.all(channels.map((channel) => this.hydrateGeneric(channel)));
|
||||
}
|
||||
|
||||
private async hydrateGeneric(channel: LiveGenericChannel): Promise<void> {
|
||||
const config = this.generic[channel];
|
||||
if (this.redis?.isOpen) {
|
||||
const payloads = await this.redis.lRange(config.redisKey, 0, config.limit - 1);
|
||||
const cached = parseJsonList(payloads, config.parse);
|
||||
if (cached.length > 0) {
|
||||
this.genericItems.set(channel, cached);
|
||||
this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const fresh = await config.fetchRecent(this.clickhouse, config.limit);
|
||||
this.genericItems.set(channel, fresh);
|
||||
const watermark = fresh[0] ? config.cursor(fresh[0]) : null;
|
||||
this.genericCursors.set(config.cursorField, watermark);
|
||||
await this.persistList(config.redisKey, config.cursorField, fresh, config.limit, watermark);
|
||||
}
|
||||
|
||||
async getSnapshot(subscription: LiveSubscription): Promise<FeedSnapshot<unknown>> {
|
||||
switch (subscription.channel) {
|
||||
case "equity-candles": {
|
||||
const key = candleRedisKey(subscription.underlying_id, subscription.interval_ms);
|
||||
const cursorField = candleCursorField(subscription.underlying_id, subscription.interval_ms);
|
||||
if (!this.candleItems.has(key)) {
|
||||
await this.hydrateCandles(subscription.underlying_id, subscription.interval_ms);
|
||||
}
|
||||
const items = this.candleItems.get(key) ?? [];
|
||||
return {
|
||||
subscription,
|
||||
items,
|
||||
watermark: this.candleCursors.get(cursorField) ?? null,
|
||||
next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq }))
|
||||
};
|
||||
}
|
||||
case "equity-overlay": {
|
||||
const key = overlayRedisKey(subscription.underlying_id);
|
||||
const cursorField = overlayCursorField(subscription.underlying_id);
|
||||
if (!this.overlayItems.has(key)) {
|
||||
await this.hydrateOverlay(subscription.underlying_id);
|
||||
}
|
||||
const items = this.overlayItems.get(key) ?? [];
|
||||
return {
|
||||
subscription,
|
||||
items,
|
||||
watermark: this.overlayCursors.get(cursorField) ?? null,
|
||||
next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq }))
|
||||
};
|
||||
}
|
||||
default: {
|
||||
const config = this.generic[subscription.channel];
|
||||
const items = this.genericItems.get(subscription.channel) ?? [];
|
||||
return {
|
||||
subscription,
|
||||
items,
|
||||
watermark: this.genericCursors.get(config.cursorField) ?? null,
|
||||
next_before: nextBeforeForItems(items, config.cursor)
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async ingest(channel: LiveChannel, item: unknown): Promise<Cursor | null> {
|
||||
switch (channel) {
|
||||
case "equity-candles": {
|
||||
const candle = EquityCandleSchema.parse(item);
|
||||
const key = candleRedisKey(candle.underlying_id, candle.interval_ms);
|
||||
const cursorField = candleCursorField(candle.underlying_id, candle.interval_ms);
|
||||
const items = this.candleItems.get(key) ?? [];
|
||||
const next = [candle, ...items]
|
||||
.sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
|
||||
.slice(0, CHART_LIMITS.candles);
|
||||
this.candleItems.set(key, next);
|
||||
const cursor = { ts: candle.ts, seq: candle.seq };
|
||||
this.candleCursors.set(cursorField, cursor);
|
||||
await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor);
|
||||
return cursor;
|
||||
}
|
||||
case "equity-overlay": {
|
||||
const print = EquityPrintSchema.parse(item);
|
||||
const key = overlayRedisKey(print.underlying_id);
|
||||
const cursorField = overlayCursorField(print.underlying_id);
|
||||
const items = this.overlayItems.get(key) ?? [];
|
||||
const next = [print, ...items]
|
||||
.sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
|
||||
.slice(0, CHART_LIMITS.overlay);
|
||||
this.overlayItems.set(key, next);
|
||||
const cursor = { ts: print.ts, seq: print.seq };
|
||||
this.overlayCursors.set(cursorField, cursor);
|
||||
await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor);
|
||||
return cursor;
|
||||
}
|
||||
default: {
|
||||
const config = this.generic[channel];
|
||||
const parsed = config.parse(item);
|
||||
const items = this.genericItems.get(channel) ?? [];
|
||||
const next = [parsed, ...items]
|
||||
.sort((a, b) => {
|
||||
const aCursor = config.cursor(a);
|
||||
const bCursor = config.cursor(b);
|
||||
return (bCursor.ts - aCursor.ts) || (bCursor.seq - aCursor.seq);
|
||||
})
|
||||
.slice(0, config.limit);
|
||||
this.genericItems.set(channel, next);
|
||||
const cursor = config.cursor(parsed);
|
||||
this.genericCursors.set(config.cursorField, cursor);
|
||||
await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor);
|
||||
return cursor;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async hydrateCandles(underlyingId: string, intervalMs: number): Promise<void> {
|
||||
const key = candleRedisKey(underlyingId, intervalMs);
|
||||
const cursorField = candleCursorField(underlyingId, intervalMs);
|
||||
if (this.redis?.isOpen) {
|
||||
const payloads = await this.redis.lRange(key, 0, CHART_LIMITS.candles - 1);
|
||||
const cached = parseJsonList(payloads, (value) => EquityCandleSchema.parse(value));
|
||||
if (cached.length > 0) {
|
||||
this.candleItems.set(key, cached);
|
||||
this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles);
|
||||
this.candleItems.set(key, fresh);
|
||||
const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
|
||||
this.candleCursors.set(cursorField, watermark);
|
||||
await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark);
|
||||
}
|
||||
|
||||
private async hydrateOverlay(underlyingId: string): Promise<void> {
|
||||
const key = overlayRedisKey(underlyingId);
|
||||
const cursorField = overlayCursorField(underlyingId);
|
||||
if (this.redis?.isOpen) {
|
||||
const payloads = await this.redis.lRange(key, 0, CHART_LIMITS.overlay - 1);
|
||||
const cached = parseJsonList(payloads, (value) => EquityPrintSchema.parse(value));
|
||||
if (cached.length > 0) {
|
||||
this.overlayItems.set(key, cached);
|
||||
this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const fresh = (await fetchRecentEquityPrints(this.clickhouse, CHART_LIMITS.overlay)).filter(
|
||||
(item) => item.underlying_id === underlyingId
|
||||
);
|
||||
this.overlayItems.set(key, fresh);
|
||||
const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
|
||||
this.overlayCursors.set(cursorField, watermark);
|
||||
await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark);
|
||||
}
|
||||
|
||||
private async persistList<T>(
|
||||
listKey: string,
|
||||
cursorField: string,
|
||||
items: T[],
|
||||
limit: number,
|
||||
cursor: Cursor | null
|
||||
): Promise<void> {
|
||||
if (!this.redis?.isOpen) {
|
||||
return;
|
||||
}
|
||||
|
||||
const payloads = items.map((item) => JSON.stringify(item));
|
||||
await this.redis.lTrim(listKey, 1, 0);
|
||||
if (payloads.length > 0) {
|
||||
for (let idx = payloads.length - 1; idx >= 0; idx -= 1) {
|
||||
const payload = payloads[idx];
|
||||
if (payload) {
|
||||
await this.redis.lPush(listKey, payload);
|
||||
}
|
||||
}
|
||||
await this.redis.lTrim(listKey, 0, limit - 1);
|
||||
}
|
||||
await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor));
|
||||
}
|
||||
}
|
||||
123
services/api/tests/live.test.ts
Normal file
123
services/api/tests/live.test.ts
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
import { describe, expect, it } from "bun:test";
|
||||
import type { ClickHouseClient } from "@islandflow/storage";
|
||||
import { LiveStateManager } from "../src/live";
|
||||
|
||||
const makeClickHouse = (): ClickHouseClient =>
|
||||
({
|
||||
exec: async () => {},
|
||||
insert: async () => {},
|
||||
ping: async () => ({ success: true }),
|
||||
close: async () => {},
|
||||
query: async () => ({
|
||||
async json<T>() {
|
||||
return [] as T;
|
||||
}
|
||||
})
|
||||
}) as ClickHouseClient;
|
||||
|
||||
const makeRedis = () => {
|
||||
const lists = new Map<string, string[]>();
|
||||
const hashes = new Map<string, Map<string, string>>();
|
||||
|
||||
return {
|
||||
isOpen: true,
|
||||
async lRange(key: string, start: number, stop: number) {
|
||||
return (lists.get(key) ?? []).slice(start, stop + 1);
|
||||
},
|
||||
async lPush(key: string, value: string) {
|
||||
const next = lists.get(key) ?? [];
|
||||
next.unshift(value);
|
||||
lists.set(key, next);
|
||||
return next.length;
|
||||
},
|
||||
async lTrim(key: string, start: number, stop: number) {
|
||||
const next = lists.get(key) ?? [];
|
||||
lists.set(key, start > stop ? [] : next.slice(start, stop + 1));
|
||||
return "OK";
|
||||
},
|
||||
async hGet(key: string, field: string) {
|
||||
return hashes.get(key)?.get(field) ?? null;
|
||||
},
|
||||
async hSet(key: string, field: string, value: string) {
|
||||
const hash = hashes.get(key) ?? new Map<string, string>();
|
||||
hash.set(field, value);
|
||||
hashes.set(key, hash);
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
describe("LiveStateManager", () => {
|
||||
it("hydrates snapshots from redis generic windows", async () => {
|
||||
const redis = makeRedis();
|
||||
await redis.lPush(
|
||||
"live:flow",
|
||||
JSON.stringify({
|
||||
source_ts: 100,
|
||||
ingest_ts: 101,
|
||||
seq: 1,
|
||||
trace_id: "flow-1",
|
||||
id: "flow-1",
|
||||
members: ["a"],
|
||||
features: {},
|
||||
join_quality: {}
|
||||
})
|
||||
);
|
||||
await redis.hSet("live:cursors", "flow", JSON.stringify({ ts: 100, seq: 1 }));
|
||||
|
||||
const manager = new LiveStateManager(makeClickHouse(), redis as never);
|
||||
await manager.hydrate();
|
||||
const snapshot = await manager.getSnapshot({ channel: "flow" });
|
||||
|
||||
expect(snapshot.items).toHaveLength(1);
|
||||
expect(snapshot.watermark).toEqual({ ts: 100, seq: 1 });
|
||||
expect(snapshot.next_before).toEqual({ ts: 100, seq: 1 });
|
||||
});
|
||||
|
||||
it("persists parameterized candle and overlay caches on ingest", async () => {
|
||||
const redis = makeRedis();
|
||||
const manager = new LiveStateManager(makeClickHouse(), redis as never);
|
||||
await manager.ingest("equity-candles", {
|
||||
source_ts: 100,
|
||||
ingest_ts: 101,
|
||||
seq: 1,
|
||||
trace_id: "candle:SPY:60000:100",
|
||||
ts: 100,
|
||||
interval_ms: 60000,
|
||||
underlying_id: "SPY",
|
||||
open: 1,
|
||||
high: 2,
|
||||
low: 1,
|
||||
close: 2,
|
||||
volume: 10,
|
||||
trade_count: 1
|
||||
});
|
||||
await manager.ingest("equity-overlay", {
|
||||
source_ts: 110,
|
||||
ingest_ts: 111,
|
||||
seq: 2,
|
||||
trace_id: "eq-1",
|
||||
ts: 110,
|
||||
underlying_id: "SPY",
|
||||
price: 10,
|
||||
size: 5,
|
||||
exchange: "X",
|
||||
offExchangeFlag: true
|
||||
});
|
||||
|
||||
const candleSnapshot = await manager.getSnapshot({
|
||||
channel: "equity-candles",
|
||||
underlying_id: "SPY",
|
||||
interval_ms: 60000
|
||||
});
|
||||
const overlaySnapshot = await manager.getSnapshot({
|
||||
channel: "equity-overlay",
|
||||
underlying_id: "SPY"
|
||||
});
|
||||
|
||||
expect(candleSnapshot.items).toHaveLength(1);
|
||||
expect(overlaySnapshot.items).toHaveLength(1);
|
||||
expect(candleSnapshot.watermark).toEqual({ ts: 100, seq: 1 });
|
||||
expect(overlaySnapshot.watermark).toEqual({ ts: 110, seq: 2 });
|
||||
});
|
||||
});
|
||||
Loading…
Add table
Add a link
Reference in a new issue