From 034d24f8acaa6650604bd49eec02bdf5f66615ff Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Thu, 7 May 2026 00:39:26 -0400 Subject: [PATCH] Restore continuous live tape history --- apps/web/app/terminal.test.ts | 75 ++++++++ apps/web/app/terminal.tsx | 327 +++++++++++++++++++++++--------- services/api/src/index.ts | 8 +- services/api/src/live.ts | 28 +-- services/api/tests/live.test.ts | 162 +++++++++++++++- 5 files changed, 483 insertions(+), 117 deletions(-) diff --git a/apps/web/app/terminal.test.ts b/apps/web/app/terminal.test.ts index c96d86e..78c7c70 100644 --- a/apps/web/app/terminal.test.ts +++ b/apps/web/app/terminal.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "bun:test"; +import { getSubscriptionKey as getLiveSubscriptionKey } from "@islandflow/types"; import { NAV_ITEMS, appendHistoryTail, @@ -10,10 +11,12 @@ import { formatOptionContractLabel, flushPausableTapeData, getAlertWindowAnchorTs, + getScopedLiveAutoHydrationChannels, getLiveHistoryRetentionCap, getOptionTableSnapshot, getLiveFeedStatus, getLiveManifest, + mergeNewestWithOverflow, normalizeAlertSeverity, nextFlowFilterPopoverState, projectPausableTapeState, @@ -243,6 +246,36 @@ describe("live tape pausable helpers", () => { }); describe("live tape history helpers", () => { + it("promotes hot-window overflow into the history tail", () => { + const currentHot = [makeItem("hot-3", 3, 300), makeItem("hot-2", 2, 200), makeItem("hot-1", 1, 100)]; + const incoming = [makeItem("hot-4", 4, 400)]; + + const { kept, evicted } = mergeNewestWithOverflow(incoming, currentHot, 3); + const nextHistory = appendHistoryTail([], evicted, kept, 5000); + + expect(kept.map((item) => item.trace_id)).toEqual(["hot-4", "hot-3", "hot-2"]); + expect(nextHistory.map((item) => item.trace_id)).toEqual(["hot-1"]); + }); + + it("keeps the combined tape continuous beyond the hot live window", () => { + let hot: Array> = []; + let history: Array> = []; + + for (let seq = 1; seq <= 5; seq += 1) { + const { kept, evicted } = mergeNewestWithOverflow([makeItem(`row-${seq}`, seq, seq * 100)], hot, 2); + hot = kept; + history = appendHistoryTail(history, evicted, hot, 5000); + } + + expect([...hot, ...history].map((item) => item.trace_id)).toEqual([ + "row-5", + "row-4", + "row-3", + "row-2", + "row-1" + ]); + }); + it("appends older scoped rows behind the hot live head", () => { const liveHead = Array.from({ length: 100 }, (_, idx) => makeItem(`hot-${idx}`, 200 - idx, 2_000 - idx) @@ -263,6 +296,16 @@ describe("live tape history helpers", () => { expect(next.map((item) => item.trace_id)).toEqual(["older"]); }); + it("dedupes the seam between promoted overflow and fetched history", () => { + const currentHot = [makeItem("hot-3", 3, 300), makeItem("hot-2", 2, 200), makeItem("hot-1", 1, 100)]; + const { kept, evicted } = mergeNewestWithOverflow([makeItem("hot-4", 4, 400)], currentHot, 3); + const promoted = appendHistoryTail([], evicted, kept, 5000); + const merged = appendHistoryTail(promoted, [makeItem("hot-1", 1, 100), makeItem("older", 0, 50)], kept, 5000); + + expect(merged.map((item) => item.trace_id)).toEqual(["hot-1", "older"]); + expect(new Set([...kept, ...merged].map((item) => item.trace_id)).size).toBe(kept.length + merged.length); + }); + it("trims the history tail to the soft cap", () => { const current = [makeItem("existing", 4, 400)]; const older = [makeItem("older-1", 3, 300), makeItem("older-2", 2, 200)]; @@ -287,6 +330,38 @@ describe("live tape history helpers", () => { } as any) ).toBeGreaterThan(0); }); + + it("keeps auto-hydrating scoped live history while next_before exists", () => { + const manifest = getLiveManifest( + "/tape", + "AAPL", + 60000, + buildDefaultFlowFilters(), + { + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }, + { underlying_ids: ["AAPL"] } + ); + const historyCursors = Object.fromEntries( + manifest.map((subscription) => [getLiveSubscriptionKey(subscription), { ts: 1, seq: 1 }]) + ); + + expect( + getScopedLiveAutoHydrationChannels(true, "/tape", manifest, historyCursors, {}) + ).toEqual(["options", "equities"]); + expect( + getScopedLiveAutoHydrationChannels(true, "/tape", manifest, historyCursors, { + [getLiveSubscriptionKey(manifest.find((subscription) => subscription.channel === "options")!)]: true + }) + ).toEqual(["equities"]); + expect( + getScopedLiveAutoHydrationChannels(true, "/tape", manifest, { + ...historyCursors, + [getLiveSubscriptionKey(manifest.find((subscription) => subscription.channel === "equities")!)]: null + }, {}) + ).toEqual(["options"]); + }); }); describe("options display formatters", () => { diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index d20be39..72edbd5 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -368,15 +368,15 @@ const buildItemKey = (item: SortableItem): string | null => { return null; }; -const mergeNewest = ( +export const mergeNewestWithOverflow = ( incoming: T[], existing: T[], limit = LIVE_HOT_WINDOW, onTrim?: (evicted: number) => void -): T[] => { +): { kept: T[]; evicted: T[] } => { const combined = [...incoming, ...existing]; if (combined.length === 0) { - return combined; + return { kept: combined, evicted: [] }; } const seen = new Set(); @@ -402,12 +402,24 @@ const mergeNewest = ( }); const safeLimit = Math.max(1, Math.floor(limit)); - const evicted = Math.max(0, deduped.length - safeLimit); - if (evicted > 0) { - onTrim?.(evicted); + const evicted = deduped.slice(safeLimit); + if (evicted.length > 0) { + onTrim?.(evicted.length); } - return deduped.slice(0, safeLimit); + return { + kept: deduped.slice(0, safeLimit), + evicted + }; +}; + +const mergeNewest = ( + incoming: T[], + existing: T[], + limit = LIVE_HOT_WINDOW, + onTrim?: (evicted: number) => void +): T[] => { + return mergeNewestWithOverflow(incoming, existing, limit, onTrim).kept; }; const getTapeItemKey = (item: SortableItem): string => { @@ -520,25 +532,27 @@ export const appendHistoryTail = ( return current; } - const seen = new Set(); - for (const item of liveHead) { - seen.add(getTapeItemKey(item)); - } - for (const item of current) { - seen.add(getTapeItemKey(item)); - } + const seen = new Set(liveHead.map((item) => getTapeItemKey(item))); + const combined: T[] = []; - const appended = [...current]; - for (const item of incoming) { + for (const item of [...current, ...incoming]) { const key = getTapeItemKey(item); if (seen.has(key)) { continue; } seen.add(key); - appended.push(item); + combined.push(item); } - return cap > 0 ? appended.slice(0, cap) : appended; + combined.sort((a, b) => { + const delta = extractSortTs(b) - extractSortTs(a); + if (delta !== 0) { + return delta; + } + return extractSortSeq(b) - extractSortSeq(a); + }); + + return cap > 0 ? combined.slice(0, cap) : combined; }; export const getLiveHistoryRetentionCap = (subscription: LiveSubscription): number => { @@ -551,6 +565,36 @@ export const getLiveHistoryRetentionCap = (subscription: LiveSubscription): numb } }; +export const getScopedLiveAutoHydrationChannels = ( + enabled: boolean, + pathname: string, + manifest: LiveSubscription[], + historyCursors: Partial>, + historyLoading: Partial> +): Array> => { + if (!enabled || pathname !== "/tape") { + return []; + } + + const channels: Array> = []; + for (const subscription of manifest) { + const scoped = + (subscription.channel === "options" && + (subscription.underlying_ids?.length || subscription.option_contract_id)) || + (subscription.channel === "equities" && subscription.underlying_ids?.length); + if (!scoped) { + continue; + } + + const key = getLiveSubscriptionKey(subscription); + if (historyCursors[key] && !historyLoading[key]) { + channels.push(subscription.channel); + } + } + + return channels; +}; + export const getLiveFeedStatus = ( sourceStatus: WsStatus, freshestTs: number | null, @@ -2544,6 +2588,27 @@ const useLiveSession = ( const [inferredDarkHistory, setInferredDarkHistory] = useState([]); const [chartCandles, setChartCandles] = useState([]); const [chartOverlay, setChartOverlay] = useState([]); + const optionsRef = useRef([]); + const nbboRef = useRef([]); + const equitiesRef = useRef([]); + const equityQuotesRef = useRef([]); + const equityJoinsRef = useRef([]); + const flowRef = useRef([]); + const smartMoneyRef = useRef([]); + const classifierHitsRef = useRef([]); + const alertsRef = useRef([]); + const inferredDarkRef = useRef([]); + const chartCandlesRef = useRef([]); + const chartOverlayRef = useRef([]); + const optionsHistoryRef = useRef([]); + const nbboHistoryRef = useRef([]); + const equitiesHistoryRef = useRef([]); + const equityJoinsHistoryRef = useRef([]); + const flowHistoryRef = useRef([]); + const smartMoneyHistoryRef = useRef([]); + const classifierHitsHistoryRef = useRef([]); + const alertsHistoryRef = useRef([]); + const inferredDarkHistoryRef = useRef([]); const socketRef = useRef(null); const reconnectRef = useRef(null); const idleWatchdogRef = useRef(null); @@ -2556,6 +2621,27 @@ const useLiveSession = ( [pathname, chartTicker, chartIntervalMs, flowFilters, optionScope, equityScope] ); + const replaceArrayState = ( + setter: Dispatch>, + ref: { current: T[] }, + next: T[] + ): void => { + ref.current = next; + setter(next); + }; + + const mergeHistoryState = ( + setter: Dispatch>, + ref: { current: T[] }, + incoming: T[], + liveHead: T[], + cap = LIVE_HISTORY_SOFT_CAP + ): void => { + const next = appendHistoryTail(ref.current, incoming, liveHead, cap); + ref.current = next; + setter(next); + }; + useEffect(() => { if (!enabled) { setStatus("disconnected"); @@ -2586,6 +2672,27 @@ const useLiveSession = ( setInferredDarkHistory([]); setChartCandles([]); setChartOverlay([]); + optionsRef.current = []; + nbboRef.current = []; + equitiesRef.current = []; + equityQuotesRef.current = []; + equityJoinsRef.current = []; + flowRef.current = []; + smartMoneyRef.current = []; + classifierHitsRef.current = []; + alertsRef.current = []; + inferredDarkRef.current = []; + chartCandlesRef.current = []; + chartOverlayRef.current = []; + optionsHistoryRef.current = []; + nbboHistoryRef.current = []; + equitiesHistoryRef.current = []; + equityJoinsHistoryRef.current = []; + flowHistoryRef.current = []; + smartMoneyHistoryRef.current = []; + classifierHitsHistoryRef.current = []; + alertsHistoryRef.current = []; + inferredDarkHistoryRef.current = []; subscribedKeysRef.current = new Set(); subscribedMapRef.current = new Map(); if (socketRef.current) { @@ -2642,62 +2749,112 @@ const useLiveSession = ( const updateAt = Date.now(); const mergeItems = ( - setter: React.Dispatch>, + setter: Dispatch>, + ref: { current: T[] }, nextItems: T[], - retentionLimit = LIVE_HOT_WINDOW + retentionLimit = LIVE_HOT_WINDOW, + history?: { + setter: Dispatch>; + ref: { current: T[] }; + cap?: number; + } ) => { - setter((prev) => - message.op === "snapshot" - ? shouldRetainLiveSnapshotHistory( - subscription.channel, - true, - nextItems.length, - prev.length - ) - ? prev - : (nextItems as T[]) - : mergeNewest(nextItems as T[], prev, retentionLimit, (evicted) => - incrementRetentionMetric("hotWindowEvictions", evicted) - ) + if (message.op === "snapshot") { + const next = shouldRetainLiveSnapshotHistory( + subscription.channel, + true, + nextItems.length, + ref.current.length + ) + ? ref.current + : nextItems; + replaceArrayState(setter, ref, next); + return; + } + + const { kept, evicted } = mergeNewestWithOverflow( + nextItems, + ref.current, + retentionLimit, + (evictedCount) => incrementRetentionMetric("hotWindowEvictions", evictedCount) ); + replaceArrayState(setter, ref, kept); + if (history && evicted.length > 0) { + mergeHistoryState(history.setter, history.ref, evicted, kept, history.cap); + } }; switch (subscription.channel) { case "options": - mergeItems(setOptions, items as OptionPrint[], LIVE_HOT_WINDOW_OPTIONS); + mergeItems(setOptions, optionsRef, items as OptionPrint[], LIVE_HOT_WINDOW_OPTIONS, { + setter: setOptionsHistory, + ref: optionsHistoryRef, + cap: getLiveHistoryRetentionCap(subscription) + }); break; case "nbbo": - mergeItems(setNbbo, items as OptionNBBO[]); + mergeItems(setNbbo, nbboRef, items as OptionNBBO[], LIVE_HOT_WINDOW, { + setter: setNbboHistory, + ref: nbboHistoryRef + }); break; case "equities": - mergeItems(setEquities, items as EquityPrint[]); + mergeItems(setEquities, equitiesRef, items as EquityPrint[], LIVE_HOT_WINDOW, { + setter: setEquitiesHistory, + ref: equitiesHistoryRef, + cap: getLiveHistoryRetentionCap(subscription) + }); break; case "equity-quotes": - mergeItems(setEquityQuotes, items as EquityQuote[]); + mergeItems(setEquityQuotes, equityQuotesRef, items as EquityQuote[]); break; case "equity-joins": - mergeItems(setEquityJoins, items as EquityPrintJoin[]); + mergeItems(setEquityJoins, equityJoinsRef, items as EquityPrintJoin[], LIVE_HOT_WINDOW, { + setter: setEquityJoinsHistory, + ref: equityJoinsHistoryRef + }); break; case "flow": - mergeItems(setFlow, items as FlowPacket[]); + mergeItems(setFlow, flowRef, items as FlowPacket[], LIVE_HOT_WINDOW, { + setter: setFlowHistory, + ref: flowHistoryRef + }); break; case "smart-money": - mergeItems(setSmartMoney, items as SmartMoneyEvent[]); + mergeItems(setSmartMoney, smartMoneyRef, items as SmartMoneyEvent[], LIVE_HOT_WINDOW, { + setter: setSmartMoneyHistory, + ref: smartMoneyHistoryRef + }); break; case "classifier-hits": - mergeItems(setClassifierHits, items as ClassifierHitEvent[]); + mergeItems( + setClassifierHits, + classifierHitsRef, + items as ClassifierHitEvent[], + LIVE_HOT_WINDOW, + { + setter: setClassifierHitsHistory, + ref: classifierHitsHistoryRef + } + ); break; case "alerts": - mergeItems(setAlerts, items as AlertEvent[]); + mergeItems(setAlerts, alertsRef, items as AlertEvent[], LIVE_HOT_WINDOW, { + setter: setAlertsHistory, + ref: alertsHistoryRef + }); break; case "inferred-dark": - mergeItems(setInferredDark, items as InferredDarkEvent[]); + mergeItems(setInferredDark, inferredDarkRef, items as InferredDarkEvent[], LIVE_HOT_WINDOW, { + setter: setInferredDarkHistory, + ref: inferredDarkHistoryRef + }); break; case "equity-candles": - mergeItems(setChartCandles, items as EquityCandle[]); + mergeItems(setChartCandles, chartCandlesRef, items as EquityCandle[]); break; case "equity-overlay": - mergeItems(setChartOverlay, items as EquityPrint[]); + mergeItems(setChartOverlay, chartOverlayRef, items as EquityPrint[]); break; } @@ -2839,10 +2996,14 @@ const useLiveSession = ( .filter((channel) => channel === "options" || channel === "equities") ); if (resetScopedChannels.has("options")) { + optionsRef.current = []; + optionsHistoryRef.current = []; setOptions([]); setOptionsHistory([]); } if (resetScopedChannels.has("equities")) { + equitiesRef.current = []; + equitiesHistoryRef.current = []; setEquities([]); setEquitiesHistory([]); } @@ -2926,41 +3087,56 @@ const useLiveSession = ( const mergeOlder = ( setter: Dispatch>, + ref: { current: T[] }, liveHead: T[], cap = LIVE_HISTORY_SOFT_CAP ) => { - setter((prev) => appendHistoryTail(prev, older as T[], liveHead, cap)); + mergeHistoryState(setter, ref, older as T[], liveHead, cap); }; switch (subscription.channel) { case "options": - mergeOlder(setOptionsHistory, options, getLiveHistoryRetentionCap(subscription)); + mergeOlder( + setOptionsHistory, + optionsHistoryRef, + optionsRef.current, + getLiveHistoryRetentionCap(subscription) + ); break; case "nbbo": - mergeOlder(setNbboHistory, nbbo); + mergeOlder(setNbboHistory, nbboHistoryRef, nbboRef.current); break; case "equities": - mergeOlder(setEquitiesHistory, equities, getLiveHistoryRetentionCap(subscription)); + mergeOlder( + setEquitiesHistory, + equitiesHistoryRef, + equitiesRef.current, + getLiveHistoryRetentionCap(subscription) + ); break; case "equity-quotes": break; case "equity-joins": - mergeOlder(setEquityJoinsHistory, equityJoins); + mergeOlder(setEquityJoinsHistory, equityJoinsHistoryRef, equityJoinsRef.current); break; case "flow": - mergeOlder(setFlowHistory, flow); + mergeOlder(setFlowHistory, flowHistoryRef, flowRef.current); break; case "smart-money": - mergeOlder(setSmartMoneyHistory, smartMoney); + mergeOlder(setSmartMoneyHistory, smartMoneyHistoryRef, smartMoneyRef.current); break; case "classifier-hits": - mergeOlder(setClassifierHitsHistory, classifierHits); + mergeOlder( + setClassifierHitsHistory, + classifierHitsHistoryRef, + classifierHitsRef.current + ); break; case "alerts": - mergeOlder(setAlertsHistory, alerts); + mergeOlder(setAlertsHistory, alertsHistoryRef, alertsRef.current); break; case "inferred-dark": - mergeOlder(setInferredDarkHistory, inferredDark); + mergeOlder(setInferredDarkHistory, inferredDarkHistoryRef, inferredDarkRef.current); break; } @@ -2978,41 +3154,18 @@ const useLiveSession = ( setHistoryLoading((current) => ({ ...current, [key]: false })); } }, - [ - enabled, - manifest, - historyCursors, - historyLoading, - options, - nbbo, - equities, - equityJoins, - flow, - smartMoney, - classifierHits, - alerts, - inferredDark - ] + [enabled, manifest, historyCursors, historyLoading] ); useEffect(() => { - if (!enabled || pathname !== "/tape") { - return; - } - const scoped = manifest.filter( - (subscription) => - (subscription.channel === "options" && - (subscription.underlying_ids?.length || subscription.option_contract_id)) || - (subscription.channel === "equities" && subscription.underlying_ids?.length) - ); - if (scoped.length === 0) { - return; - } - for (const subscription of scoped) { - const key = getLiveSubscriptionKey(subscription); - if (historyCursors[key] && !historyLoading[key]) { - void loadOlder(subscription.channel); - } + for (const channel of getScopedLiveAutoHydrationChannels( + enabled, + pathname, + manifest, + historyCursors, + historyLoading + )) { + void loadOlder(channel); } }, [enabled, pathname, manifest, historyCursors, historyLoading, loadOlder]); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index c450ea7..ff72307 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -112,7 +112,7 @@ import { } from "@islandflow/types"; import { createClient } from "redis"; import { z } from "zod"; -import { LIVE_FEED_LOOKBACK_MS, LiveStateManager, shouldFanoutLiveEvent } from "./live"; +import { LiveStateManager, shouldFanoutLiveEvent } from "./live"; const service = "api"; const logger = createLogger({ service }); @@ -617,14 +617,12 @@ const parseLiveOptionPrintFilters = (url: URL): OptionPrintQueryFilters => { return { ...storageFilters, underlyingIds: parseScopeList(url, "underlying_id", "underlying_ids"), - optionContractId: url.searchParams.get("option_contract_id") ?? undefined, - sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS + optionContractId: url.searchParams.get("option_contract_id") ?? undefined }; }; const parseLiveEquityPrintFilters = (url: URL): EquityPrintQueryFilters => ({ - underlyingIds: parseScopeList(url, "underlying_id", "underlying_ids"), - sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS + underlyingIds: parseScopeList(url, "underlying_id", "underlying_ids") }); const matchesScopedOptionSubscription = ( diff --git a/services/api/src/live.ts b/services/api/src/live.ts index aa4281c..2907214 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -408,13 +408,7 @@ export class LiveStateManager { const config = this.generic[channel]; if (this.redis?.isOpen) { const payloads = await this.redis.lRange(config.redisKey, 0, config.limit - 1); - const cached = normalizeGenericItems( - channel, - parseJsonList(payloads, config.parse).filter((item) => - isWithinLiveFeedLookback(channel, item) - ), - config - ); + const cached = normalizeGenericItems(channel, parseJsonList(payloads, config.parse), config); if (cached.length > 0) { this.genericItems.set(channel, cached); this.stats.genericHydrateFromRedis += 1; @@ -434,9 +428,7 @@ export class LiveStateManager { const fresh = normalizeGenericItems( channel, - (await config.fetchRecent(this.clickhouse, config.limit)).filter((item) => - isWithinLiveFeedLookback(channel, item) - ), + await config.fetchRecent(this.clickhouse, config.limit), config ); this.stats.genericHydrateFromClickHouse += 1; @@ -467,8 +459,7 @@ export class LiveStateManager { optionTypes: subscription.filters?.optionTypes, minNotional: subscription.filters?.minNotional, underlyingIds: subscription.underlying_ids, - optionContractId: subscription.option_contract_id, - sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS + optionContractId: subscription.option_contract_id }; const items = await fetchRecentOptionPrints( this.clickhouse, @@ -487,7 +478,6 @@ export class LiveStateManager { const config = this.generic.options; const limit = snapshotLimitFor(subscription, config.limit); const items = (this.genericItems.get("options") ?? []).filter((item) => - isWithinLiveFeedLookback("options", item) && matchesOptionPrintFilters(item, subscription.filters) ).slice(0, limit); return { @@ -501,7 +491,6 @@ export class LiveStateManager { const config = this.generic.flow; const limit = snapshotLimitFor(subscription, config.limit); const items = (this.genericItems.get("flow") ?? []).filter((item) => - isWithinLiveFeedLookback("flow", item) && matchesFlowPacketFilters(item, subscription.filters) ).slice(0, limit); return { @@ -516,8 +505,7 @@ export class LiveStateManager { const limit = snapshotLimitFor(subscription, config.limit); if (subscription.underlying_ids?.length) { const filters: EquityPrintQueryFilters = { - underlyingIds: subscription.underlying_ids, - sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS + underlyingIds: subscription.underlying_ids }; const items = await fetchRecentEquityPrints(this.clickhouse, limit, filters); return { @@ -527,9 +515,7 @@ export class LiveStateManager { next_before: nextBeforeForItems(items, config.cursor) }; } - const items = (this.genericItems.get("equities") ?? []).filter((item) => - isWithinLiveFeedLookback("equities", item) - ).slice(0, limit); + const items = (this.genericItems.get("equities") ?? []).slice(0, limit); return { subscription, items, @@ -568,9 +554,7 @@ export class LiveStateManager { default: { const config = this.generic[subscription.channel]; const limit = snapshotLimitFor(subscription, config.limit); - const items = (this.genericItems.get(subscription.channel) ?? []).filter((item) => - isWithinLiveFeedLookback(subscription.channel, item) - ).slice(0, limit); + const items = (this.genericItems.get(subscription.channel) ?? []).slice(0, limit); return { subscription, items, diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index 3cb789e..898d2fa 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -7,15 +7,17 @@ import { shouldFanoutLiveEvent } from "../src/live"; -const makeClickHouse = (): ClickHouseClient => +const makeClickHouse = ( + queryResolver?: (query: string) => unknown[] +): ClickHouseClient => ({ exec: async () => {}, insert: async () => {}, ping: async () => ({ success: true }), close: async () => {}, - query: async () => ({ + query: async ({ query }: { query: string }) => ({ async json() { - return [] as T; + return (queryResolver?.(query) ?? []) as T; } }) }) as ClickHouseClient; @@ -408,6 +410,160 @@ describe("LiveStateManager", () => { ]); }); + it("seeds scoped option snapshots from clickhouse rows older than 24h", async () => { + const now = Date.now(); + const staleTs = now - 25 * 60 * 60 * 1000; + const manager = new LiveStateManager( + makeClickHouse((query) => + query.includes("FROM option_prints") + ? [ + { + source_ts: staleTs, + ingest_ts: staleTs + 1, + seq: 1, + trace_id: "opt-ancient", + ts: staleTs, + option_contract_id: "AAPL-2025-01-17-200-C", + underlying_id: "AAPL", + price: 1, + size: 10, + exchange: "X", + signal_pass: true + } + ] + : [] + ), + null + ); + + const snapshot = await manager.getSnapshot({ + channel: "options", + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }); + + expect((snapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ + "opt-ancient" + ]); + expect(snapshot.next_before).toEqual({ ts: staleTs, seq: 1 }); + expect(isLiveItemFresh("options", snapshot.items[0], now)).toBe(false); + }); + + it("seeds scoped equity snapshots from clickhouse rows older than 24h", async () => { + const now = Date.now(); + const staleTs = now - 25 * 60 * 60 * 1000; + const manager = new LiveStateManager( + makeClickHouse((query) => + query.includes("FROM equity_prints") + ? [ + { + source_ts: staleTs, + ingest_ts: staleTs + 1, + seq: 1, + trace_id: "eq-ancient", + ts: staleTs, + underlying_id: "AAPL", + price: 100, + size: 10, + exchange: "X", + offExchangeFlag: false + } + ] + : [] + ), + null + ); + + const snapshot = await manager.getSnapshot({ + channel: "equities", + underlying_ids: ["AAPL"] + }); + + expect((snapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ + "eq-ancient" + ]); + expect(snapshot.next_before).toEqual({ ts: staleTs, seq: 1 }); + expect(isLiveItemFresh("equities", snapshot.items[0], now)).toBe(false); + }); + + it("hydrates retained rows older than 24h into generic live snapshots and keeps them stale", async () => { + const redis = makeRedis(); + const now = Date.now(); + const staleTs = now - 25 * 60 * 60 * 1000; + + await redis.lPush( + "live:options", + JSON.stringify({ + source_ts: staleTs, + ingest_ts: staleTs + 1, + seq: 1, + trace_id: "opt-retained", + ts: staleTs, + option_contract_id: "AAPL-2025-01-17-200-C", + underlying_id: "AAPL", + price: 1, + size: 10, + exchange: "X", + signal_pass: true + }) + ); + await redis.hSet("live:cursors", "options", JSON.stringify({ ts: staleTs, seq: 1 })); + + await redis.lPush( + "live:equities", + JSON.stringify({ + source_ts: staleTs, + ingest_ts: staleTs + 1, + seq: 2, + trace_id: "eq-retained", + ts: staleTs, + underlying_id: "AAPL", + price: 100, + size: 10, + exchange: "X", + offExchangeFlag: false + }) + ); + await redis.hSet("live:cursors", "equities", JSON.stringify({ ts: staleTs, seq: 2 })); + + await redis.lPush( + "live:flow", + JSON.stringify({ + source_ts: staleTs, + ingest_ts: staleTs + 1, + seq: 3, + trace_id: "flow-retained", + id: "flow-retained", + members: ["opt-retained"], + features: {}, + join_quality: {} + }) + ); + await redis.hSet("live:cursors", "flow", JSON.stringify({ ts: staleTs, seq: 3 })); + + const manager = new LiveStateManager(makeClickHouse(), redis as never); + await manager.hydrate(); + + const [optionsSnapshot, equitiesSnapshot, flowSnapshot] = await Promise.all([ + manager.getSnapshot({ channel: "options" }), + manager.getSnapshot({ channel: "equities" }), + manager.getSnapshot({ channel: "flow" }) + ]); + + expect((optionsSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ + "opt-retained" + ]); + expect((equitiesSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ + "eq-retained" + ]); + expect((flowSnapshot.items as Array<{ id: string }>).map((item) => item.id)).toEqual([ + "flow-retained" + ]); + expect(isLiveItemFresh("options", optionsSnapshot.items[0], now)).toBe(false); + expect(isLiveItemFresh("equities", equitiesSnapshot.items[0], now)).toBe(false); + expect(isLiveItemFresh("flow", flowSnapshot.items[0], now)).toBe(false); + }); + it("keeps only the newest NBBO quote per contract across hydrate and ingest", async () => { const redis = makeRedis(); const now = Date.now();