Merge pull request #20 from dirtydishes/persistence

unify live session streaming and evidence fetching
This commit is contained in:
dirtydishes 2026-04-27 13:36:20 -04:00 committed by GitHub
commit 2c1e4cb3e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 1923 additions and 258 deletions

File diff suppressed because it is too large Load diff

View file

@ -418,6 +418,14 @@ const clampLimit = (limit: number): number => {
return Math.max(1, Math.min(1000, Math.floor(limit))); return Math.max(1, Math.min(1000, Math.floor(limit)));
}; };
const clampLookupLimit = (limit: number): number => {
if (!Number.isFinite(limit)) {
return 100;
}
return Math.max(1, Math.min(5000, Math.floor(limit)));
};
const clampPositiveInt = (value: number, fallback = 1): number => { const clampPositiveInt = (value: number, fallback = 1): number => {
if (!Number.isFinite(value)) { if (!Number.isFinite(value)) {
return fallback; return fallback;
@ -450,6 +458,10 @@ const quoteString = (value: string): string => {
return `'${escaped}'`; return `'${escaped}'`;
}; };
const buildStringList = (values: string[]): string => {
return values.map((value) => quoteString(value)).join(", ");
};
const buildTracePrefixCondition = (tracePrefix: string | undefined): string | null => { const buildTracePrefixCondition = (tracePrefix: string | undefined): string | null => {
if (!tracePrefix) { if (!tracePrefix) {
return null; return null;
@ -461,6 +473,15 @@ const buildTracePrefixCondition = (tracePrefix: string | undefined): string | nu
return `startsWith(trace_id, ${quoteString(normalized)})`; return `startsWith(trace_id, ${quoteString(normalized)})`;
}; };
const buildBeforeTupleCondition = (
tsColumn: string,
seqColumn: string,
beforeTs: number,
beforeSeq: number
): string => {
return `(${tsColumn}, ${seqColumn}) < (${clampCursor(beforeTs)}, ${clampCursor(beforeSeq)})`;
};
const normalizeNumericFields = ( const normalizeNumericFields = (
row: Record<string, unknown>, row: Record<string, unknown>,
fields: string[] fields: string[]
@ -1095,3 +1116,215 @@ export const fetchAlertsAfter = async (
const alerts = records.map(fromAlertRecord); const alerts = records.map(fromAlertRecord);
return AlertEventSchema.array().parse(alerts); return AlertEventSchema.array().parse(alerts);
}; };
export const fetchOptionPrintsBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number,
tracePrefix?: string
): Promise<OptionPrint[]> => {
const safeLimit = clampLimit(limit);
const conditions = [buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)];
const traceCondition = buildTracePrefixCondition(tracePrefix);
if (traceCondition) {
conditions.push(traceCondition);
}
const result = await client.query({
query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow));
};
export const fetchOptionNBBOBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number,
tracePrefix?: string
): Promise<OptionNBBO[]> => {
const safeLimit = clampLimit(limit);
const conditions = [buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)];
const traceCondition = buildTracePrefixCondition(tracePrefix);
if (traceCondition) {
conditions.push(traceCondition);
}
const result = await client.query({
query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
return OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow));
};
export const fetchEquityPrintsBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number
): Promise<EquityPrint[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `SELECT * FROM ${EQUITY_PRINTS_TABLE} WHERE ${buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow));
};
export const fetchEquityPrintJoinsBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number
): Promise<EquityPrintJoin[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeEquityPrintJoinRow)
.filter((record): record is EquityPrintJoinRecord => record !== null);
return EquityPrintJoinSchema.array().parse(records.map(fromEquityPrintJoinRecord));
};
export const fetchFlowPacketsBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number
): Promise<FlowPacket[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeFlowPacketRow)
.filter((record): record is FlowPacketRecord => record !== null);
return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord));
};
export const fetchClassifierHitsBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number
): Promise<ClassifierHitEvent[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeClassifierHitRow)
.filter((record): record is ClassifierHitRecord => record !== null);
return ClassifierHitEventSchema.array().parse(records.map(fromClassifierHitRecord));
};
export const fetchAlertsBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number
): Promise<AlertEvent[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `SELECT * FROM ${ALERTS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeAlertRow)
.filter((record): record is AlertRecord => record !== null);
return AlertEventSchema.array().parse(records.map(fromAlertRecord));
};
export const fetchInferredDarkBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number
): Promise<InferredDarkEvent[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `SELECT * FROM ${INFERRED_DARK_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeInferredDarkRow)
.filter((record): record is InferredDarkRecord => record !== null);
return InferredDarkEventSchema.array().parse(records.map(fromInferredDarkRecord));
};
export const fetchFlowPacketById = async (
client: ClickHouseClient,
id: string
): Promise<FlowPacket | null> => {
const result = await client.query({
query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE id = ${quoteString(id)} ORDER BY source_ts DESC, seq DESC LIMIT 1`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const record = rows
.map(normalizeFlowPacketRow)
.find((row): row is FlowPacketRecord => row !== null);
return record ? FlowPacketSchema.parse(fromFlowPacketRecord(record)) : null;
};
export const fetchOptionPrintsByTraceIds = async (
client: ClickHouseClient,
traceIds: string[]
): Promise<OptionPrint[]> => {
const ids = Array.from(new Set(traceIds.map((id) => id.trim()).filter(Boolean)));
if (ids.length === 0) {
return [];
}
const result = await client.query({
query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE trace_id IN (${buildStringList(ids)}) ORDER BY ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length)}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow));
};
export const fetchEquityPrintJoinsByIds = async (
client: ClickHouseClient,
ids: string[]
): Promise<EquityPrintJoin[]> => {
const uniqueIds = Array.from(new Set(ids.map((id) => id.trim()).filter(Boolean)));
if (uniqueIds.length === 0) {
return [];
}
const result = await client.query({
query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} WHERE id IN (${buildStringList(uniqueIds)}) ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(uniqueIds.length)}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeEquityPrintJoinRow)
.filter((record): record is EquityPrintJoinRecord => record !== null);
return EquityPrintJoinSchema.array().parse(records.map(fromEquityPrintJoinRecord));
};

View file

@ -1,4 +1,5 @@
import { describe, expect, it } from "bun:test"; import { describe, expect, it } from "bun:test";
import { createClickHouseClient, fetchFlowPacketById, fetchFlowPacketsBefore } from "../src/clickhouse";
import { import {
flowPacketsTableDDL, flowPacketsTableDDL,
FLOW_PACKETS_TABLE, FLOW_PACKETS_TABLE,
@ -36,4 +37,24 @@ describe("flow-packets storage helpers", () => {
expect(restored.features).toEqual(packet.features); expect(restored.features).toEqual(packet.features);
expect(restored.join_quality).toEqual(packet.join_quality); expect(restored.join_quality).toEqual(packet.join_quality);
}); });
it("builds before-history and id lookup queries", async () => {
const queries: string[] = [];
const client = createClickHouseClient({ url: "http://127.0.0.1:8123" });
client.query = async ({ query }) => {
queries.push(query);
return {
async json<T>() {
return [] as T;
}
};
};
await fetchFlowPacketsBefore(client, 200, 3, 15);
await fetchFlowPacketById(client, "fp-1");
expect(queries[0]).toContain("(source_ts, seq) < (200, 3)");
expect(queries[0]).toContain("ORDER BY source_ts DESC, seq DESC LIMIT 15");
expect(queries[1]).toContain("WHERE id = 'fp-1'");
});
}); });

View file

@ -1,4 +1,5 @@
import { describe, expect, it } from "bun:test"; import { describe, expect, it } from "bun:test";
import { createClickHouseClient, fetchOptionPrintsBefore, fetchOptionPrintsByTraceIds } from "../src/clickhouse";
import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "../src/option-prints"; import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "../src/option-prints";
const basePrint = { const basePrint = {
@ -24,4 +25,25 @@ describe("option-prints storage helpers", () => {
expect(ddl).toContain(OPTION_PRINTS_TABLE); expect(ddl).toContain(OPTION_PRINTS_TABLE);
expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); expect(ddl).toContain("CREATE TABLE IF NOT EXISTS");
}); });
it("builds before/history and trace lookup queries", async () => {
const queries: string[] = [];
const client = createClickHouseClient({ url: "http://127.0.0.1:8123" });
client.query = async ({ query }) => {
queries.push(query);
return {
async json<T>() {
return [] as T;
}
};
};
await fetchOptionPrintsBefore(client, 100, 5, 20, "alpaca");
await fetchOptionPrintsByTraceIds(client, ["trace-1", "trace-2"]);
expect(queries[0]).toContain("(ts, seq) < (100, 5)");
expect(queries[0]).toContain("startsWith(trace_id, 'alpaca')");
expect(queries[0]).toContain("ORDER BY ts DESC, seq DESC LIMIT 20");
expect(queries[1]).toContain("trace_id IN ('trace-1', 'trace-2')");
});
}); });

View file

@ -1,2 +1,3 @@
export * from "./events"; export * from "./events";
export * from "./live";
export * from "./sp500"; export * from "./sp500";

182
packages/types/src/live.ts Normal file
View file

@ -0,0 +1,182 @@
import { z } from "zod";
import {
AlertEventSchema,
ClassifierHitEventSchema,
EquityCandleSchema,
EquityPrintJoinSchema,
EquityPrintSchema,
FlowPacketSchema,
InferredDarkEventSchema,
OptionNBBOSchema,
OptionPrintSchema
} from "./events";
export const CursorSchema = z.object({
ts: z.number().int().nonnegative(),
seq: z.number().int().nonnegative()
});
export type Cursor = z.infer<typeof CursorSchema>;
export const LiveGenericChannelSchema = z.enum([
"options",
"nbbo",
"equities",
"equity-joins",
"flow",
"classifier-hits",
"alerts",
"inferred-dark"
]);
export const LiveChannelSchema = z.enum([
"options",
"nbbo",
"equities",
"equity-joins",
"flow",
"classifier-hits",
"alerts",
"inferred-dark",
"equity-candles",
"equity-overlay"
]);
export type LiveChannel = z.infer<typeof LiveChannelSchema>;
export type LiveGenericChannel = z.infer<typeof LiveGenericChannelSchema>;
export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [
z.object({
channel: LiveGenericChannelSchema
}),
z.object({
channel: z.literal("equity-candles"),
underlying_id: z.string().min(1),
interval_ms: z.number().int().positive()
}),
z.object({
channel: z.literal("equity-overlay"),
underlying_id: z.string().min(1)
})
]);
export type LiveSubscription = z.infer<typeof LiveSubscriptionSchema>;
const livePayloadSchemas = {
options: OptionPrintSchema,
nbbo: OptionNBBOSchema,
equities: EquityPrintSchema,
"equity-joins": EquityPrintJoinSchema,
flow: FlowPacketSchema,
"classifier-hits": ClassifierHitEventSchema,
alerts: AlertEventSchema,
"inferred-dark": InferredDarkEventSchema,
"equity-candles": EquityCandleSchema,
"equity-overlay": EquityPrintSchema
} as const;
export const FeedSnapshotSchema = z.object({
subscription: LiveSubscriptionSchema,
items: z.array(z.unknown()),
watermark: CursorSchema.nullable(),
next_before: CursorSchema.nullable()
});
export type FeedSnapshot<T> = {
subscription: LiveSubscription;
items: T[];
watermark: Cursor | null;
next_before: Cursor | null;
};
export const LiveSubscribeMessageSchema = z.object({
op: z.literal("subscribe"),
subscriptions: z.array(LiveSubscriptionSchema).min(1)
});
export type LiveSubscribeMessage = z.infer<typeof LiveSubscribeMessageSchema>;
export const LiveUnsubscribeMessageSchema = z.object({
op: z.literal("unsubscribe"),
subscriptions: z.array(LiveSubscriptionSchema).min(1)
});
export type LiveUnsubscribeMessage = z.infer<typeof LiveUnsubscribeMessageSchema>;
export const LivePingMessageSchema = z.object({
op: z.literal("ping")
});
export type LivePingMessage = z.infer<typeof LivePingMessageSchema>;
export const LiveClientMessageSchema = z.discriminatedUnion("op", [
LiveSubscribeMessageSchema,
LiveUnsubscribeMessageSchema,
LivePingMessageSchema
]);
export type LiveClientMessage = z.infer<typeof LiveClientMessageSchema>;
export const LiveReadyMessageSchema = z.object({
op: z.literal("ready")
});
export type LiveReadyMessage = z.infer<typeof LiveReadyMessageSchema>;
export const LiveSnapshotMessageSchema = z.object({
op: z.literal("snapshot"),
snapshot: FeedSnapshotSchema
});
export type LiveSnapshotMessage = z.infer<typeof LiveSnapshotMessageSchema>;
export const LiveEventMessageSchema = z.object({
op: z.literal("event"),
subscription: LiveSubscriptionSchema,
item: z.unknown(),
watermark: CursorSchema.nullable()
});
export type LiveEventMessage = z.infer<typeof LiveEventMessageSchema>;
export const LiveHeartbeatMessageSchema = z.object({
op: z.literal("heartbeat"),
ts: z.number().int().nonnegative()
});
export type LiveHeartbeatMessage = z.infer<typeof LiveHeartbeatMessageSchema>;
export const LiveErrorMessageSchema = z.object({
op: z.literal("error"),
message: z.string().min(1)
});
export type LiveErrorMessage = z.infer<typeof LiveErrorMessageSchema>;
export const LiveServerMessageSchema = z.discriminatedUnion("op", [
LiveReadyMessageSchema,
LiveSnapshotMessageSchema,
LiveEventMessageSchema,
LiveHeartbeatMessageSchema,
LiveErrorMessageSchema
]);
export type LiveServerMessage = z.infer<typeof LiveServerMessageSchema>;
export const getSubscriptionKey = (subscription: LiveSubscription): string => {
switch (subscription.channel) {
case "equity-candles":
return `${subscription.channel}|${subscription.underlying_id}|${subscription.interval_ms}`;
case "equity-overlay":
return `${subscription.channel}|${subscription.underlying_id}`;
default:
return subscription.channel;
}
};
export const parseLivePayload = (
channel: LiveChannel,
item: unknown
): z.infer<(typeof livePayloadSchemas)[typeof channel]> => {
return livePayloadSchemas[channel].parse(item);
};

View file

@ -0,0 +1,69 @@
import { describe, expect, it } from "bun:test";
import {
CursorSchema,
LiveClientMessageSchema,
LiveServerMessageSchema,
getSubscriptionKey
} from "../src/live";
describe("live protocol types", () => {
it("builds stable keys for generic and parameterized subscriptions", () => {
expect(getSubscriptionKey({ channel: "flow" })).toBe("flow");
expect(
getSubscriptionKey({
channel: "equity-candles",
underlying_id: "SPY",
interval_ms: 60000
})
).toBe("equity-candles|SPY|60000");
expect(getSubscriptionKey({ channel: "equity-overlay", underlying_id: "SPY" })).toBe(
"equity-overlay|SPY"
);
});
it("validates subscribe messages", () => {
const parsed = LiveClientMessageSchema.parse({
op: "subscribe",
subscriptions: [
{ channel: "flow" },
{ channel: "equity-candles", underlying_id: "SPY", interval_ms: 60000 }
]
});
expect(parsed.op).toBe("subscribe");
expect(parsed.subscriptions).toHaveLength(2);
});
it("validates snapshot and event server messages", () => {
const cursor = CursorSchema.parse({ ts: 100, seq: 2 });
const snapshot = LiveServerMessageSchema.parse({
op: "snapshot",
snapshot: {
subscription: { channel: "alerts" },
items: [],
watermark: cursor,
next_before: null
}
});
const event = LiveServerMessageSchema.parse({
op: "event",
subscription: { channel: "equity-overlay", underlying_id: "SPY" },
item: {
source_ts: 100,
ingest_ts: 101,
seq: 1,
trace_id: "eq-1",
ts: 100,
underlying_id: "SPY",
price: 500,
size: 10,
exchange: "X",
offExchangeFlag: true
},
watermark: cursor
});
expect(snapshot.op).toBe("snapshot");
expect(event.op).toBe("event");
});
});

View file

@ -39,8 +39,12 @@ import {
ensureOptionNBBOTable, ensureOptionNBBOTable,
ensureOptionPrintsTable, ensureOptionPrintsTable,
fetchAlertsAfter, fetchAlertsAfter,
fetchAlertsBefore,
fetchClassifierHitsAfter, fetchClassifierHitsAfter,
fetchClassifierHitsBefore,
fetchFlowPacketsAfter, fetchFlowPacketsAfter,
fetchFlowPacketById,
fetchFlowPacketsBefore,
fetchRecentAlerts, fetchRecentAlerts,
fetchRecentClassifierHits, fetchRecentClassifierHits,
fetchRecentEquityPrintJoins, fetchRecentEquityPrintJoins,
@ -49,31 +53,46 @@ import {
fetchRecentEquityQuotes, fetchRecentEquityQuotes,
fetchEquityCandlesAfter, fetchEquityCandlesAfter,
fetchEquityCandlesRange, fetchEquityCandlesRange,
fetchEquityPrintJoinsByIds,
fetchEquityPrintJoinsBefore,
fetchRecentOptionNBBO, fetchRecentOptionNBBO,
fetchEquityPrintsAfter, fetchEquityPrintsAfter,
fetchEquityPrintsBefore,
fetchEquityPrintsRange, fetchEquityPrintsRange,
fetchEquityPrintJoinsAfter, fetchEquityPrintJoinsAfter,
fetchEquityQuotesAfter, fetchEquityQuotesAfter,
fetchInferredDarkBefore,
fetchInferredDarkAfter, fetchInferredDarkAfter,
fetchRecentEquityPrints, fetchRecentEquityPrints,
fetchOptionNBBOBefore,
fetchOptionNBBOAfter, fetchOptionNBBOAfter,
fetchOptionPrintsBefore,
fetchOptionPrintsAfter, fetchOptionPrintsAfter,
fetchOptionPrintsByTraceIds,
fetchRecentOptionPrints fetchRecentOptionPrints
} from "@islandflow/storage"; } from "@islandflow/storage";
import { import {
AlertEventSchema, AlertEventSchema,
ClassifierHitEventSchema, ClassifierHitEventSchema,
Cursor,
EquityCandleSchema, EquityCandleSchema,
EquityPrintSchema, EquityPrintSchema,
EquityPrintJoinSchema, EquityPrintJoinSchema,
EquityQuoteSchema, EquityQuoteSchema,
FeedSnapshot,
InferredDarkEventSchema, InferredDarkEventSchema,
LiveClientMessageSchema,
LiveServerMessage,
LiveSubscription,
LiveSubscriptionSchema,
FlowPacketSchema, FlowPacketSchema,
OptionNBBOSchema, OptionNBBOSchema,
OptionPrintSchema OptionPrintSchema,
getSubscriptionKey
} from "@islandflow/types"; } from "@islandflow/types";
import { createClient } from "redis"; import { createClient } from "redis";
import { z } from "zod"; import { z } from "zod";
import { LiveStateManager } from "./live";
const service = "api"; const service = "api";
const logger = createLogger({ service }); const logger = createLogger({ service });
@ -148,6 +167,11 @@ const replayParamsSchema = z.object({
after_seq: z.coerce.number().int().nonnegative().default(0), after_seq: z.coerce.number().int().nonnegative().default(0),
limit: z.coerce.number().int().positive().max(1000).default(200) limit: z.coerce.number().int().positive().max(1000).default(200)
}); });
const beforeParamsSchema = z.object({
before_ts: z.coerce.number().int().nonnegative(),
before_seq: z.coerce.number().int().nonnegative(),
limit: z.coerce.number().int().positive().max(1000).default(200)
});
const replaySourceSchema = z const replaySourceSchema = z
.string() .string()
@ -192,16 +216,26 @@ type WsData = {
channel: Channel; channel: Channel;
}; };
const optionSockets = new Set<WebSocket<WsData>>(); type LiveWsData = {
const optionNbboSockets = new Set<WebSocket<WsData>>(); channel: "live";
const equitySockets = new Set<WebSocket<WsData>>(); };
const equityCandleSockets = new Set<WebSocket<WsData>>();
const equityQuoteSockets = new Set<WebSocket<WsData>>(); type LegacySocket = any;
const equityJoinSockets = new Set<WebSocket<WsData>>(); type LiveSocket = any;
const inferredDarkSockets = new Set<WebSocket<WsData>>();
const flowSockets = new Set<WebSocket<WsData>>(); const optionSockets = new Set<LegacySocket>();
const classifierHitSockets = new Set<WebSocket<WsData>>(); const optionNbboSockets = new Set<LegacySocket>();
const alertSockets = new Set<WebSocket<WsData>>(); const equitySockets = new Set<LegacySocket>();
const equityCandleSockets = new Set<LegacySocket>();
const equityQuoteSockets = new Set<LegacySocket>();
const equityJoinSockets = new Set<LegacySocket>();
const inferredDarkSockets = new Set<LegacySocket>();
const flowSockets = new Set<LegacySocket>();
const classifierHitSockets = new Set<LegacySocket>();
const alertSockets = new Set<LegacySocket>();
const liveSocketSubscriptions = new Map<LiveSocket, Set<string>>();
const subscriptionSockets = new Map<string, Set<LiveSocket>>();
const liveHeartbeats = new Map<LiveSocket, ReturnType<typeof setInterval>>();
const jsonResponse = (body: unknown, status = 200): Response => { const jsonResponse = (body: unknown, status = 200): Response => {
return new Response(JSON.stringify(body), { return new Response(JSON.stringify(body), {
@ -234,6 +268,20 @@ const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit
}; };
}; };
const parseBeforeParams = (url: URL): { beforeTs: number; beforeSeq: number; limit: number } => {
const params = beforeParamsSchema.parse({
before_ts: url.searchParams.get("before_ts") ?? undefined,
before_seq: url.searchParams.get("before_seq") ?? undefined,
limit: url.searchParams.get("limit") ?? undefined
});
return {
beforeTs: params.before_ts,
beforeSeq: params.before_seq,
limit: params.limit
};
};
const parseReplaySource = (url: URL): string | null => { const parseReplaySource = (url: URL): string | null => {
const raw = url.searchParams.get("source"); const raw = url.searchParams.get("source");
if (!raw) { if (!raw) {
@ -330,7 +378,7 @@ const parseCandleReplayParams = (
}; };
}; };
const broadcast = (sockets: Set<WebSocket<WsData>>, payload: unknown): void => { const broadcast = (sockets: Set<LegacySocket>, payload: unknown): void => {
const message = JSON.stringify(payload); const message = JSON.stringify(payload);
for (const socket of sockets) { for (const socket of sockets) {
@ -345,6 +393,71 @@ const broadcast = (sockets: Set<WebSocket<WsData>>, payload: unknown): void => {
} }
}; };
const sendLiveMessage = (socket: LiveSocket, payload: LiveServerMessage): void => {
try {
socket.send(JSON.stringify(payload));
} catch (error) {
logger.warn("failed to send live websocket message", {
error: error instanceof Error ? error.message : String(error)
});
}
};
const subscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): void => {
const key = getSubscriptionKey(subscription);
const keys = liveSocketSubscriptions.get(socket) ?? new Set<string>();
keys.add(key);
liveSocketSubscriptions.set(socket, keys);
const sockets = subscriptionSockets.get(key) ?? new Set<LiveSocket>();
sockets.add(socket);
subscriptionSockets.set(key, sockets);
};
const unsubscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): void => {
const key = getSubscriptionKey(subscription);
liveSocketSubscriptions.get(socket)?.delete(key);
const sockets = subscriptionSockets.get(key);
if (!sockets) {
return;
}
sockets.delete(socket);
if (sockets.size === 0) {
subscriptionSockets.delete(key);
}
};
const cleanupLiveSocket = (socket: LiveSocket): void => {
const keys = liveSocketSubscriptions.get(socket);
if (keys) {
for (const key of keys) {
const sockets = subscriptionSockets.get(key);
sockets?.delete(socket);
if (sockets && sockets.size === 0) {
subscriptionSockets.delete(key);
}
}
}
liveSocketSubscriptions.delete(socket);
const heartbeat = liveHeartbeats.get(socket);
if (heartbeat) {
clearInterval(heartbeat);
liveHeartbeats.delete(socket);
}
};
const buildHistoryResponse = <T extends { seq: number }>(
items: T[],
cursorOf: (item: T) => Cursor
): { data: T[]; next_before: Cursor | null } => {
const last = items.at(-1);
return {
data: items,
next_before: last ? cursorOf(last) : null
};
};
const buildCandleCacheKey = (underlyingId: string, intervalMs: number): string => { const buildCandleCacheKey = (underlyingId: string, intervalMs: number): string => {
return `candles:equity:${intervalMs}:${underlyingId}`; return `candles:equity:${intervalMs}:${underlyingId}`;
}; };
@ -563,6 +676,9 @@ const run = async () => {
redis = null; redis = null;
} }
const liveState = new LiveStateManager(clickhouse, redis);
await liveState.hydrate();
const subscribeWithReset = async <T>( const subscribeWithReset = async <T>(
subject: string, subject: string,
stream: string, stream: string,
@ -661,11 +777,34 @@ const run = async () => {
"api-alerts" "api-alerts"
); );
const fanoutLive = async (
subscription: LiveSubscription,
item: unknown,
ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark"
) => {
const key = getSubscriptionKey(subscription);
const sockets = subscriptionSockets.get(key);
const watermark = await liveState.ingest(ingestChannel, item);
if (!sockets || sockets.size === 0) {
return;
}
for (const socket of sockets) {
sendLiveMessage(socket, {
op: "event",
subscription,
item,
watermark
});
}
};
const pumpOptions = async () => { const pumpOptions = async () => {
for await (const msg of optionSubscription.messages) { for await (const msg of optionSubscription.messages) {
try { try {
const payload = OptionPrintSchema.parse(optionSubscription.decode(msg)); const payload = OptionPrintSchema.parse(optionSubscription.decode(msg));
broadcast(optionSockets, { type: "option-print", payload }); broadcast(optionSockets, { type: "option-print", payload });
await fanoutLive({ channel: "options" }, payload, "options");
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process option print", { logger.error("failed to process option print", {
@ -681,6 +820,7 @@ const run = async () => {
try { try {
const payload = OptionNBBOSchema.parse(optionNbboSubscription.decode(msg)); const payload = OptionNBBOSchema.parse(optionNbboSubscription.decode(msg));
broadcast(optionNbboSockets, { type: "option-nbbo", payload }); broadcast(optionNbboSockets, { type: "option-nbbo", payload });
await fanoutLive({ channel: "nbbo" }, payload, "nbbo");
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process option nbbo", { logger.error("failed to process option nbbo", {
@ -696,6 +836,12 @@ const run = async () => {
try { try {
const payload = EquityPrintSchema.parse(equitySubscription.decode(msg)); const payload = EquityPrintSchema.parse(equitySubscription.decode(msg));
broadcast(equitySockets, { type: "equity-print", payload }); broadcast(equitySockets, { type: "equity-print", payload });
await fanoutLive({ channel: "equities" }, payload, "equities");
await fanoutLive(
{ channel: "equity-overlay", underlying_id: payload.underlying_id },
payload,
"equity-overlay"
);
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process equity print", { logger.error("failed to process equity print", {
@ -726,6 +872,15 @@ const run = async () => {
try { try {
const payload = EquityCandleSchema.parse(equityCandleSubscription.decode(msg)); const payload = EquityCandleSchema.parse(equityCandleSubscription.decode(msg));
broadcast(equityCandleSockets, { type: "equity-candle", payload }); broadcast(equityCandleSockets, { type: "equity-candle", payload });
await fanoutLive(
{
channel: "equity-candles",
underlying_id: payload.underlying_id,
interval_ms: payload.interval_ms
},
payload,
"equity-candles"
);
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process equity candle", { logger.error("failed to process equity candle", {
@ -741,6 +896,7 @@ const run = async () => {
try { try {
const payload = EquityPrintJoinSchema.parse(equityJoinSubscription.decode(msg)); const payload = EquityPrintJoinSchema.parse(equityJoinSubscription.decode(msg));
broadcast(equityJoinSockets, { type: "equity-join", payload }); broadcast(equityJoinSockets, { type: "equity-join", payload });
await fanoutLive({ channel: "equity-joins" }, payload, "equity-joins");
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process equity join", { logger.error("failed to process equity join", {
@ -756,6 +912,7 @@ const run = async () => {
try { try {
const payload = InferredDarkEventSchema.parse(inferredDarkSubscription.decode(msg)); const payload = InferredDarkEventSchema.parse(inferredDarkSubscription.decode(msg));
broadcast(inferredDarkSockets, { type: "inferred-dark", payload }); broadcast(inferredDarkSockets, { type: "inferred-dark", payload });
await fanoutLive({ channel: "inferred-dark" }, payload, "inferred-dark");
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process inferred dark event", { logger.error("failed to process inferred dark event", {
@ -771,6 +928,7 @@ const run = async () => {
try { try {
const payload = FlowPacketSchema.parse(flowSubscription.decode(msg)); const payload = FlowPacketSchema.parse(flowSubscription.decode(msg));
broadcast(flowSockets, { type: "flow-packet", payload }); broadcast(flowSockets, { type: "flow-packet", payload });
await fanoutLive({ channel: "flow" }, payload, "flow");
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process flow packet", { logger.error("failed to process flow packet", {
@ -786,6 +944,7 @@ const run = async () => {
try { try {
const payload = ClassifierHitEventSchema.parse(classifierHitSubscription.decode(msg)); const payload = ClassifierHitEventSchema.parse(classifierHitSubscription.decode(msg));
broadcast(classifierHitSockets, { type: "classifier-hit", payload }); broadcast(classifierHitSockets, { type: "classifier-hit", payload });
await fanoutLive({ channel: "classifier-hits" }, payload, "classifier-hits");
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process classifier hit", { logger.error("failed to process classifier hit", {
@ -801,6 +960,7 @@ const run = async () => {
try { try {
const payload = AlertEventSchema.parse(alertSubscription.decode(msg)); const payload = AlertEventSchema.parse(alertSubscription.decode(msg));
broadcast(alertSockets, { type: "alert", payload }); broadcast(alertSockets, { type: "alert", payload });
await fanoutLive({ channel: "alerts" }, payload, "alerts");
msg.ack(); msg.ack();
} catch (error) { } catch (error) {
logger.error("failed to process alert", { logger.error("failed to process alert", {
@ -822,9 +982,9 @@ const run = async () => {
void pumpClassifierHits(); void pumpClassifierHits();
void pumpAlerts(); void pumpAlerts();
const server = Bun.serve<WsData>({ const server = Bun.serve<WsData | LiveWsData>({
port: env.API_PORT, port: env.API_PORT,
fetch: async (req, serverRef) => { fetch: async (req: Request, serverRef: any) => {
const url = new URL(req.url); const url = new URL(req.url);
if (req.method === "GET" && url.pathname === "/health") { if (req.method === "GET" && url.pathname === "/health") {
@ -940,6 +1100,84 @@ const run = async () => {
return jsonResponse({ data }); return jsonResponse({ data });
} }
if (req.method === "GET" && url.pathname === "/history/options") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const source = parseReplaySource(url) ?? undefined;
const data = await fetchOptionPrintsBefore(clickhouse, beforeTs, beforeSeq, limit, source);
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
}
if (req.method === "GET" && url.pathname === "/history/nbbo") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const source = parseReplaySource(url) ?? undefined;
const data = await fetchOptionNBBOBefore(clickhouse, beforeTs, beforeSeq, limit, source);
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
}
if (req.method === "GET" && url.pathname === "/history/equities") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchEquityPrintsBefore(clickhouse, beforeTs, beforeSeq, limit);
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
}
if (req.method === "GET" && url.pathname === "/history/equity-joins") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchEquityPrintJoinsBefore(clickhouse, beforeTs, beforeSeq, limit);
return jsonResponse(
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
);
}
if (req.method === "GET" && url.pathname === "/history/flow") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchFlowPacketsBefore(clickhouse, beforeTs, beforeSeq, limit);
return jsonResponse(
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
);
}
if (req.method === "GET" && url.pathname === "/history/classifier-hits") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchClassifierHitsBefore(clickhouse, beforeTs, beforeSeq, limit);
return jsonResponse(
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
);
}
if (req.method === "GET" && url.pathname === "/history/alerts") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchAlertsBefore(clickhouse, beforeTs, beforeSeq, limit);
return jsonResponse(
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
);
}
if (req.method === "GET" && url.pathname === "/history/inferred-dark") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchInferredDarkBefore(clickhouse, beforeTs, beforeSeq, limit);
return jsonResponse(
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
);
}
if (req.method === "GET" && /^\/flow\/packets\/[^/]+$/.test(url.pathname)) {
const id = decodeURIComponent(url.pathname.slice("/flow/packets/".length));
const data = await fetchFlowPacketById(clickhouse, id);
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/option-prints/by-trace") {
const traceIds = url.searchParams.getAll("trace_id");
const data = await fetchOptionPrintsByTraceIds(clickhouse, traceIds);
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/equity-joins/by-id") {
const ids = url.searchParams.getAll("id");
const data = await fetchEquityPrintJoinsByIds(clickhouse, ids);
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/replay/options") { if (req.method === "GET" && url.pathname === "/replay/options") {
const { afterTs, afterSeq, limit } = parseReplayParams(url); const { afterTs, afterSeq, limit } = parseReplayParams(url);
const source = parseReplaySource(url) ?? undefined; const source = parseReplaySource(url) ?? undefined;
@ -1120,11 +1358,25 @@ const run = async () => {
return jsonResponse({ error: "websocket upgrade failed" }, 400); return jsonResponse({ error: "websocket upgrade failed" }, 400);
} }
if (req.method === "GET" && url.pathname === "/ws/live") {
if (serverRef.upgrade(req, { data: { channel: "live" } })) {
return new Response(null, { status: 101 });
}
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
return jsonResponse({ error: "not found" }, 404); return jsonResponse({ error: "not found" }, 404);
}, },
websocket: { websocket: {
open: (socket) => { open: (socket: any) => {
if (socket.data.channel === "options") { if (socket.data.channel === "live") {
sendLiveMessage(socket, { op: "ready" });
const heartbeat = setInterval(() => {
sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() });
}, 15000);
liveHeartbeats.set(socket, heartbeat);
} else if (socket.data.channel === "options") {
optionSockets.add(socket); optionSockets.add(socket);
} else if (socket.data.channel === "options-nbbo") { } else if (socket.data.channel === "options-nbbo") {
optionNbboSockets.add(socket); optionNbboSockets.add(socket);
@ -1148,8 +1400,44 @@ const run = async () => {
logger.info("websocket connected", { channel: socket.data.channel }); logger.info("websocket connected", { channel: socket.data.channel });
}, },
close: (socket) => { message: async (socket: any, message: string | ArrayBuffer | Uint8Array) => {
if (socket.data.channel === "options") { if (socket.data.channel !== "live") {
return;
}
try {
const payload =
typeof message === "string"
? message
: new TextDecoder().decode(message instanceof Uint8Array ? message : new Uint8Array(message));
const parsed = LiveClientMessageSchema.parse(JSON.parse(payload));
if (parsed.op === "ping") {
sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() });
return;
}
for (const subscription of parsed.subscriptions) {
LiveSubscriptionSchema.parse(subscription);
if (parsed.op === "unsubscribe") {
unsubscribeSocket(socket, subscription);
continue;
}
subscribeSocket(socket, subscription);
const snapshot = await liveState.getSnapshot(subscription);
sendLiveMessage(socket, { op: "snapshot", snapshot });
}
} catch (error) {
sendLiveMessage(socket, {
op: "error",
message: error instanceof Error ? error.message : String(error)
});
}
},
close: (socket: any) => {
if (socket.data.channel === "live") {
cleanupLiveSocket(socket);
} else if (socket.data.channel === "options") {
optionSockets.delete(socket); optionSockets.delete(socket);
} else if (socket.data.channel === "options-nbbo") { } else if (socket.data.channel === "options-nbbo") {
optionNbboSockets.delete(socket); optionNbboSockets.delete(socket);

370
services/api/src/live.ts Normal file
View file

@ -0,0 +1,370 @@
import {
fetchRecentAlerts,
fetchRecentClassifierHits,
fetchRecentEquityCandles,
fetchRecentEquityPrintJoins,
fetchRecentEquityPrints,
fetchRecentFlowPackets,
fetchRecentInferredDark,
fetchRecentOptionNBBO,
fetchRecentOptionPrints,
type ClickHouseClient
} from "@islandflow/storage";
import {
AlertEventSchema,
ClassifierHitEventSchema,
CursorSchema,
EquityCandleSchema,
EquityPrintJoinSchema,
EquityPrintSchema,
FeedSnapshot,
FlowPacketSchema,
InferredDarkEventSchema,
LiveGenericChannel,
LiveSubscription,
OptionNBBOSchema,
OptionPrintSchema,
type Cursor,
type EquityCandle,
type EquityPrint,
type LiveChannel
} from "@islandflow/types";
import type { RedisClientType } from "redis";
const CURSOR_HASH_KEY = "live:cursors";
const GENERIC_LIMITS = {
options: 500,
nbbo: 500,
equities: 500,
"equity-joins": 500,
flow: 500,
"classifier-hits": 500,
alerts: 500,
"inferred-dark": 500
} as const;
const CHART_LIMITS = {
candles: 500,
overlay: 1500
} as const;
type GenericFeedConfig = {
redisKey: string;
cursorField: string;
limit: number;
parse: (value: unknown) => any;
cursor: (item: any) => Cursor;
fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise<any[]>;
};
type RedisLike = Pick<
RedisClientType,
"isOpen" | "lRange" | "lPush" | "lTrim" | "hGet" | "hSet"
>;
const parseCursor = (value: string | null): Cursor | null => {
if (!value) {
return null;
}
try {
return CursorSchema.parse(JSON.parse(value));
} catch {
return null;
}
};
const getGenericConfig = (): {
[K in LiveGenericChannel]: GenericFeedConfig;
} => ({
options: {
redisKey: "live:options",
cursorField: "options",
limit: GENERIC_LIMITS.options,
parse: (value) => OptionPrintSchema.parse(value),
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
fetchRecent: fetchRecentOptionPrints
},
nbbo: {
redisKey: "live:nbbo",
cursorField: "nbbo",
limit: GENERIC_LIMITS.nbbo,
parse: (value) => OptionNBBOSchema.parse(value),
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
fetchRecent: fetchRecentOptionNBBO
},
equities: {
redisKey: "live:equities",
cursorField: "equities",
limit: GENERIC_LIMITS.equities,
parse: (value) => EquityPrintSchema.parse(value),
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
fetchRecent: fetchRecentEquityPrints
},
"equity-joins": {
redisKey: "live:equity-joins",
cursorField: "equity-joins",
limit: GENERIC_LIMITS["equity-joins"],
parse: (value) => EquityPrintJoinSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentEquityPrintJoins
},
flow: {
redisKey: "live:flow",
cursorField: "flow",
limit: GENERIC_LIMITS.flow,
parse: (value) => FlowPacketSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentFlowPackets
},
"classifier-hits": {
redisKey: "live:classifier-hits",
cursorField: "classifier-hits",
limit: GENERIC_LIMITS["classifier-hits"],
parse: (value) => ClassifierHitEventSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentClassifierHits
},
alerts: {
redisKey: "live:alerts",
cursorField: "alerts",
limit: GENERIC_LIMITS.alerts,
parse: (value) => AlertEventSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentAlerts
},
"inferred-dark": {
redisKey: "live:inferred-dark",
cursorField: "inferred-dark",
limit: GENERIC_LIMITS["inferred-dark"],
parse: (value) => InferredDarkEventSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentInferredDark
}
});
const parseJsonList = <T>(payloads: string[], parse: (value: unknown) => T): T[] => {
const items: T[] = [];
for (const payload of payloads) {
try {
items.push(parse(JSON.parse(payload)));
} catch {
// ignore bad cache entries
}
}
return items;
};
const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Cursor | null => {
const last = items.at(-1);
return last ? cursorOf(last) : null;
};
const candleRedisKey = (underlyingId: string, intervalMs: number): string =>
`live:equity-candles:${underlyingId}:${intervalMs}`;
const candleCursorField = (underlyingId: string, intervalMs: number): string =>
`equity-candles:${underlyingId}:${intervalMs}`;
const overlayRedisKey = (underlyingId: string): string => `live:equity-overlay:${underlyingId}`;
const overlayCursorField = (underlyingId: string): string => `equities:${underlyingId}`;
export class LiveStateManager {
private readonly generic = getGenericConfig();
private readonly genericItems = new Map<LiveGenericChannel, any[]>();
private readonly genericCursors = new Map<string, Cursor | null>();
private readonly candleItems = new Map<string, EquityCandle[]>();
private readonly candleCursors = new Map<string, Cursor | null>();
private readonly overlayItems = new Map<string, EquityPrint[]>();
private readonly overlayCursors = new Map<string, Cursor | null>();
constructor(
private readonly clickhouse: ClickHouseClient,
private readonly redis: RedisLike | null
) {}
async hydrate(): Promise<void> {
const channels = Object.keys(this.generic) as LiveGenericChannel[];
await Promise.all(channels.map((channel) => this.hydrateGeneric(channel)));
}
private async hydrateGeneric(channel: LiveGenericChannel): Promise<void> {
const config = this.generic[channel];
if (this.redis?.isOpen) {
const payloads = await this.redis.lRange(config.redisKey, 0, config.limit - 1);
const cached = parseJsonList(payloads, config.parse);
if (cached.length > 0) {
this.genericItems.set(channel, cached);
this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField)));
return;
}
}
const fresh = await config.fetchRecent(this.clickhouse, config.limit);
this.genericItems.set(channel, fresh);
const watermark = fresh[0] ? config.cursor(fresh[0]) : null;
this.genericCursors.set(config.cursorField, watermark);
await this.persistList(config.redisKey, config.cursorField, fresh, config.limit, watermark);
}
async getSnapshot(subscription: LiveSubscription): Promise<FeedSnapshot<unknown>> {
switch (subscription.channel) {
case "equity-candles": {
const key = candleRedisKey(subscription.underlying_id, subscription.interval_ms);
const cursorField = candleCursorField(subscription.underlying_id, subscription.interval_ms);
if (!this.candleItems.has(key)) {
await this.hydrateCandles(subscription.underlying_id, subscription.interval_ms);
}
const items = this.candleItems.get(key) ?? [];
return {
subscription,
items,
watermark: this.candleCursors.get(cursorField) ?? null,
next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq }))
};
}
case "equity-overlay": {
const key = overlayRedisKey(subscription.underlying_id);
const cursorField = overlayCursorField(subscription.underlying_id);
if (!this.overlayItems.has(key)) {
await this.hydrateOverlay(subscription.underlying_id);
}
const items = this.overlayItems.get(key) ?? [];
return {
subscription,
items,
watermark: this.overlayCursors.get(cursorField) ?? null,
next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq }))
};
}
default: {
const config = this.generic[subscription.channel];
const items = this.genericItems.get(subscription.channel) ?? [];
return {
subscription,
items,
watermark: this.genericCursors.get(config.cursorField) ?? null,
next_before: nextBeforeForItems(items, config.cursor)
};
}
}
}
async ingest(channel: LiveChannel, item: unknown): Promise<Cursor | null> {
switch (channel) {
case "equity-candles": {
const candle = EquityCandleSchema.parse(item);
const key = candleRedisKey(candle.underlying_id, candle.interval_ms);
const cursorField = candleCursorField(candle.underlying_id, candle.interval_ms);
const items = this.candleItems.get(key) ?? [];
const next = [candle, ...items]
.sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
.slice(0, CHART_LIMITS.candles);
this.candleItems.set(key, next);
const cursor = { ts: candle.ts, seq: candle.seq };
this.candleCursors.set(cursorField, cursor);
await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor);
return cursor;
}
case "equity-overlay": {
const print = EquityPrintSchema.parse(item);
const key = overlayRedisKey(print.underlying_id);
const cursorField = overlayCursorField(print.underlying_id);
const items = this.overlayItems.get(key) ?? [];
const next = [print, ...items]
.sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
.slice(0, CHART_LIMITS.overlay);
this.overlayItems.set(key, next);
const cursor = { ts: print.ts, seq: print.seq };
this.overlayCursors.set(cursorField, cursor);
await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor);
return cursor;
}
default: {
const config = this.generic[channel];
const parsed = config.parse(item);
const items = this.genericItems.get(channel) ?? [];
const next = [parsed, ...items]
.sort((a, b) => {
const aCursor = config.cursor(a);
const bCursor = config.cursor(b);
return (bCursor.ts - aCursor.ts) || (bCursor.seq - aCursor.seq);
})
.slice(0, config.limit);
this.genericItems.set(channel, next);
const cursor = config.cursor(parsed);
this.genericCursors.set(config.cursorField, cursor);
await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor);
return cursor;
}
}
}
private async hydrateCandles(underlyingId: string, intervalMs: number): Promise<void> {
const key = candleRedisKey(underlyingId, intervalMs);
const cursorField = candleCursorField(underlyingId, intervalMs);
if (this.redis?.isOpen) {
const payloads = await this.redis.lRange(key, 0, CHART_LIMITS.candles - 1);
const cached = parseJsonList(payloads, (value) => EquityCandleSchema.parse(value));
if (cached.length > 0) {
this.candleItems.set(key, cached);
this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
return;
}
}
const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles);
this.candleItems.set(key, fresh);
const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
this.candleCursors.set(cursorField, watermark);
await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark);
}
private async hydrateOverlay(underlyingId: string): Promise<void> {
const key = overlayRedisKey(underlyingId);
const cursorField = overlayCursorField(underlyingId);
if (this.redis?.isOpen) {
const payloads = await this.redis.lRange(key, 0, CHART_LIMITS.overlay - 1);
const cached = parseJsonList(payloads, (value) => EquityPrintSchema.parse(value));
if (cached.length > 0) {
this.overlayItems.set(key, cached);
this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
return;
}
}
const fresh = (await fetchRecentEquityPrints(this.clickhouse, CHART_LIMITS.overlay)).filter(
(item) => item.underlying_id === underlyingId
);
this.overlayItems.set(key, fresh);
const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
this.overlayCursors.set(cursorField, watermark);
await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark);
}
private async persistList<T>(
listKey: string,
cursorField: string,
items: T[],
limit: number,
cursor: Cursor | null
): Promise<void> {
if (!this.redis?.isOpen) {
return;
}
const payloads = items.map((item) => JSON.stringify(item));
await this.redis.lTrim(listKey, 1, 0);
if (payloads.length > 0) {
for (let idx = payloads.length - 1; idx >= 0; idx -= 1) {
const payload = payloads[idx];
if (payload) {
await this.redis.lPush(listKey, payload);
}
}
await this.redis.lTrim(listKey, 0, limit - 1);
}
await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor));
}
}

View file

@ -0,0 +1,123 @@
import { describe, expect, it } from "bun:test";
import type { ClickHouseClient } from "@islandflow/storage";
import { LiveStateManager } from "../src/live";
const makeClickHouse = (): ClickHouseClient =>
({
exec: async () => {},
insert: async () => {},
ping: async () => ({ success: true }),
close: async () => {},
query: async () => ({
async json<T>() {
return [] as T;
}
})
}) as ClickHouseClient;
const makeRedis = () => {
const lists = new Map<string, string[]>();
const hashes = new Map<string, Map<string, string>>();
return {
isOpen: true,
async lRange(key: string, start: number, stop: number) {
return (lists.get(key) ?? []).slice(start, stop + 1);
},
async lPush(key: string, value: string) {
const next = lists.get(key) ?? [];
next.unshift(value);
lists.set(key, next);
return next.length;
},
async lTrim(key: string, start: number, stop: number) {
const next = lists.get(key) ?? [];
lists.set(key, start > stop ? [] : next.slice(start, stop + 1));
return "OK";
},
async hGet(key: string, field: string) {
return hashes.get(key)?.get(field) ?? null;
},
async hSet(key: string, field: string, value: string) {
const hash = hashes.get(key) ?? new Map<string, string>();
hash.set(field, value);
hashes.set(key, hash);
return 1;
}
};
};
describe("LiveStateManager", () => {
it("hydrates snapshots from redis generic windows", async () => {
const redis = makeRedis();
await redis.lPush(
"live:flow",
JSON.stringify({
source_ts: 100,
ingest_ts: 101,
seq: 1,
trace_id: "flow-1",
id: "flow-1",
members: ["a"],
features: {},
join_quality: {}
})
);
await redis.hSet("live:cursors", "flow", JSON.stringify({ ts: 100, seq: 1 }));
const manager = new LiveStateManager(makeClickHouse(), redis as never);
await manager.hydrate();
const snapshot = await manager.getSnapshot({ channel: "flow" });
expect(snapshot.items).toHaveLength(1);
expect(snapshot.watermark).toEqual({ ts: 100, seq: 1 });
expect(snapshot.next_before).toEqual({ ts: 100, seq: 1 });
});
it("persists parameterized candle and overlay caches on ingest", async () => {
const redis = makeRedis();
const manager = new LiveStateManager(makeClickHouse(), redis as never);
await manager.ingest("equity-candles", {
source_ts: 100,
ingest_ts: 101,
seq: 1,
trace_id: "candle:SPY:60000:100",
ts: 100,
interval_ms: 60000,
underlying_id: "SPY",
open: 1,
high: 2,
low: 1,
close: 2,
volume: 10,
trade_count: 1
});
await manager.ingest("equity-overlay", {
source_ts: 110,
ingest_ts: 111,
seq: 2,
trace_id: "eq-1",
ts: 110,
underlying_id: "SPY",
price: 10,
size: 5,
exchange: "X",
offExchangeFlag: true
});
const candleSnapshot = await manager.getSnapshot({
channel: "equity-candles",
underlying_id: "SPY",
interval_ms: 60000
});
const overlaySnapshot = await manager.getSnapshot({
channel: "equity-overlay",
underlying_id: "SPY"
});
expect(candleSnapshot.items).toHaveLength(1);
expect(overlaySnapshot.items).toHaveLength(1);
expect(candleSnapshot.watermark).toEqual({ ts: 100, seq: 1 });
expect(overlaySnapshot.watermark).toEqual({ ts: 110, seq: 2 });
});
});