From ba0daf52084c71661ada724debd74f4c8a3934be Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Mon, 4 May 2026 03:29:38 -0400 Subject: [PATCH] Implement server-backed live history --- .env.example | 1 + README.md | 6 +- apps/web/app/globals.css | 24 +++ apps/web/app/terminal.tsx | 209 +++++++++++++++++++ packages/storage/src/clickhouse.ts | 16 ++ packages/storage/tests/equity-quotes.test.ts | 32 +++ packages/types/src/live.ts | 6 +- services/api/src/index.ts | 14 +- services/api/src/live.ts | 54 ++--- services/api/tests/live.test.ts | 84 +++++++- 10 files changed, 402 insertions(+), 44 deletions(-) diff --git a/.env.example b/.env.example index f86691a..8a9ead7 100644 --- a/.env.example +++ b/.env.example @@ -92,6 +92,7 @@ REPLAY_LOG_EVERY=1000 LIVE_LIMIT_OPTIONS=10000 LIVE_LIMIT_NBBO=10000 LIVE_LIMIT_EQUITIES=10000 +LIVE_LIMIT_EQUITY_QUOTES=10000 LIVE_LIMIT_EQUITY_JOINS=10000 LIVE_LIMIT_FLOW=10000 LIVE_LIMIT_CLASSIFIER_HITS=10000 diff --git a/README.md b/README.md index f07916b..b5720fa 100644 --- a/README.md +++ b/README.md @@ -261,6 +261,7 @@ Default `smart-money` policy rejects lower-information prints and keeps high-con | `LIVE_LIMIT_OPTIONS` | `10000` | In-memory/Redis live cache depth for options channel (clamped `1..100000`). | | `LIVE_LIMIT_NBBO` | `10000` | Live cache depth for options NBBO channel (clamped `1..100000`). | | `LIVE_LIMIT_EQUITIES` | `10000` | Live cache depth for equities channel (clamped `1..100000`). | +| `LIVE_LIMIT_EQUITY_QUOTES` | `10000` | Live cache depth for equity quotes channel (clamped `1..100000`). | | `LIVE_LIMIT_EQUITY_JOINS` | `10000` | Live cache depth for equity join channel (clamped `1..100000`). | | `LIVE_LIMIT_FLOW` | `10000` | Live cache depth for flow packet channel (clamped `1..100000`). | | `LIVE_LIMIT_CLASSIFIER_HITS` | `10000` | Live cache depth for classifier hits channel (clamped `1..100000`). | @@ -303,7 +304,10 @@ Default `smart-money` policy rejects lower-information prints and keeps high-con - `view=raw` — audit/debug path that preserves every stored print. - The default Tape page options/packets posture is now stock-only, hides `B` / `BB`, keeps calls and puts visible, and applies in-memory min-notional controls immediately. - Live retention uses a two-tier model: - - API/Redis maintain a bounded hot cache per live generic channel. + - ClickHouse is durable server history; Redis is a bounded hot cache per live generic channel. + - `LIVE_LIMIT_*` controls initial snapshot/hot-cache depth, not total persisted history. + - Browser state is only a rendering window and UI preferences, not a market-data database. + - Devices connected to the same API hydrate from the same server-seen history. - UI keeps a bounded hot window for rendering performance around the signal view rather than raw noise. - Options prints can use a deeper dedicated cap via `NEXT_PUBLIC_LIVE_HOT_WINDOW_OPTIONS` without raising every other feed. - Alert/drawer evidence is pinned and hydrated by id/trace so details remain inspectable after hot-window eviction. diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index 8e2cfca..1dfa32d 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -783,6 +783,30 @@ h3 { white-space: nowrap; } +.load-older { + display: flex; + flex: 0 0 auto; + align-items: center; + justify-content: center; + gap: 10px; + padding: 4px 0 0; + font-size: 0.76rem; + color: var(--muted); +} + +.load-older button { + min-width: 112px; + white-space: nowrap; +} + +.load-older span { + max-width: 260px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + color: var(--danger); +} + .missed-count { width: 86px; font-size: 0.72rem; diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index 9f56047..d720116 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -23,6 +23,7 @@ import type { EquityCandle, EquityPrint, EquityPrintJoin, + EquityQuote, FlowPacket, InferredDarkEvent, LiveServerMessage, @@ -2173,9 +2174,15 @@ type LiveSessionState = { connectedAt: number | null; lastUpdate: number | null; lastEventByChannel: Partial>; + manifest: LiveSubscription[]; + historyCursors: Partial>; + historyLoading: Partial>; + historyErrors: Partial>; + loadOlder: (channel: LiveSubscription["channel"]) => Promise; options: OptionPrint[]; nbbo: OptionNBBO[]; equities: EquityPrint[]; + equityQuotes: EquityQuote[]; equityJoins: EquityPrintJoin[]; flow: FlowPacket[]; classifierHits: ClassifierHitEvent[]; @@ -2185,6 +2192,46 @@ type LiveSessionState = { chartOverlay: EquityPrint[]; }; +type LiveHistoryResponse = { + data: T[]; + next_before: Cursor | null; +}; + +const LIVE_HISTORY_ENDPOINTS: Partial> = { + options: "/history/options", + nbbo: "/history/nbbo", + equities: "/history/equities", + "equity-quotes": "/history/equity-quotes", + "equity-joins": "/history/equity-joins", + flow: "/history/flow", + "classifier-hits": "/history/classifier-hits", + alerts: "/history/alerts", + "inferred-dark": "/history/inferred-dark" +}; + +const appendOptionFlowFilters = (params: URLSearchParams, filters: OptionFlowFilters | undefined): void => { + if (!filters) { + return; + } + if (filters.view) { + params.set("view", filters.view); + } + if (filters.securityTypes?.length === 1) { + params.set("security", filters.securityTypes[0]); + } else if (filters.securityTypes && filters.securityTypes.length > 1) { + params.set("security", "all"); + } + if (filters.nbboSides?.length) { + params.set("side", filters.nbboSides.join(",")); + } + if (filters.optionTypes?.length) { + params.set("type", filters.optionTypes.join(",")); + } + if (typeof filters.minNotional === "number") { + params.set("min_notional", String(filters.minNotional)); + } +}; + const dedupeLiveSubscriptions = (subscriptions: LiveSubscription[]): LiveSubscription[] => { const seen = new Set(); return subscriptions.filter((subscription) => { @@ -2266,9 +2313,13 @@ const useLiveSession = ( const [lastEventByChannel, setLastEventByChannel] = useState< Partial> >({}); + const [historyCursors, setHistoryCursors] = useState>>({}); + const [historyLoading, setHistoryLoading] = useState>>({}); + const [historyErrors, setHistoryErrors] = useState>>({}); const [options, setOptions] = useState([]); const [nbbo, setNbbo] = useState([]); const [equities, setEquities] = useState([]); + const [equityQuotes, setEquityQuotes] = useState([]); const [equityJoins, setEquityJoins] = useState([]); const [flow, setFlow] = useState([]); const [classifierHits, setClassifierHits] = useState([]); @@ -2291,9 +2342,13 @@ const useLiveSession = ( setConnectedAt(null); setLastUpdate(null); setLastEventByChannel({}); + setHistoryCursors({}); + setHistoryLoading({}); + setHistoryErrors({}); setOptions([]); setNbbo([]); setEquities([]); + setEquityQuotes([]); setEquityJoins([]); setFlow([]); setClassifierHits([]); @@ -2347,6 +2402,7 @@ const useLiveSession = ( const subscription = message.op === "snapshot" ? message.snapshot.subscription : message.subscription; const items = message.op === "snapshot" ? message.snapshot.items : [message.item]; + const subscriptionKey = getLiveSubscriptionKey(subscription); const updateAt = Date.now(); const mergeItems = ( @@ -2380,6 +2436,9 @@ const useLiveSession = ( case "equities": mergeItems(setEquities, items as EquityPrint[]); break; + case "equity-quotes": + mergeItems(setEquityQuotes, items as EquityQuote[]); + break; case "equity-joins": mergeItems(setEquityJoins, items as EquityPrintJoin[]); break; @@ -2403,6 +2462,17 @@ const useLiveSession = ( break; } + if (message.op === "snapshot") { + setHistoryCursors((current) => ({ + ...current, + [subscriptionKey]: message.snapshot.next_before + })); + setHistoryErrors((current) => ({ + ...current, + [subscriptionKey]: null + })); + } + if (items.length > 0) { setLastEventByChannel((current) => ({ ...current, @@ -2503,14 +2573,114 @@ const useLiveSession = ( subscribedMapRef.current = nextMap; }, [enabled, manifest]); + const loadOlder = useCallback( + async (channel: LiveSubscription["channel"]) => { + const subscription = manifest.find((candidate) => candidate.channel === channel); + if (!enabled || !subscription) { + return; + } + const endpoint = LIVE_HISTORY_ENDPOINTS[subscription.channel]; + if (!endpoint) { + return; + } + const key = getLiveSubscriptionKey(subscription); + const cursor = historyCursors[key]; + if (!cursor || historyLoading[key]) { + return; + } + + setHistoryLoading((current) => ({ ...current, [key]: true })); + setHistoryErrors((current) => ({ ...current, [key]: null })); + + try { + const params = new URLSearchParams({ + before_ts: String(cursor.ts), + before_seq: String(cursor.seq), + limit: String(subscription.channel === "options" ? 500 : 200) + }); + if (subscription.channel === "options" || subscription.channel === "flow") { + appendOptionFlowFilters(params, subscription.filters); + } + const response = await fetch(buildApiUrl(`${endpoint}?${params.toString()}`)); + if (!response.ok) { + const detail = await readErrorDetail(response); + throw new Error(detail || `HTTP ${response.status}`); + } + const payload = (await response.json()) as LiveHistoryResponse; + const older = payload.data ?? []; + + const mergeOlder = ( + setter: Dispatch>, + limit: number + ) => { + setter((prev) => + mergeNewest(older as T[], prev, limit, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ) + ); + }; + + switch (subscription.channel) { + case "options": + mergeOlder(setOptions, LIVE_HOT_WINDOW_OPTIONS); + break; + case "nbbo": + mergeOlder(setNbbo, LIVE_HOT_WINDOW); + break; + case "equities": + mergeOlder(setEquities, LIVE_HOT_WINDOW); + break; + case "equity-quotes": + mergeOlder(setEquityQuotes, LIVE_HOT_WINDOW); + break; + case "equity-joins": + mergeOlder(setEquityJoins, LIVE_HOT_WINDOW); + break; + case "flow": + mergeOlder(setFlow, LIVE_HOT_WINDOW); + break; + case "classifier-hits": + mergeOlder(setClassifierHits, LIVE_HOT_WINDOW); + break; + case "alerts": + mergeOlder(setAlerts, LIVE_HOT_WINDOW); + break; + case "inferred-dark": + mergeOlder(setInferredDark, LIVE_HOT_WINDOW); + break; + } + + setHistoryCursors((current) => ({ + ...current, + [key]: older.length > 0 ? payload.next_before : null + })); + setLastUpdate(Date.now()); + } catch (error) { + setHistoryErrors((current) => ({ + ...current, + [key]: error instanceof Error ? error.message : String(error) + })); + } finally { + setHistoryLoading((current) => ({ ...current, [key]: false })); + } + }, + [enabled, manifest, historyCursors, historyLoading] + ); + return { status, connectedAt, lastUpdate, lastEventByChannel, + manifest, + historyCursors, + historyLoading, + historyErrors, + loadOlder, options, nbbo, equities, + equityQuotes, equityJoins, flow, classifierHits, @@ -2582,6 +2752,39 @@ const TapeControls = ({ paused, onTogglePause, isAtTop, missed, onJump }: TapeCo ); }; +type LoadOlderControlProps = { + channel: LiveSubscription["channel"]; +}; + +const LoadOlderControl = ({ channel }: LoadOlderControlProps) => { + const state = useTerminal(); + const subscription = state.liveSession.manifest.find((candidate) => candidate.channel === channel); + if (state.mode !== "live" || !subscription || !(subscription.channel in LIVE_HISTORY_ENDPOINTS)) { + return null; + } + + const key = getLiveSubscriptionKey(subscription); + const cursor = state.liveSession.historyCursors[key]; + const loading = Boolean(state.liveSession.historyLoading[key]); + const error = state.liveSession.historyErrors[key]; + if (!cursor && !loading && !error) { + return null; + } + + return ( +
+ + {error ? {error} : null} +
+ ); +}; + type CandleChartProps = { ticker: string; intervalMs: number; @@ -5265,6 +5468,7 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => { ) : null} )} + {!limit ? : null} ); @@ -5342,6 +5546,7 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => { {virtual.bottomSpacerHeight > 0 ? (
) : null} + {!limit ? : null} )}
@@ -5481,6 +5686,7 @@ const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => { {virtual.bottomSpacerHeight > 0 ? (
) : null} + {!limit ? : null} )}
@@ -5576,6 +5782,7 @@ const AlertsPane = ({ limit, withStrip = false, className }: AlertsPaneProps) => {virtual.bottomSpacerHeight > 0 ? (
) : null} + {!limit ? : null} )}
@@ -5656,6 +5863,7 @@ const ClassifierPane = ({ limit, className }: ClassifierPaneProps) => { {virtual.bottomSpacerHeight > 0 ? (
) : null} + {!limit ? : null} )}
@@ -5743,6 +5951,7 @@ const DarkPane = ({ limit, className }: DarkPaneProps) => { {virtual.bottomSpacerHeight > 0 ? (
) : null} + {!limit ? : null} )}
diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index c53caa4..850b699 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -1264,6 +1264,22 @@ export const fetchEquityPrintsBefore = async ( return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); }; +export const fetchEquityQuotesBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_QUOTES_TABLE} WHERE ${buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return EquityQuoteSchema.array().parse(rows.map(normalizeEquityQuoteRow)); +}; + export const fetchEquityPrintJoinsBefore = async ( client: ClickHouseClient, beforeTs: number, diff --git a/packages/storage/tests/equity-quotes.test.ts b/packages/storage/tests/equity-quotes.test.ts index bc3917e..23a34f0 100644 --- a/packages/storage/tests/equity-quotes.test.ts +++ b/packages/storage/tests/equity-quotes.test.ts @@ -4,6 +4,7 @@ import { EQUITY_QUOTES_TABLE, normalizeEquityQuote } from "../src/equity-quotes"; +import { fetchEquityQuotesBefore, type ClickHouseClient } from "../src/clickhouse"; const baseQuote = { source_ts: 100, @@ -27,4 +28,35 @@ describe("equity-quotes storage helpers", () => { expect(ddl).toContain(EQUITY_QUOTES_TABLE); expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); }); + + it("fetches older quotes with tuple cursor ordering", async () => { + let queryText = ""; + const client = { + query: async ({ query }: { query: string }) => { + queryText = query; + return { + async json() { + return [ + { + ...baseQuote, + source_ts: 90, + ingest_ts: 201, + seq: 2, + trace_id: "trace-2", + ts: 90 + } + ] as T; + } + }; + } + } as unknown as ClickHouseClient; + + const rows = await fetchEquityQuotesBefore(client, 100, 3, 25); + + expect(rows).toHaveLength(1); + expect(rows[0]?.trace_id).toBe("trace-2"); + expect(queryText).toContain(EQUITY_QUOTES_TABLE); + expect(queryText).toContain("WHERE (ts, seq) < (100, 3)"); + expect(queryText).toContain("ORDER BY ts DESC, seq DESC LIMIT 25"); + }); }); diff --git a/packages/types/src/live.ts b/packages/types/src/live.ts index 3d86883..da86c86 100644 --- a/packages/types/src/live.ts +++ b/packages/types/src/live.ts @@ -5,6 +5,7 @@ import { EquityCandleSchema, EquityPrintJoinSchema, EquityPrintSchema, + EquityQuoteSchema, FlowPacketSchema, InferredDarkEventSchema, OptionNBBOSchema, @@ -26,6 +27,7 @@ export const LiveGenericChannelSchema = z.enum([ "options", "nbbo", "equities", + "equity-quotes", "equity-joins", "flow", "classifier-hits", @@ -37,6 +39,7 @@ export const LiveChannelSchema = z.enum([ "options", "nbbo", "equities", + "equity-quotes", "equity-joins", "flow", "classifier-hits", @@ -59,7 +62,7 @@ export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [ filters: OptionFlowFiltersSchema.optional() }), z.object({ - channel: z.enum(["nbbo", "equities", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]) + channel: z.enum(["nbbo", "equities", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]) }), z.object({ channel: z.literal("equity-candles"), @@ -78,6 +81,7 @@ const livePayloadSchemas = { options: OptionPrintSchema, nbbo: OptionNBBOSchema, equities: EquityPrintSchema, + "equity-quotes": EquityQuoteSchema, "equity-joins": EquityPrintJoinSchema, flow: FlowPacketSchema, "classifier-hits": ClassifierHitEventSchema, diff --git a/services/api/src/index.ts b/services/api/src/index.ts index c8fa667..911c4bf 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -60,6 +60,7 @@ import { fetchEquityPrintsBefore, fetchEquityPrintsRange, fetchEquityPrintJoinsAfter, + fetchEquityQuotesBefore, fetchEquityQuotesAfter, fetchInferredDarkBefore, fetchInferredDarkAfter, @@ -977,19 +978,21 @@ const run = async () => { const fanoutLive = async ( subscription: LiveSubscription, item: unknown, - ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" + ingestChannel: "options" | "nbbo" | "equities" | "equity-quotes" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" ) => { + const watermark = await liveState.ingest(ingestChannel, item); + if ( (ingestChannel === "options" || ingestChannel === "nbbo" || ingestChannel === "equities" || + ingestChannel === "equity-quotes" || ingestChannel === "flow") && !isLiveItemFresh(ingestChannel, item) ) { return; } - const watermark = await liveState.ingest(ingestChannel, item); const matchingSubscriptions = subscription.channel === "options" || subscription.channel === "flow" ? [...subscriptionDefinitions.entries()].filter(([, candidate]) => candidate.channel === subscription.channel) @@ -1088,6 +1091,7 @@ const run = async () => { try { const payload = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg)); broadcast(equityQuoteSockets, { type: "equity-quote", payload }); + await fanoutLive({ channel: "equity-quotes" }, payload, "equity-quotes"); msg.ack(); } catch (error) { logger.error("failed to process equity quote", { @@ -1380,6 +1384,12 @@ const run = async () => { return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq }))); } + if (req.method === "GET" && url.pathname === "/history/equity-quotes") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchEquityQuotesBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq }))); + } + if (req.method === "GET" && url.pathname === "/history/equity-joins") { const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); const data = await fetchEquityPrintJoinsBefore(clickhouse, beforeTs, beforeSeq, limit); diff --git a/services/api/src/live.ts b/services/api/src/live.ts index df916fb..f10cb33 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -5,6 +5,7 @@ import { fetchRecentEquityCandles, fetchRecentEquityPrintJoins, fetchRecentEquityPrints, + fetchRecentEquityQuotes, fetchRecentFlowPackets, fetchRecentInferredDark, fetchRecentOptionNBBO, @@ -18,6 +19,7 @@ import { EquityCandleSchema, EquityPrintJoinSchema, EquityPrintSchema, + EquityQuoteSchema, FeedSnapshot, FlowPacketSchema, InferredDarkEventSchema, @@ -44,6 +46,7 @@ const GENERIC_LIMIT_ENV_KEYS: Record = { options: "LIVE_LIMIT_OPTIONS", nbbo: "LIVE_LIMIT_NBBO", equities: "LIVE_LIMIT_EQUITIES", + "equity-quotes": "LIVE_LIMIT_EQUITY_QUOTES", "equity-joins": "LIVE_LIMIT_EQUITY_JOINS", flow: "LIVE_LIMIT_FLOW", "classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS", @@ -69,6 +72,7 @@ export const LIVE_FRESHNESS_THRESHOLDS: Partial ({ ts: item.ts, seq: item.seq }), fetchRecent: fetchRecentEquityPrints }, + "equity-quotes": { + redisKey: "live:equity-quotes", + cursorField: "equity-quotes", + limit: limits["equity-quotes"], + parse: (value) => EquityQuoteSchema.parse(value), + cursor: (item) => ({ ts: item.ts, seq: item.seq }), + fetchRecent: fetchRecentEquityQuotes + }, "equity-joins": { redisKey: "live:equity-joins", cursorField: "equity-joins", @@ -251,6 +264,7 @@ const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | nu case "options": case "nbbo": case "equities": + case "equity-quotes": return typeof item.ts === "number" ? item.ts : null; case "flow": return typeof item.source_ts === "number" ? item.source_ts : null; @@ -275,19 +289,6 @@ export const isLiveItemFresh = ( return now - ts <= thresholdMs; }; -const filterFreshGenericItems = ( - channel: LiveGenericChannel, - items: T[], - now = Date.now() -): T[] => { - const thresholdMs = LIVE_FRESHNESS_THRESHOLDS[channel]; - if (!thresholdMs) { - return items; - } - - return items.filter((item) => isLiveItemFresh(channel, item, now)); -}; - const nextBeforeForItems = (items: T[], cursorOf: (item: T) => Cursor): Cursor | null => { const last = items.at(-1); return last ? cursorOf(last) : null; @@ -396,21 +397,17 @@ export class LiveStateManager { undefined, storageFilters ); - const freshItems = filterFreshGenericItems("options", items); return { subscription, - items: freshItems, + items, watermark: items[0] ? { ts: items[0].ts, seq: items[0].seq } : null, - next_before: nextBeforeForItems(freshItems, (item) => ({ ts: item.ts, seq: item.seq })) + next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq })) }; } const config = this.generic.options; - const items = filterFreshGenericItems( - "options", - (this.genericItems.get("options") ?? []).filter((item) => - matchesOptionPrintFilters(item, subscription.filters) - ) + const items = (this.genericItems.get("options") ?? []).filter((item) => + matchesOptionPrintFilters(item, subscription.filters) ); return { subscription, @@ -421,11 +418,8 @@ export class LiveStateManager { } case "flow": { const config = this.generic.flow; - const items = filterFreshGenericItems( - "flow", - (this.genericItems.get("flow") ?? []).filter((item) => - matchesFlowPacketFilters(item, subscription.filters) - ) + const items = (this.genericItems.get("flow") ?? []).filter((item) => + matchesFlowPacketFilters(item, subscription.filters) ); return { subscription, @@ -464,10 +458,7 @@ export class LiveStateManager { } default: { const config = this.generic[subscription.channel]; - const items = filterFreshGenericItems( - subscription.channel, - this.genericItems.get(subscription.channel) ?? [] - ); + const items = this.genericItems.get(subscription.channel) ?? []; return { subscription, items, @@ -513,9 +504,6 @@ export class LiveStateManager { default: { const config = this.generic[channel]; const parsed = config.parse(item); - if (!isLiveItemFresh(channel, parsed)) { - return this.genericCursors.get(config.cursorField) ?? null; - } const items = this.genericItems.get(channel) ?? []; const next = normalizeGenericItems(channel, [parsed, ...items], config); this.genericItems.set(channel, next); diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index 21bcd28..41ad732 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -58,6 +58,7 @@ describe("LiveStateManager", () => { expect(limits.options).toBe(777); expect(limits.nbbo).toBe(100000); expect(limits.flow).toBe(10000); + expect(limits["equity-quotes"]).toBe(10000); expect(limits.alerts).toBe(10000); }); @@ -145,6 +146,7 @@ describe("LiveStateManager", () => { options: 10000, nbbo: 10000, equities: 10000, + "equity-quotes": 10000, "equity-joins": 10000, flow: 2, "classifier-hits": 10000, @@ -277,7 +279,7 @@ describe("LiveStateManager", () => { expect(flowSnapshot.items).toHaveLength(1); }); - it("suppresses stale items from live snapshots while preserving fresh ones", async () => { + it("keeps stale persisted items in live snapshots", async () => { const manager = new LiveStateManager(makeClickHouse(), null); const now = Date.now(); @@ -383,16 +385,20 @@ describe("LiveStateManager", () => { ]); expect((optionsSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ - "opt-fresh" + "opt-fresh", + "opt-stale" ]); expect((nbboSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ - "nbbo-fresh" + "nbbo-fresh", + "nbbo-stale" ]); expect((equitiesSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ - "eq-fresh" + "eq-fresh", + "eq-stale" ]); expect((flowSnapshot.items as Array<{ id: string }>).map((item) => item.id)).toEqual([ - "flow-fresh" + "flow-fresh", + "flow-stale" ]); }); @@ -476,7 +482,7 @@ describe("LiveStateManager", () => { ]); }); - it("rejects stale ingest for freshness-gated channels", async () => { + it("stores older valid ingest for freshness-gated channels", async () => { const manager = new LiveStateManager(makeClickHouse(), null); const now = Date.now(); @@ -494,7 +500,71 @@ describe("LiveStateManager", () => { }); const snapshot = await manager.getSnapshot({ channel: "equities" }); - expect(snapshot.items).toHaveLength(0); + expect(snapshot.items).toHaveLength(1); + expect(snapshot.next_before).toEqual({ ts: now - 60_000, seq: 1 }); + }); + + it("hydrates equity quotes from redis", async () => { + const redis = makeRedis(); + const now = Date.now(); + await redis.lPush( + "live:equity-quotes", + JSON.stringify({ + source_ts: now, + ingest_ts: now + 1, + seq: 1, + trace_id: "quote-1", + ts: now, + underlying_id: "SPY", + bid: 450, + ask: 450.01 + }) + ); + await redis.hSet("live:cursors", "equity-quotes", JSON.stringify({ ts: now, seq: 1 })); + + const manager = new LiveStateManager(makeClickHouse(), redis as never); + await manager.hydrate(); + const snapshot = await manager.getSnapshot({ channel: "equity-quotes" }); + + expect(snapshot.items).toHaveLength(1); + expect(snapshot.watermark).toEqual({ ts: now, seq: 1 }); + expect(snapshot.next_before).toEqual({ ts: now, seq: 1 }); + }); + + it("hydrates equity quotes from clickhouse when redis is empty and persists hot cache", async () => { + const redis = makeRedis(); + const now = Date.now(); + const clickhouse = { + ...makeClickHouse(), + query: async ({ query }: { query: string }) => ({ + async json() { + if (query.includes("equity_quotes")) { + return [ + { + source_ts: now, + ingest_ts: now + 1, + seq: 2, + trace_id: "quote-2", + ts: now, + underlying_id: "SPY", + bid: 451, + ask: 451.01 + } + ] as T; + } + return [] as T; + } + }) + } as ClickHouseClient; + + const manager = new LiveStateManager(clickhouse, redis as never); + await manager.hydrate(); + const snapshot = await manager.getSnapshot({ channel: "equity-quotes" }); + const persisted = await redis.lRange("live:equity-quotes", 0, 10); + + expect(snapshot.items).toHaveLength(1); + expect(snapshot.watermark).toEqual({ ts: now, seq: 2 }); + expect(persisted).toHaveLength(1); }); it("exposes freshness helper for event fanout gating", () => {