diff --git a/apps/web/app/terminal.test.ts b/apps/web/app/terminal.test.ts index 1b353b2..8d78abd 100644 --- a/apps/web/app/terminal.test.ts +++ b/apps/web/app/terminal.test.ts @@ -5,7 +5,10 @@ import { flushPausableTapeData, getLiveFeedStatus, nextFlowFilterPopoverState, + projectPausableTapeState, reducePausableTapeData, + shouldRetainLiveSnapshotHistory, + shouldShowEquitiesSilentFeedWarning, toggleFilterValue } from "./terminal"; @@ -53,6 +56,55 @@ describe("live tape pausable helpers", () => { expect(getLiveFeedStatus("connected", 1000, 500, 1601)).toBe("stale"); expect(getLiveFeedStatus("disconnected", 1000, 500, 1601)).toBe("disconnected"); }); + + it("keeps visible history even when live status is stale", () => { + const projected = projectPausableTapeState([makeItem("stale", 7, 1000)], "stale", 2000); + expect(projected.items.map((item) => item.trace_id)).toEqual(["stale"]); + expect(projected.lastUpdate).toBeNull(); + }); + + it("flags connected equities feeds that stay silent past threshold", () => { + expect( + shouldShowEquitiesSilentFeedWarning({ + wsStatus: "connected", + equitiesSubscribed: true, + connectedAt: 1_000, + lastEquitiesEventAt: null, + now: 20_000, + thresholdMs: 25_000 + }) + ).toBe(false); + + expect( + shouldShowEquitiesSilentFeedWarning({ + wsStatus: "connected", + equitiesSubscribed: true, + connectedAt: 1_000, + lastEquitiesEventAt: null, + now: 27_000, + thresholdMs: 25_000 + }) + ).toBe(true); + + expect( + shouldShowEquitiesSilentFeedWarning({ + wsStatus: "connected", + equitiesSubscribed: true, + connectedAt: 1_000, + lastEquitiesEventAt: 20_000, + now: 40_000, + thresholdMs: 25_000 + }) + ).toBe(false); + }); + + it("retains live history when freshness-gated snapshots are empty", () => { + expect(shouldRetainLiveSnapshotHistory("options", true, 0, 3)).toBe(true); + expect(shouldRetainLiveSnapshotHistory("equities", true, 0, 2)).toBe(true); + expect(shouldRetainLiveSnapshotHistory("alerts", true, 0, 3)).toBe(false); + expect(shouldRetainLiveSnapshotHistory("options", true, 1, 3)).toBe(false); + expect(shouldRetainLiveSnapshotHistory("options", false, 0, 3)).toBe(false); + }); }); describe("flow filter popup helpers", () => { diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index c39d418..15bdbd8 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -60,6 +60,12 @@ const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, const LIVE_OPTIONS_STALE_MS = 15_000; const LIVE_NBBO_STALE_MS = 15_000; const LIVE_EQUITIES_STALE_MS = 15_000; +const LIVE_EQUITIES_SILENT_WARNING_MS = parseBoundedInt( + process.env.NEXT_PUBLIC_LIVE_EQUITIES_SILENT_WARNING_MS, + 25_000, + 5_000, + 5 * 60 * 1000 +); const LIVE_FLOW_STALE_MS = 30_000; const PINNED_EVIDENCE_TTL_MS = parseBoundedInt( process.env.NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS, @@ -775,13 +781,6 @@ export const countActiveFlowFilterGroups = (filters: OptionFlowFilters): number const isFreshLiveItem = (ts: number, thresholdMs: number, now = Date.now()): boolean => now - ts <= thresholdMs; -const filterFreshLiveItems = ( - items: T[], - thresholdMs: number, - getItemTs: (item: T) => number = extractSortTs, - now = Date.now() -): T[] => items.filter((item) => isFreshLiveItem(getItemTs(item), thresholdMs, now)); - export const toggleFilterValue = ( values: T[] | undefined, value: T, @@ -803,6 +802,60 @@ export const nextFlowFilterPopoverState = ( return action === "toggle" ? !current : false; }; +export const projectPausableTapeState = ( + visible: T[], + status: WsStatus, + lastUpdate: number | null +): { items: T[]; lastUpdate: number | null } => ({ + items: visible, + lastUpdate: status === "stale" ? null : lastUpdate +}); + +type EquitiesSilentFeedWarningInput = { + wsStatus: WsStatus; + equitiesSubscribed: boolean; + connectedAt: number | null; + lastEquitiesEventAt: number | null; + now?: number; + thresholdMs?: number; +}; + +export const shouldShowEquitiesSilentFeedWarning = ({ + wsStatus, + equitiesSubscribed, + connectedAt, + lastEquitiesEventAt, + now = Date.now(), + thresholdMs = LIVE_EQUITIES_SILENT_WARNING_MS +}: EquitiesSilentFeedWarningInput): boolean => { + if (wsStatus !== "connected" || !equitiesSubscribed) { + return false; + } + const baselineTs = lastEquitiesEventAt ?? connectedAt; + if (baselineTs === null) { + return false; + } + return now - baselineTs >= thresholdMs; +}; + +const LIVE_SNAPSHOT_HISTORY_CHANNELS = new Set([ + "options", + "nbbo", + "equities", + "flow" +]); + +export const shouldRetainLiveSnapshotHistory = ( + channel: LiveSubscription["channel"], + isSnapshot: boolean, + snapshotItemCount: number, + currentItemCount: number +): boolean => + isSnapshot && + snapshotItemCount === 0 && + currentItemCount > 0 && + LIVE_SNAPSHOT_HISTORY_CHANNELS.has(channel); + const classifyNbboSide = (price: number, quote: OptionNBBO | null | undefined): NbboSide | null => { if (!quote || !Number.isFinite(price)) { return null; @@ -1635,15 +1688,12 @@ const usePausableTapeView = ( const status = config.enabled ? getLiveFeedStatus(config.sourceStatus, freshestTs, config.freshnessMs, clock) : "disconnected"; - const items = - status === "stale" - ? [] - : filterFreshLiveItems(data.visible, config.freshnessMs, getItemTs, clock); + const projected = projectPausableTapeState(data.visible, status, config.lastUpdate); return { status, - items, - lastUpdate: status === "stale" ? null : config.lastUpdate, + items: projected.items, + lastUpdate: projected.lastUpdate, replayTime: null, replayComplete: false, paused, @@ -1889,7 +1939,9 @@ const useFlowStream = ( type LiveSessionState = { status: WsStatus; + connectedAt: number | null; lastUpdate: number | null; + lastEventByChannel: Partial>; options: OptionPrint[]; nbbo: OptionNBBO[]; equities: EquityPrint[]; @@ -1952,7 +2004,11 @@ const useLiveSession = ( flowFilters: OptionFlowFilters ): LiveSessionState => { const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); + const [connectedAt, setConnectedAt] = useState(null); const [lastUpdate, setLastUpdate] = useState(null); + const [lastEventByChannel, setLastEventByChannel] = useState< + Partial> + >({}); const [options, setOptions] = useState([]); const [nbbo, setNbbo] = useState([]); const [equities, setEquities] = useState([]); @@ -1975,7 +2031,9 @@ const useLiveSession = ( useEffect(() => { if (!enabled) { setStatus("disconnected"); + setConnectedAt(null); setLastUpdate(null); + setLastEventByChannel({}); setOptions([]); setNbbo([]); setEquities([]); @@ -2040,7 +2098,14 @@ const useLiveSession = ( ) => { setter((prev) => message.op === "snapshot" - ? (nextItems as T[]) + ? shouldRetainLiveSnapshotHistory( + subscription.channel, + true, + nextItems.length, + prev.length + ) + ? prev + : (nextItems as T[]) : mergeNewest(nextItems as T[], prev, LIVE_HOT_WINDOW, (evicted) => incrementRetentionMetric("hotWindowEvictions", evicted) ) @@ -2080,6 +2145,13 @@ const useLiveSession = ( break; } + if (items.length > 0) { + setLastEventByChannel((current) => ({ + ...current, + [subscription.channel]: updateAt + })); + } + setLastUpdate(updateAt); }; @@ -2096,6 +2168,7 @@ const useLiveSession = ( return; } setStatus("connected"); + setConnectedAt(Date.now()); syncSubscriptions(socket); }; @@ -2116,6 +2189,7 @@ const useLiveSession = ( return; } setStatus("disconnected"); + setConnectedAt(null); subscribedKeysRef.current = new Set(); subscribedMapRef.current = new Map(); reconnectRef.current = window.setTimeout(connect, 1000); @@ -2126,6 +2200,7 @@ const useLiveSession = ( return; } setStatus("disconnected"); + setConnectedAt(null); socket.close(); }; }; @@ -2172,7 +2247,9 @@ const useLiveSession = ( return { status, + connectedAt, lastUpdate, + lastEventByChannel, options, nbbo, equities, @@ -3401,6 +3478,13 @@ const useTerminalState = () => { chartIntervalMs, flowFilters ); + const equitiesLiveSubscriptionActive = useMemo( + () => + getLiveManifest(pathname, chartTicker.toUpperCase(), chartIntervalMs, flowFilters).some( + (sub) => sub.channel === "equities" + ), + [pathname, chartTicker, chartIntervalMs, flowFilters] + ); const handleReplaySource = useCallback((value: string | null) => { setReplaySource(value); @@ -4038,6 +4122,13 @@ const useTerminalState = () => { return equitiesFeed.items.filter((print) => matchesTicker(print.underlying_id)); }, [equitiesFeed.items, matchesTicker, tickerSet]); + const equitiesSilentWarning = shouldShowEquitiesSilentFeedWarning({ + wsStatus: liveSession.status, + equitiesSubscribed: mode === "live" && equitiesLiveSubscriptionActive, + connectedAt: liveSession.connectedAt, + lastEquitiesEventAt: liveSession.lastEventByChannel.equities ?? null + }); + const filteredInferredDark = useMemo(() => { if (tickerSet.size === 0) { return inferredDarkFeed.items; @@ -4390,6 +4481,7 @@ const useTerminalState = () => { selectedClassifierEvidence, filteredOptions, filteredEquities, + equitiesSilentWarning, filteredInferredDark, filteredFlow, filteredAlerts, @@ -4906,7 +4998,9 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => { {state.tickerSet.size > 0 ? "No equity prints match the current filter." : state.mode === "live" - ? state.equities.status === "stale" + ? state.equitiesSilentWarning + ? "Connected but no equity prints received. Check ingest-equities." + : state.equities.status === "stale" ? "Live feed behind. Waiting for fresh equity prints." : "No equity prints yet. Start ingest-equities." : "Replay queue empty. Ensure ClickHouse has data."}