diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 741bac4..b7f0a79 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -1,3 +1,4 @@ +{"_type":"issue","id":"islandflow-2ij","title":"Harden tape virtualization, scoped focus, and live feed health","description":"Implement the coordinated tape stability plan across web and API.\n\nScope:\n- replace fixed-height tape virtualization with measured virtualization and virtual-end history loading\n- replace scrollHeight anchoring with key-based anchor restore\n- compose canonical tape lists across seed/live/history sources\n- preserve clicked contract/ticker context during scoped focus transitions\n- separate backend hot-channel health from scoped quiet empty states\n- shrink browser hot windows and modestly reduce server cache limits\n- add regression tests and development instrumentation\n\nAcceptance:\n- no giant blank spacer gaps during tape scrolling\n- scroll remains stable while live data and history mutate the list\n- clicked deep-history option/equity rows remain visible immediately after focus\n- narrow scopes do not surface Feed behind unless backend channel health is stale\n","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-07T05:35:18Z","created_by":"dirtydishes","updated_at":"2026-05-07T05:52:14Z","started_at":"2026-05-07T05:35:21Z","closed_at":"2026-05-07T05:52:14Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-uj7","title":"Fix home to tape navigation","description":"Home rail Tape navigation was not reliably switching to the tape route. Use browser-native top-level navigation for Home/Tape rail links so /tape remains reachable even if client router handling stalls.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-07T03:18:14Z","created_by":"dirtydishes","updated_at":"2026-05-07T03:18:21Z","started_at":"2026-05-07T03:18:20Z","closed_at":"2026-05-07T03:18:21Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-84s","title":"Implement seamless /tape live-to-history scroll gate","description":"Implement seamless live-to-ClickHouse scroll-gated history for /tape panes, including split live/history buffers in the web client, snapshot_limit support on live subscriptions, a bundled options support lookup endpoint, ClickHouse helpers for parity hydration, and test coverage for live head retention, background history loading, scoped options deep-hydration, and historical options decor restoration.\n","status":"in_progress","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-07T02:10:43Z","created_by":"dirtydishes","updated_at":"2026-05-07T02:10:47Z","started_at":"2026-05-07T02:10:47Z","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-sh1","title":"Fix live websocket stale lag and reconnect loop","description":"Investigate and fix API live consumer lag causing stale timestamps, feed-behind status, and reconnect loops. Optimize live cache persistence path, add lag telemetry/alerts, and validate in runtime.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-04T17:04:34Z","created_by":"dirtydishes","updated_at":"2026-05-04T17:09:44Z","started_at":"2026-05-04T17:04:38Z","closed_at":"2026-05-04T17:09:44Z","close_reason":"Completed: optimized live cache persistence path, added lag telemetry, deployed api via docker compose on di, verified ws freshness and low hotFeedLagMs","dependency_count":0,"dependent_count":0,"comment_count":0} diff --git a/.env.example b/.env.example index 50f9c5a..4d5ac1b 100644 --- a/.env.example +++ b/.env.example @@ -58,8 +58,8 @@ API_DELIVER_POLICY=new API_CONSUMER_RESET=false NBBO_MAX_AGE_MS=1000 NEXT_PUBLIC_NBBO_MAX_AGE_MS=1000 -NEXT_PUBLIC_LIVE_HOT_WINDOW=2000 -NEXT_PUBLIC_LIVE_HOT_WINDOW_OPTIONS=25000 +NEXT_PUBLIC_LIVE_HOT_WINDOW=600 +NEXT_PUBLIC_LIVE_HOT_WINDOW_OPTIONS=1200 NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS=1200000 NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS=4000 ROLLING_WINDOW_SIZE=50 @@ -100,12 +100,12 @@ REPLAY_BATCH_SIZE=200 REPLAY_LOG_EVERY=1000 # API live retention (generic channels) -LIVE_LIMIT_OPTIONS=10000 +LIVE_LIMIT_OPTIONS=2000 LIVE_LIMIT_NBBO=10000 -LIVE_LIMIT_EQUITIES=10000 +LIVE_LIMIT_EQUITIES=2000 LIVE_LIMIT_EQUITY_QUOTES=10000 LIVE_LIMIT_EQUITY_JOINS=10000 -LIVE_LIMIT_FLOW=10000 +LIVE_LIMIT_FLOW=2000 LIVE_LIMIT_CLASSIFIER_HITS=10000 LIVE_LIMIT_ALERTS=10000 LIVE_LIMIT_INFERRED_DARK=10000 diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index 5af91c1..ab3f6ed 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -967,6 +967,11 @@ h3 { min-width: 980px; } +.data-table-body { + position: relative; + min-width: 100%; +} + .data-table-options { min-width: 1280px; } @@ -1024,10 +1029,16 @@ h3 { text-align: left; } -.data-table-row:nth-child(even) { +.data-table-row.is-even { background: rgba(255, 255, 255, 0.022); } +.data-table-virtual-row { + position: absolute; + left: 0; + width: 100%; +} + .data-table-row:hover, .data-table-row:focus-visible { outline: none; diff --git a/apps/web/app/terminal.test.ts b/apps/web/app/terminal.test.ts index 78c7c70..16ce0ad 100644 --- a/apps/web/app/terminal.test.ts +++ b/apps/web/app/terminal.test.ts @@ -5,12 +5,15 @@ import { appendHistoryTail, buildDefaultFlowFilters, classifierToneForFamily, + composeTapeItems, deriveAlertDirection, countActiveFlowFilterGroups, + findAnchorRestoreIndex, formatCompactUsd, formatOptionContractLabel, flushPausableTapeData, getAlertWindowAnchorTs, + getHotChannelFeedStatus, getScopedLiveAutoHydrationChannels, getLiveHistoryRetentionCap, getOptionTableSnapshot, @@ -246,6 +249,37 @@ describe("live tape pausable helpers", () => { }); describe("live tape history helpers", () => { + it("composes tape items across seed, live, and history without seam duplicates", () => { + const seed = [makeItem("seed", 1, 100), makeItem("dup", 2, 200)]; + const live = [makeItem("live", 5, 500), makeItem("dup", 2, 200)]; + const history = [makeItem("old", 0, 50), makeItem("mid", 3, 300)]; + + expect(composeTapeItems(seed, live, history).map((item) => item.trace_id)).toEqual([ + "live", + "mid", + "dup", + "seed", + "old" + ]); + }); + + it("keeps a clicked seed row visible before scoped live and history arrive", () => { + const clicked = makeItem("clicked", 3, 300); + + expect(composeTapeItems([clicked], [], []).map((item) => item.trace_id)).toEqual(["clicked"]); + }); + + it("drops focus seed duplicates once equivalent live or history rows arrive", () => { + const clicked = makeItem("clicked", 3, 300); + const live = [makeItem("new", 4, 400)]; + const history = [makeItem("clicked", 3, 300)]; + + expect(composeTapeItems([clicked], live, history).map((item) => item.trace_id)).toEqual([ + "new", + "clicked" + ]); + }); + it("promotes hot-window overflow into the history tail", () => { const currentHot = [makeItem("hot-3", 3, 300), makeItem("hot-2", 2, 200), makeItem("hot-1", 1, 100)]; const incoming = [makeItem("hot-4", 4, 400)]; @@ -362,6 +396,21 @@ describe("live tape history helpers", () => { }, {}) ).toEqual(["options"]); }); + + it("restores the same anchor key after live insertions at the top", () => { + const nextKeys = ["new-1", "new-2", "anchor", "after-1", "after-2"]; + expect(findAnchorRestoreIndex(nextKeys, "anchor", ["anchor", "after-1", "after-2"])).toBe(2); + }); + + it("falls forward to the nearest surviving key when the anchor is evicted", () => { + const nextKeys = ["new-1", "after-1", "after-2"]; + expect(findAnchorRestoreIndex(nextKeys, "anchor", ["anchor", "after-1", "after-2"])).toBe(1); + }); + + it("keeps the same anchor when history is appended at the bottom", () => { + const nextKeys = ["anchor", "after-1", "after-2", "older-1", "older-2"]; + expect(findAnchorRestoreIndex(nextKeys, "anchor", ["anchor", "after-1", "after-2"])).toBe(0); + }); }); describe("options display formatters", () => { @@ -533,4 +582,13 @@ describe("signals helpers", () => { expect(statusLabel("connected", false, "live")).toBe("Connected"); expect(statusLabel("stale", false, "live")).toBe("Feed behind"); }); + + it("treats healthy scoped channels as connected even when no matching rows are visible", () => { + expect(getHotChannelFeedStatus("connected", { healthy: true })).toBe("connected"); + }); + + it("surfaces feed behind only when the backend channel health is stale", () => { + expect(getHotChannelFeedStatus("connected", { healthy: false })).toBe("stale"); + expect(getHotChannelFeedStatus("disconnected", { healthy: true })).toBe("disconnected"); + }); }); diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index 72edbd5..2718ed7 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -17,6 +17,7 @@ import { type ReactNode, type SetStateAction } from "react"; +import { useVirtualizer, type Virtualizer } from "@tanstack/react-virtual"; import type { AlertEvent, ClassifierHitEvent, @@ -28,6 +29,7 @@ import type { FlowPacket, InferredDarkEvent, LiveServerMessage, + LiveHotChannelHealthMap, LiveSubscription, OptionFlowFilters, OptionNbboSide, @@ -62,10 +64,10 @@ const parseBoundedInt = ( return Math.max(min, Math.min(max, Math.floor(parsed))); }; -const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, 100, 1, 100000); +const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, 600, 1, 100000); const LIVE_HOT_WINDOW_OPTIONS = parseBoundedInt( process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW_OPTIONS, - 100, + 1200, 1, 100000 ); @@ -145,6 +147,11 @@ type SelectedInstrument = | { kind: "equity"; underlyingId: string } | { kind: "option-contract"; contractId: string; underlyingId: string }; +type TapeFocusSeed = { + scopeKey: string; + items: T[]; +}; + const formatIntervalLabel = (intervalMs: number): string => { const match = CANDLE_INTERVALS.find((interval) => interval.ms === intervalMs); if (match) { @@ -343,6 +350,42 @@ const frontendRetentionMetrics: Record = { pinnedStoreSize: 0 }; +const DEV_TAPE_DEBUG = process.env.NODE_ENV !== "production"; + +type TapeDebugMetricKey = + | "anchorRestoreCount" + | "anchorRestoreFallbackCount" + | "virtualRowMeasurementCount" + | "focusSeedRowCount" + | "scopedQuietTransitions"; + +const frontendTapeDebugMetrics: Record = { + anchorRestoreCount: 0, + anchorRestoreFallbackCount: 0, + virtualRowMeasurementCount: 0, + focusSeedRowCount: 0, + scopedQuietTransitions: 0 +}; + +const bumpTapeDebugMetric = (key: TapeDebugMetricKey, count = 1): void => { + frontendTapeDebugMetrics[key] += count; + if (DEV_TAPE_DEBUG && typeof window !== "undefined") { + (window as typeof window & { __IF_TAPE_DEBUG__?: Record }).__IF_TAPE_DEBUG__ = + frontendTapeDebugMetrics; + } +}; + +const logTapeDebug = (message: string, payload?: Record): void => { + if (!DEV_TAPE_DEBUG) { + return; + } + if (payload) { + console.debug(`[tape] ${message}`, payload); + return; + } + console.debug(`[tape] ${message}`); +}; + const incrementRetentionMetric = (key: RetentionMetricKey, count = 1): void => { frontendRetentionMetrics[key] += count; }; @@ -426,6 +469,24 @@ const getTapeItemKey = (item: SortableItem): string => { return buildItemKey(item) ?? `${extractSortTs(item)}:${extractSortSeq(item)}`; }; +export const composeTapeItems = ( + seedItems: T[], + liveItems: T[], + historyItems: T[] +): T[] => { + const deduped = new Map(); + for (const item of [...seedItems, ...liveItems, ...historyItems]) { + deduped.set(getTapeItemKey(item), item); + } + return Array.from(deduped.values()).sort((a, b) => { + const delta = extractSortTs(b) - extractSortTs(a); + if (delta !== 0) { + return delta; + } + return extractSortSeq(b) - extractSortSeq(a); + }); +}; + type PausableTapeData = { visible: T[]; queued: T[]; @@ -618,9 +679,45 @@ export const getLiveFeedStatus = ( return behindMs > behindDelayMs ? "stale" : "connected"; }; +export const getHotChannelFeedStatus = ( + sourceStatus: WsStatus, + health: { healthy: boolean } | null | undefined +): WsStatus => { + if (sourceStatus !== "connected") { + return sourceStatus; + } + if (!health) { + return "connected"; + } + return health.healthy ? "connected" : "stale"; +}; + +export const findAnchorRestoreIndex = ( + keys: string[], + anchorKey: string, + fallbackKeys: string[] +): number => { + const directIndex = keys.indexOf(anchorKey); + if (directIndex >= 0) { + return directIndex; + } + + const indexByKey = new Map(keys.map((key, index) => [key, index])); + for (const key of fallbackKeys) { + const index = indexByKey.get(key); + if (typeof index === "number") { + return index; + } + } + + return -1; +}; + type TapeState = { status: WsStatus; items: T[]; + liveItems?: T[]; + historyItems?: T[]; lastUpdate: number | null; replayTime: number | null; replayComplete: boolean; @@ -1380,7 +1477,26 @@ const useScrollAnchor = ( listRef: React.RefObject, isAtTopRef: React.MutableRefObject ) => { - const pendingRef = useRef<{ height: number } | null>(null); + const pendingRef = useRef<{ + key: string; + offset: number; + fallbackKeys: string[]; + } | null>(null); + + const readRenderedRows = useCallback((element: HTMLDivElement) => { + return Array.from(element.querySelectorAll("[data-tape-key][data-row-start][data-row-size]")) + .map((node) => { + const key = node.dataset.tapeKey; + const start = Number(node.dataset.rowStart); + const size = Number(node.dataset.rowSize); + if (!key || !Number.isFinite(start) || !Number.isFinite(size)) { + return null; + } + return { key, start, size }; + }) + .filter((row): row is { key: string; start: number; size: number } => row !== null) + .sort((a, b) => a.start - b.start); + }, []); const capture = useCallback(() => { if (isAtTopRef.current) { @@ -1393,10 +1509,27 @@ const useScrollAnchor = ( return; } + const rows = readRenderedRows(el); + if (rows.length === 0) { + pendingRef.current = null; + return; + } + + const scrollTop = el.scrollTop; + const anchorIndex = rows.findIndex((row) => row.start + row.size > scrollTop); + const resolvedIndex = anchorIndex >= 0 ? anchorIndex : 0; + const anchorRow = rows[resolvedIndex]; + if (!anchorRow) { + pendingRef.current = null; + return; + } + pendingRef.current = { - height: el.scrollHeight + key: anchorRow.key, + offset: Math.max(0, scrollTop - anchorRow.start), + fallbackKeys: rows.slice(resolvedIndex).map((row) => row.key) }; - }, [isAtTopRef, listRef]); + }, [isAtTopRef, listRef, readRenderedRows]); const apply = useCallback(() => { const pending = pendingRef.current; @@ -1414,20 +1547,41 @@ const useScrollAnchor = ( return; } - const delta = el.scrollHeight - pending.height; - if (delta !== 0) { - el.scrollTop = Math.max(0, el.scrollTop + delta); + const rows = readRenderedRows(el); + if (rows.length === 0) { + return; + } + + const keys = rows.map((row) => row.key); + const restoreIndex = findAnchorRestoreIndex(keys, pending.key, pending.fallbackKeys); + if (restoreIndex < 0) { + return; + } + + const row = rows[restoreIndex]; + if (!row) { + return; + } + + el.scrollTop = Math.max(0, row.start + pending.offset); + bumpTapeDebugMetric("anchorRestoreCount", 1); + if (row.key !== pending.key) { + bumpTapeDebugMetric("anchorRestoreFallbackCount", 1); + logTapeDebug("anchor restore fallback", { + requested_key: pending.key, + restored_key: row.key + }); } pendingRef.current = null; - }, [isAtTopRef, listRef]); + }, [isAtTopRef, listRef, readRenderedRows]); return { capture, apply }; }; -const useBottomHistoryGate = ( - listRef: React.RefObject, - listNode: HTMLDivElement | null, +const useVirtualHistoryGate = ( enabled: boolean, + itemCount: number, + lastVirtualIndex: number, onLoadOlder: () => void ): void => { const loadRef = useRef(onLoadOlder); @@ -1436,107 +1590,97 @@ const useBottomHistoryGate = ( }, [onLoadOlder]); useEffect(() => { - if (!enabled) { + if (!enabled || itemCount === 0) { return; } - const element = listNode ?? listRef.current; - if (!element) { + if (lastVirtualIndex < itemCount - 1) { return; } - - const maybeLoad = () => { - const threshold = Math.max(240, element.clientHeight * 0.5); - if (element.scrollTop + element.clientHeight >= element.scrollHeight - threshold) { - loadRef.current(); - } - }; - - maybeLoad(); - element.addEventListener("scroll", maybeLoad); - return () => { - element.removeEventListener("scroll", maybeLoad); - }; - }, [enabled, listNode, listRef]); + loadRef.current(); + }, [enabled, itemCount, lastVirtualIndex]); }; -type VirtualListResult = { - visibleItems: T[]; - topSpacerHeight: number; - bottomSpacerHeight: number; +type MeasuredVirtualListResult = { + totalSize: number; + virtualItems: MeasuredVirtualRow[]; + measureElement: (node: HTMLElement | null) => void; + virtualizer: Virtualizer; }; -const useVirtualList = ( +type MeasuredVirtualRow = { + item: T; + key: string; + index: number; + start: number; + size: number; + end: number; +}; + +const useMeasuredVirtualList = ( items: T[], listRef: React.RefObject, - enabled: boolean, - rowHeight: number, - overscan = 8 -): VirtualListResult => { - const [range, setRange] = useState<{ start: number; end: number }>({ - start: 0, - end: items.length + estimateSize: number, + overscan: number, + debugLabel: string +): MeasuredVirtualListResult => { + const virtualizer = useVirtualizer({ + count: items.length, + getScrollElement: () => listRef.current, + estimateSize: () => estimateSize, + overscan, + getItemKey: (index) => getTapeItemKey(items[index] as SortableItem), + measureElement: (node) => { + bumpTapeDebugMetric("virtualRowMeasurementCount", 1); + return node.getBoundingClientRect().height; + } }); - const recompute = useCallback(() => { - if (!enabled) { - setRange({ start: 0, end: items.length }); - return; + const virtualItems: MeasuredVirtualRow[] = virtualizer.getVirtualItems().map((virtualItem) => { + const item = items[virtualItem.index] as T | undefined; + if (!item) { + return null; } - - const element = listRef.current; - if (!element) { - setRange({ start: 0, end: Math.min(items.length, 80) }); - return; - } - - const viewportHeight = Math.max(rowHeight, element.clientHeight); - const visibleCount = Math.ceil(viewportHeight / rowHeight); - const start = Math.max(0, Math.floor(element.scrollTop / rowHeight) - overscan); - const end = Math.min(items.length, start + visibleCount + overscan * 2); - setRange({ start, end }); - }, [enabled, items.length, listRef, overscan, rowHeight]); - - useEffect(() => { - recompute(); - }, [items.length, recompute]); - - useEffect(() => { - if (!enabled) { - return; - } - - const element = listRef.current; - if (!element) { - return; - } - - const onScroll = () => recompute(); - const onResize = () => recompute(); - - element.addEventListener("scroll", onScroll); - window.addEventListener("resize", onResize); - - return () => { - element.removeEventListener("scroll", onScroll); - window.removeEventListener("resize", onResize); - }; - }, [enabled, listRef, recompute]); - - if (!enabled) { return { - visibleItems: items, - topSpacerHeight: 0, - bottomSpacerHeight: 0 + item, + key: getTapeItemKey(item), + index: virtualItem.index, + start: virtualItem.start, + size: virtualItem.size, + end: virtualItem.end }; - } + }).filter((virtualItem): virtualItem is MeasuredVirtualRow => virtualItem !== null); - const start = Math.min(range.start, items.length); - const end = Math.min(Math.max(range.end, start), items.length); + useEffect(() => { + if (!DEV_TAPE_DEBUG || items.length === 0) { + return; + } + const element = listRef.current; + if (!element) { + return; + } + const first = virtualItems[0]; + const last = virtualItems.at(-1); + if (!first || !last) { + return; + } + const visibleTopGap = Math.max(0, first.start - element.scrollTop); + const visibleBottomGap = Math.max(0, element.scrollTop + element.clientHeight - last.end); + if (visibleTopGap > element.clientHeight || visibleBottomGap > element.clientHeight) { + console.warn("[tape] false-gap watchdog", { + pane: debugLabel, + item_count: items.length, + visible_top_gap: visibleTopGap, + visible_bottom_gap: visibleBottomGap, + viewport_height: element.clientHeight + }); + } + }, [debugLabel, items.length, listRef, virtualItems]); return { - visibleItems: items.slice(start, end), - topSpacerHeight: start * rowHeight, - bottomSpacerHeight: Math.max(0, (items.length - end) * rowHeight) + totalSize: virtualizer.getTotalSize(), + virtualItems, + measureElement: virtualizer.measureElement, + virtualizer }; }; @@ -2018,6 +2162,8 @@ const toStaticTapeState = ( ): TapeState => ({ status, items, + liveItems: items, + historyItems: [], lastUpdate, replayTime: null, replayComplete: false, @@ -2032,10 +2178,8 @@ type PausableTapeViewConfig = { sourceItems: T[]; historyTail?: T[]; lastUpdate: number | null; - freshnessMs: number; onNewItems?: (count: number) => void; captureScroll?: () => void; - getItemTs?: (item: T) => number; retentionLimit?: number; shouldHold?: () => boolean; resumeSignal?: number; @@ -2046,17 +2190,6 @@ const usePausableTapeView = ( ): TapeState => { const [paused, setPaused] = useState(false); const [data, setData] = useState>(EMPTY_PAUSABLE_TAPE); - const [clock, setClock] = useState(() => Date.now()); - - useEffect(() => { - const handle = window.setInterval(() => { - setClock(Date.now()); - }, 1000); - - return () => { - window.clearInterval(handle); - }; - }, []); useEffect(() => { if (!config.enabled) { @@ -2132,38 +2265,16 @@ const usePausableTapeView = ( setPaused((current) => !current); }, []); - const getItemTs = config.getItemTs ?? extractSortTs; - const freshestTs = useMemo(() => { - if (config.sourceItems.length === 0) { - return null; - } - - let newest = Number.NEGATIVE_INFINITY; - for (const item of config.sourceItems) { - newest = Math.max(newest, getItemTs(item)); - } - - return Number.isFinite(newest) ? newest : null; - }, [config.sourceItems, getItemTs]); - - const status = config.enabled - ? getLiveFeedStatus( - config.sourceStatus, - freshestTs, - config.freshnessMs, - clock, - LIVE_FEED_BEHIND_DELAY_MS - ) - : "disconnected"; + const status = config.enabled ? config.sourceStatus : "disconnected"; const projected = projectPausableTapeState(data.visible, status, config.lastUpdate); - const items = useMemo( - () => [...projected.items, ...(config.historyTail ?? [])], - [projected.items, config.historyTail] - ); + const historyItems = config.historyTail ?? []; + const items = useMemo(() => composeTapeItems([], projected.items, historyItems), [projected.items, historyItems]); return { status, items, + liveItems: projected.items, + historyItems, lastUpdate: projected.lastUpdate, replayTime: null, replayComplete: false, @@ -2412,6 +2523,7 @@ type LiveSessionState = { status: WsStatus; connectedAt: number | null; lastUpdate: number | null; + channelHealth: LiveHotChannelHealthMap; lastEventByChannel: Partial>; manifest: LiveSubscription[]; historyCursors: Partial>; @@ -2561,6 +2673,12 @@ const useLiveSession = ( const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); const [connectedAt, setConnectedAt] = useState(null); const [lastUpdate, setLastUpdate] = useState(null); + const [channelHealth, setChannelHealth] = useState({ + options: { freshness_age_ms: null, healthy: false }, + nbbo: { freshness_age_ms: null, healthy: false }, + equities: { freshness_age_ms: null, healthy: false }, + flow: { freshness_age_ms: null, healthy: false } + }); const [lastEventByChannel, setLastEventByChannel] = useState< Partial> >({}); @@ -2647,6 +2765,12 @@ const useLiveSession = ( setStatus("disconnected"); setConnectedAt(null); setLastUpdate(null); + setChannelHealth({ + options: { freshness_age_ms: null, healthy: false }, + nbbo: { freshness_age_ms: null, healthy: false }, + equities: { freshness_age_ms: null, healthy: false }, + flow: { freshness_age_ms: null, healthy: false } + }); setLastEventByChannel({}); setHistoryCursors({}); setHistoryLoading({}); @@ -2736,6 +2860,7 @@ const useLiveSession = ( const handleMessage = (message: LiveServerMessage) => { if (message.op === "ready" || message.op === "heartbeat") { + setChannelHealth(message.channel_health); return; } if (message.op === "error") { @@ -3173,6 +3298,7 @@ const useLiveSession = ( status, connectedAt, lastUpdate, + channelHealth, lastEventByChannel, manifest, historyCursors, @@ -4512,6 +4638,8 @@ const useTerminalState = () => { const [selectedClassifierHit, setSelectedClassifierHit] = useState(null); const [selectedSmartMoneyEvent, setSelectedSmartMoneyEvent] = useState(null); const [selectedInstrument, setSelectedInstrument] = useState(null); + const [optionFocusSeed, setOptionFocusSeed] = useState | null>(null); + const [equityFocusSeed, setEquityFocusSeed] = useState | null>(null); const [filterInput, setFilterInput] = useState(""); const [flowFilters, setFlowFilters] = useState(() => buildDefaultFlowFilters()); const [chartIntervalMs, setChartIntervalMs] = useState(CANDLE_INTERVALS[0].ms); @@ -4524,6 +4652,14 @@ const useTerminalState = () => { }, [filterInput]); const tickerSet = useMemo(() => new Set(activeTickers), [activeTickers]); const instrumentUnderlying = selectedInstrument?.underlyingId.toUpperCase() ?? null; + const optionFocusScopeKey = + selectedInstrument?.kind === "option-contract" + ? `option-contract:${selectedInstrument.contractId}` + : null; + const equityFocusScopeKey = + selectedInstrument?.kind === "equity" + ? `equity:${selectedInstrument.underlyingId.toUpperCase()}` + : null; const optionScope = useMemo( () => ({ underlying_ids: activeTickers.length > 0 ? activeTickers : instrumentUnderlying ? [instrumentUnderlying] : undefined, @@ -4767,13 +4903,19 @@ const useTerminalState = () => { getReplayKey: disableReplayGrouping }); + const optionsChannelStatus = getHotChannelFeedStatus(liveSession.status, liveSession.channelHealth.options); + const equitiesChannelStatus = getHotChannelFeedStatus( + liveSession.status, + liveSession.channelHealth.equities + ); + const flowChannelStatus = getHotChannelFeedStatus(liveSession.status, liveSession.channelHealth.flow); + const liveOptions = usePausableTapeView({ enabled: mode === "live", - sourceStatus: liveSession.status, + sourceStatus: optionsChannelStatus, sourceItems: liveSession.options, historyTail: liveSession.optionsHistory, lastUpdate: liveSession.lastUpdate, - freshnessMs: LIVE_OPTIONS_STALE_MS, retentionLimit: LIVE_HOT_WINDOW_OPTIONS, captureScroll: optionsAnchor.capture, onNewItems: optionsScroll.onNewItems, @@ -4782,11 +4924,10 @@ const useTerminalState = () => { }); const liveEquities = usePausableTapeView({ enabled: mode === "live", - sourceStatus: liveSession.status, + sourceStatus: equitiesChannelStatus, sourceItems: liveSession.equities, historyTail: liveSession.equitiesHistory, lastUpdate: liveSession.lastUpdate, - freshnessMs: LIVE_EQUITIES_STALE_MS, captureScroll: equitiesAnchor.capture, onNewItems: equitiesScroll.onNewItems, shouldHold: () => !equitiesScroll.isAtTopRef.current, @@ -4794,40 +4935,87 @@ const useTerminalState = () => { }); const liveFlow = usePausableTapeView({ enabled: mode === "live", - sourceStatus: liveSession.status, + sourceStatus: flowChannelStatus, sourceItems: liveSession.flow, historyTail: liveSession.flowHistory, lastUpdate: liveSession.lastUpdate, - freshnessMs: LIVE_FLOW_STALE_MS, captureScroll: flowAnchor.capture, onNewItems: flowScroll.onNewItems, shouldHold: () => !flowScroll.isAtTopRef.current, - resumeSignal: flowScroll.resumeTick, - getItemTs: (item) => item.source_ts + resumeSignal: flowScroll.resumeTick }); - const optionsFeed = mode === "live" ? liveOptions : options; + const seededLiveOptionsItems = useMemo( + () => + composeTapeItems( + optionFocusSeed?.scopeKey === optionFocusScopeKey ? optionFocusSeed.items : [], + liveOptions.liveItems ?? [], + liveOptions.historyItems ?? [] + ), + [liveOptions.historyItems, liveOptions.liveItems, optionFocusScopeKey, optionFocusSeed] + ); + const seededLiveEquitiesItems = useMemo( + () => + composeTapeItems( + equityFocusSeed?.scopeKey === equityFocusScopeKey ? equityFocusSeed.items : [], + liveEquities.liveItems ?? [], + liveEquities.historyItems ?? [] + ), + [equityFocusScopeKey, equityFocusSeed, liveEquities.historyItems, liveEquities.liveItems] + ); + + const optionsFeed = + mode === "live" ? { ...liveOptions, items: seededLiveOptionsItems } : options; const nbboFeed = - mode === "live" ? toStaticTapeState(liveSession.status, [...liveSession.nbbo, ...liveSession.nbboHistory], liveSession.lastUpdate) : nbbo; - const equitiesFeed = mode === "live" ? liveEquities : equities; + mode === "live" + ? toStaticTapeState( + getHotChannelFeedStatus(liveSession.status, liveSession.channelHealth.nbbo), + composeTapeItems([], liveSession.nbbo, liveSession.nbboHistory), + liveSession.lastUpdate + ) + : nbbo; + const equitiesFeed = + mode === "live" ? { ...liveEquities, items: seededLiveEquitiesItems } : equities; const equityJoinsFeed = mode === "live" - ? toStaticTapeState(liveSession.status, [...liveSession.equityJoins, ...liveSession.equityJoinsHistory], liveSession.lastUpdate) + ? toStaticTapeState( + liveSession.status, + composeTapeItems([], liveSession.equityJoins, liveSession.equityJoinsHistory), + liveSession.lastUpdate + ) : equityJoins; const flowFeed = mode === "live" ? liveFlow : flow; const alertsFeed = - mode === "live" ? toStaticTapeState(liveSession.status, [...liveSession.alerts, ...liveSession.alertsHistory], liveSession.lastUpdate) : alerts; + mode === "live" + ? toStaticTapeState( + liveSession.status, + composeTapeItems([], liveSession.alerts, liveSession.alertsHistory), + liveSession.lastUpdate + ) + : alerts; const classifierHitsFeed = mode === "live" - ? toStaticTapeState(liveSession.status, [...liveSession.classifierHits, ...liveSession.classifierHitsHistory], liveSession.lastUpdate) + ? toStaticTapeState( + liveSession.status, + composeTapeItems([], liveSession.classifierHits, liveSession.classifierHitsHistory), + liveSession.lastUpdate + ) : classifierHits; const smartMoneyFeed = mode === "live" - ? toStaticTapeState(liveSession.status, [...liveSession.smartMoney, ...liveSession.smartMoneyHistory], liveSession.lastUpdate) + ? toStaticTapeState( + liveSession.status, + composeTapeItems([], liveSession.smartMoney, liveSession.smartMoneyHistory), + liveSession.lastUpdate + ) : smartMoney; const inferredDarkFeed = mode === "live" - ? toStaticTapeState(liveSession.status, [...liveSession.inferredDark, ...liveSession.inferredDarkHistory], liveSession.lastUpdate) + ? toStaticTapeState( + liveSession.status, + composeTapeItems([], liveSession.inferredDark, liveSession.inferredDarkHistory), + liveSession.lastUpdate + ) : inferredDark; useLayoutEffect(() => { @@ -5528,12 +5716,126 @@ const useTerminalState = () => { return equitiesFeed.items.filter((print) => matchesTicker(print.underlying_id)); }, [equitiesFeed.items, matchesTicker, tickerSet, instrumentUnderlying]); + useEffect(() => { + if (!optionFocusSeed) { + return; + } + if (optionFocusSeed.scopeKey !== optionFocusScopeKey) { + setOptionFocusSeed(null); + return; + } + const composedBaseItems = composeTapeItems([], liveOptions.liveItems ?? [], liveOptions.historyItems ?? []); + const liveKeys = new Set(composedBaseItems.map((item) => getTapeItemKey(item))); + if (optionFocusSeed.items.every((item) => liveKeys.has(getTapeItemKey(item)))) { + setOptionFocusSeed(null); + } + }, [liveOptions.historyItems, liveOptions.liveItems, optionFocusScopeKey, optionFocusSeed]); + + useEffect(() => { + if (!equityFocusSeed) { + return; + } + if (equityFocusSeed.scopeKey !== equityFocusScopeKey) { + setEquityFocusSeed(null); + return; + } + const composedBaseItems = composeTapeItems([], liveEquities.liveItems ?? [], liveEquities.historyItems ?? []); + const liveKeys = new Set(composedBaseItems.map((item) => getTapeItemKey(item))); + if (equityFocusSeed.items.every((item) => liveKeys.has(getTapeItemKey(item)))) { + setEquityFocusSeed(null); + } + }, [equityFocusScopeKey, equityFocusSeed, liveEquities.historyItems, liveEquities.liveItems]); + + const focusOptionContract = useCallback( + (print: OptionPrint) => { + const contractId = normalizeContractId(print.option_contract_id); + const parsed = parseOptionContractId(contractId); + const underlyingId = (print.underlying_id ?? parsed?.root ?? extractUnderlying(contractId)).toUpperCase(); + const scopeKey = `option-contract:${contractId}`; + const seedItems = composeTapeItems( + [print], + filteredOptions.filter((candidate) => normalizeContractId(candidate.option_contract_id) === contractId), + [] + ); + setOptionFocusSeed({ scopeKey, items: seedItems }); + bumpTapeDebugMetric("focusSeedRowCount", seedItems.length); + logTapeDebug("option focus seed captured", { + contract_id: contractId, + row_count: seedItems.length + }); + setSelectedInstrument({ + kind: "option-contract", + contractId, + underlyingId + }); + }, + [filteredOptions] + ); + + const focusEquityTicker = useCallback( + (print: EquityPrint) => { + const underlyingId = print.underlying_id.toUpperCase(); + const scopeKey = `equity:${underlyingId}`; + const seedItems = composeTapeItems( + [print], + filteredEquities.filter((candidate) => candidate.underlying_id.toUpperCase() === underlyingId), + [] + ); + setEquityFocusSeed({ scopeKey, items: seedItems }); + bumpTapeDebugMetric("focusSeedRowCount", seedItems.length); + logTapeDebug("equity focus seed captured", { + underlying_id: underlyingId, + row_count: seedItems.length + }); + setSelectedInstrument({ + kind: "equity", + underlyingId + }); + }, + [filteredEquities] + ); + const equitiesSilentWarning = shouldShowEquitiesSilentFeedWarning({ wsStatus: liveSession.status, equitiesSubscribed: mode === "live" && equitiesLiveSubscriptionActive, connectedAt: liveSession.connectedAt, lastEquitiesEventAt: liveSession.lastEventByChannel.equities ?? null }); + const optionsScopeActive = Boolean( + optionScope.option_contract_id || optionScope.underlying_ids?.length + ); + const equitiesScopeActive = Boolean(equityScope.underlying_ids?.length); + const optionsScopedQuiet = + mode === "live" && + optionsScopeActive && + optionsChannelStatus === "connected" && + filteredOptions.length === 0; + const equitiesScopedQuiet = + mode === "live" && + equitiesScopeActive && + equitiesChannelStatus === "connected" && + filteredEquities.length === 0; + + const previousScopedQuietRef = useRef({ + options: optionsScopedQuiet, + equities: equitiesScopedQuiet + }); + + useEffect(() => { + const previous = previousScopedQuietRef.current; + if (previous.options !== optionsScopedQuiet) { + bumpTapeDebugMetric("scopedQuietTransitions", 1); + logTapeDebug("options scoped quiet transition", { active: optionsScopedQuiet }); + } + if (previous.equities !== equitiesScopedQuiet) { + bumpTapeDebugMetric("scopedQuietTransitions", 1); + logTapeDebug("equities scoped quiet transition", { active: equitiesScopedQuiet }); + } + previousScopedQuietRef.current = { + options: optionsScopedQuiet, + equities: equitiesScopedQuiet + }; + }, [equitiesScopedQuiet, optionsScopedQuiet]); const filteredInferredDark = useMemo(() => { if (tickerSet.size === 0) { @@ -5924,6 +6226,8 @@ const useTerminalState = () => { selectedSmartMoneyEvidence, filteredOptions, filteredEquities, + optionsScopedQuiet, + equitiesScopedQuiet, equitiesSilentWarning, filteredInferredDark, filteredFlow, @@ -5932,6 +6236,8 @@ const useTerminalState = () => { filteredClassifierHits, chartSmartMoneyEvents, chartInferredDark, + focusOptionContract, + focusEquityTicker, openFromSmartMoneyEvent, openFromClassifierHit, handleSmartMoneyMarkerClick, @@ -6257,8 +6563,8 @@ type OptionsPaneProps = { const OptionsPane = ({ limit }: OptionsPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredOptions.slice(0, limit) : state.filteredOptions; - const virtual = useVirtualList(items, state.optionsScroll.listRef, !limit, 36); - useBottomHistoryGate(state.optionsScroll.listRef, state.optionsScroll.listNode, state.mode === "live" && !limit, () => + const virtual = useMeasuredVirtualList(items, state.optionsScroll.listRef, 36, 12, "options"); + useVirtualHistoryGate(state.mode === "live" && !limit, items.length, virtual.virtualItems.at(-1)?.index ?? -1, () => void state.liveSession.loadOlder("options") ); @@ -6289,12 +6595,16 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => {
{items.length === 0 ? (
- {state.tickerSet.size > 0 - ? "No option prints match the current filter." - : state.mode === "live" - ? state.options.status === "stale" - ? "Feed behind. Waiting for fresh option prints." - : "No option prints yet. Start ingest-options." + {state.mode === "live" + ? state.options.status === "stale" + ? "Feed behind. Waiting for fresh option prints." + : state.optionsScopedQuiet + ? "No recent option prints for this scope yet." + : state.tickerSet.size > 0 + ? "No option prints match the current filter." + : "No option prints yet. Start ingest-options." + : state.tickerSet.size > 0 + ? "No option prints match the current filter." : "Replay queue empty. Ensure ClickHouse has data."}
) : ( @@ -6314,10 +6624,12 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => { IV CLASSIFIER
- {virtual.topSpacerHeight > 0 ? ( -
- ) : null} - {virtual.visibleItems.map((print) => { +
+ {virtual.virtualItems.map(({ item: print, key, index, start, size }) => { const contractId = normalizeContractId(print.option_contract_id); const parsed = parseOptionContractId(contractId); const contractDisplay = formatOptionContractLabel(contractId); @@ -6334,15 +6646,21 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => { const underlyingId = (print.underlying_id ?? parsed?.root ?? extractUnderlying(contractId)).toUpperCase(); const focusContract = (event: ReactMouseEvent) => { event.stopPropagation(); - state.setSelectedInstrument({ - kind: "option-contract", - contractId, - underlyingId - }); + state.focusOptionContract(print); }; + const rowStyle = { + ...(decor + ? ({ "--classifier-intensity": decor.intensity } as CSSProperties) + : undefined), + top: `${start}px` + } as CSSProperties; const commonProps = { - className: `data-table-row data-table-row-button data-table-row-classified data-table-row-options${decor ? ` is-classified classifier-${decor.tone}` : ""}`, - style: decor ? ({ "--classifier-intensity": decor.intensity } as CSSProperties) : undefined + className: `data-table-row data-table-row-button data-table-row-classified data-table-row-options data-table-virtual-row${index % 2 === 1 ? " is-even" : ""}${decor ? ` is-classified classifier-${decor.tone}` : ""}`, + style: rowStyle, + "data-row-start": String(start), + "data-row-size": String(size), + "data-tape-key": key, + ref: virtual.measureElement }; const cells = ( <> @@ -6389,7 +6707,7 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => { ) : ( -
+
{cells}
); })} - {virtual.bottomSpacerHeight > 0 ? ( -
- ) : null} +
)} @@ -6434,8 +6750,8 @@ type EquitiesPaneProps = { const EquitiesPane = ({ limit }: EquitiesPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredEquities.slice(0, limit) : state.filteredEquities; - const virtual = useVirtualList(items, state.equitiesScroll.listRef, !limit, 36); - useBottomHistoryGate(state.equitiesScroll.listRef, state.equitiesScroll.listNode, state.mode === "live" && !limit, () => + const virtual = useMeasuredVirtualList(items, state.equitiesScroll.listRef, 36, 10, "equities"); + useVirtualHistoryGate(state.mode === "live" && !limit, items.length, virtual.virtualItems.at(-1)?.index ?? -1, () => void state.liveSession.loadOlder("equities") ); @@ -6466,14 +6782,18 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => {
{items.length === 0 ? (
- {state.tickerSet.size > 0 - ? "No equity prints match the current filter." - : state.mode === "live" - ? state.equitiesSilentWarning - ? "Connected but no equity prints received. Check ingest-equities." - : state.equities.status === "stale" - ? "Feed behind. Waiting for fresh equity prints." - : "No equity prints yet. Start ingest-equities." + {state.mode === "live" + ? state.equities.status === "stale" + ? "Feed behind. Waiting for fresh equity prints." + : state.equitiesScopedQuiet + ? "No recent equity prints for this scope yet." + : state.tickerSet.size > 0 + ? "No equity prints match the current filter." + : state.equitiesSilentWarning + ? "Connected but no equity prints received. Check ingest-equities." + : "No equity prints yet. Start ingest-equities." + : state.tickerSet.size > 0 + ? "No equity prints match the current filter." : "Replay queue empty. Ensure ClickHouse has data."}
) : ( @@ -6487,22 +6807,23 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => { VENUE TAPE
- {virtual.topSpacerHeight > 0 ? ( -
- ) : null} - {virtual.visibleItems.map((print) => ( -
+
+ {virtual.virtualItems.map(({ item: print, key, index, start, size }) => ( +
{formatTime(print.ts)} @@ -6513,9 +6834,7 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => { {print.offExchangeFlag ? "Off-Ex" : "Lit"}
))} - {virtual.bottomSpacerHeight > 0 ? ( -
- ) : null} +
)} @@ -6532,8 +6851,8 @@ type FlowPaneProps = { const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredFlow.slice(0, limit) : state.filteredFlow; - const virtual = useVirtualList(items, state.flowScroll.listRef, !limit, 44); - useBottomHistoryGate(state.flowScroll.listRef, state.flowScroll.listNode, state.mode === "live" && !limit, () => + const virtual = useMeasuredVirtualList(items, state.flowScroll.listRef, 44, 8, "flow"); + useVirtualHistoryGate(state.mode === "live" && !limit, items.length, virtual.virtualItems.at(-1)?.index ?? -1, () => void state.liveSession.loadOlder("flow") ); @@ -6586,10 +6905,8 @@ const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => { NBBO QUALITY
- {virtual.topSpacerHeight > 0 ? ( -
- ) : null} - {virtual.visibleItems.map((packet) => { +
+ {virtual.virtualItems.map(({ item: packet, key, index, start, size }) => { const features = packet.features ?? {}; const contract = String(features.option_contract_id ?? packet.id ?? "unknown"); const count = parseNumber(features.count, packet.members.length); @@ -6641,7 +6958,15 @@ const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => { ].filter(Boolean).join(" | "); return ( -
+
{formatTime(startTs)} → {formatTime(endTs)} {contract} {formatFlowMetric(count)} @@ -6654,9 +6979,7 @@ const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => {
); })} - {virtual.bottomSpacerHeight > 0 ? ( -
- ) : null} +
)} @@ -6674,8 +6997,8 @@ type AlertsPaneProps = { const AlertsPane = ({ limit, withStrip = false, className }: AlertsPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredAlerts.slice(0, limit) : state.filteredAlerts; - const virtual = useVirtualList(items, state.alertsScroll.listRef, !limit, 46); - useBottomHistoryGate(state.alertsScroll.listRef, state.alertsScroll.listNode, state.mode === "live" && !limit, () => + const virtual = useMeasuredVirtualList(items, state.alertsScroll.listRef, 46, 8, "alerts"); + useVirtualHistoryGate(state.mode === "live" && !limit, items.length, virtual.virtualItems.at(-1)?.index ?? -1, () => void state.liveSession.loadOlder("alerts") ); @@ -6726,19 +7049,22 @@ const AlertsPane = ({ limit, withStrip = false, className }: AlertsPaneProps) => DIR NOTE
- {virtual.topSpacerHeight > 0 ? ( -
- ) : null} - {virtual.visibleItems.map((alert) => { +
+ {virtual.virtualItems.map(({ item: alert, key, index, start, size }) => { const primary = alert.hits[0]; const direction = deriveAlertDirection(alert); const severity = normalizeAlertSeverity(alert); return ( ); })} - {virtual.bottomSpacerHeight > 0 ? ( -
- ) : null} +
)} @@ -6774,10 +7098,6 @@ type ClassifierPaneProps = { const ClassifierPane = ({ limit, className }: ClassifierPaneProps) => { const state = useTerminal(); - useBottomHistoryGate(state.classifierScroll.listRef, state.classifierScroll.listNode, state.mode === "live" && !limit, () => { - void state.liveSession.loadOlder("smart-money"); - void state.liveSession.loadOlder("classifier-hits"); - }); const smartMoneyItems = limit ? state.filteredSmartMoneyEvents.slice(0, limit) : state.filteredSmartMoneyEvents; const legacyItems = smartMoneyItems.length === 0 @@ -6787,12 +7107,11 @@ const ClassifierPane = ({ limit, className }: ClassifierPaneProps) => { : []; const items: Array = smartMoneyItems.length > 0 ? smartMoneyItems : legacyItems; - const virtual = useVirtualList( - items, - state.classifierScroll.listRef, - !limit, - 44 - ); + const virtual = useMeasuredVirtualList(items, state.classifierScroll.listRef, 44, 8, "classifier"); + useVirtualHistoryGate(state.mode === "live" && !limit, items.length, virtual.virtualItems.at(-1)?.index ?? -1, () => { + void state.liveSession.loadOlder("smart-money"); + void state.liveSession.loadOlder("classifier-hits"); + }); const showingSmartMoney = smartMoneyItems.length > 0; return ( @@ -6839,19 +7158,23 @@ const ClassifierPane = ({ limit, className }: ClassifierPaneProps) => { PROB NOTE
- {virtual.topSpacerHeight > 0 ? ( -
- ) : null} - {showingSmartMoney ? (virtual.visibleItems as SmartMoneyEvent[]).map((event) => { +
+ {showingSmartMoney ? virtual.virtualItems.map(({ item, key, index, start, size }) => { + const event = item as SmartMoneyEvent; const primaryScore = event.profile_scores.find((score) => score.profile_id === event.primary_profile_id) ?? event.profile_scores[0]; const direction = normalizeDirection(event.primary_direction); return ( ); - }) : (virtual.visibleItems as ClassifierHitEvent[]).map((hit) => { + }) : virtual.virtualItems.map(({ item, key, index, start, size }) => { + const hit = item as ClassifierHitEvent; const direction = normalizeDirection(hit.direction); return ( ); })} - {virtual.bottomSpacerHeight > 0 ? ( -
- ) : null} +
)} @@ -6903,8 +7230,8 @@ type DarkPaneProps = { const DarkPane = ({ limit, className }: DarkPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredInferredDark.slice(0, limit) : state.filteredInferredDark; - const virtual = useVirtualList(items, state.darkScroll.listRef, !limit, 44); - useBottomHistoryGate(state.darkScroll.listRef, state.darkScroll.listNode, state.mode === "live" && !limit, () => + const virtual = useMeasuredVirtualList(items, state.darkScroll.listRef, 44, 8, "dark"); + useVirtualHistoryGate(state.mode === "live" && !limit, items.length, virtual.virtualItems.at(-1)?.index ?? -1, () => void state.liveSession.loadOlder("inferred-dark") ); @@ -6953,18 +7280,21 @@ const DarkPane = ({ limit, className }: DarkPaneProps) => { EVIDENCE NOTE - {virtual.topSpacerHeight > 0 ? ( -
- ) : null} - {virtual.visibleItems.map((event) => { +
+ {virtual.virtualItems.map(({ item: event, key, index, start, size }) => { const underlying = inferDarkUnderlying(event, state.equityPrintMap, state.equityJoinMap); const evidenceCount = event.evidence_refs.length; return ( ); })} - {virtual.bottomSpacerHeight > 0 ? ( -
- ) : null} +
)} diff --git a/apps/web/package.json b/apps/web/package.json index b61eb2e..8ab6906 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -9,6 +9,7 @@ }, "dependencies": { "@islandflow/types": "workspace:*", + "@tanstack/react-virtual": "^3.13.24", "lightweight-charts": "^4.2.0", "next": "^14.2.4", "react": "^18.3.1", diff --git a/bun.lock b/bun.lock index de67cb2..47fc572 100644 --- a/bun.lock +++ b/bun.lock @@ -12,6 +12,7 @@ "name": "@islandflow/web", "dependencies": { "@islandflow/types": "workspace:*", + "@tanstack/react-virtual": "^3.13.24", "lightweight-charts": "^4.2.0", "next": "^14.2.4", "react": "^18.3.1", @@ -208,6 +209,10 @@ "@swc/helpers": ["@swc/helpers@0.5.5", "", { "dependencies": { "@swc/counter": "^0.1.3", "tslib": "^2.4.0" } }, "sha512-KGYxvIOXcceOAbEk4bi/dVLEK9z8sZ0uBB3Il5b1rhfClSpcX0yfRO0KmTkqR2cnQDymwLB+25ZyMzICg/cm/A=="], + "@tanstack/react-virtual": ["@tanstack/react-virtual@3.13.24", "", { "dependencies": { "@tanstack/virtual-core": "3.14.0" }, "peerDependencies": { "react": "^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0", "react-dom": "^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0" } }, "sha512-aIJvz5OSkhNIhZIpYivrxrPTKYsjW9Uzy+sP/mx0S3sev2HyvPb7xmjbYvokzEpfgYHy/HjzJ2zFAETuUfgCpg=="], + + "@tanstack/virtual-core": ["@tanstack/virtual-core@3.14.0", "", {}, "sha512-JLANqGy/D6k4Ujmh8Tr25lGimuOXNiaVyXaCAZS0W+1390sADdGnyUdSWNIfd49gebtIxGMij4IktRVzrdr12Q=="], + "@types/node": ["@types/node@20.19.27", "", { "dependencies": { "undici-types": "~6.21.0" } }, "sha512-N2clP5pJhB2YnZJ3PIHFk5RkygRX5WO/5f0WC08tp0wd+sv0rsJk3MqWn3CbNmT2J505a5336jaQj4ph1AdMug=="], "@types/prop-types": ["@types/prop-types@15.7.15", "", {}, "sha512-F6bEyamV9jKGAFBEmlQnesRPGOQqS2+Uwi0Em15xenOxHaf2hv6L8YCVn3rPdPJOiJfPiCnLIRyvwVaqMY3MIw=="], diff --git a/packages/types/src/live.ts b/packages/types/src/live.ts index 01fe4af..0787c84 100644 --- a/packages/types/src/live.ts +++ b/packages/types/src/live.ts @@ -54,6 +54,24 @@ export const LiveChannelSchema = z.enum([ export type LiveChannel = z.infer; export type LiveGenericChannel = z.infer; +export const LiveHotChannelSchema = z.enum(["options", "nbbo", "equities", "flow"]); +export type LiveHotChannel = z.infer; + +export const LiveChannelHealthSchema = z.object({ + freshness_age_ms: z.number().int().nonnegative().nullable(), + healthy: z.boolean() +}); + +export type LiveChannelHealth = z.infer; + +export const LiveHotChannelHealthSchema = z.object({ + options: LiveChannelHealthSchema, + nbbo: LiveChannelHealthSchema, + equities: LiveChannelHealthSchema, + flow: LiveChannelHealthSchema +}); + +export type LiveHotChannelHealthMap = z.infer; export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [ z.object({ @@ -152,7 +170,8 @@ export const LiveClientMessageSchema = z.discriminatedUnion("op", [ export type LiveClientMessage = z.infer; export const LiveReadyMessageSchema = z.object({ - op: z.literal("ready") + op: z.literal("ready"), + channel_health: LiveHotChannelHealthSchema }); export type LiveReadyMessage = z.infer; @@ -175,7 +194,8 @@ export type LiveEventMessage = z.infer; export const LiveHeartbeatMessageSchema = z.object({ op: z.literal("heartbeat"), - ts: z.number().int().nonnegative() + ts: z.number().int().nonnegative(), + channel_health: LiveHotChannelHealthSchema }); export type LiveHeartbeatMessage = z.infer; diff --git a/services/api/src/index.ts b/services/api/src/index.ts index ff72307..3035897 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -112,7 +112,7 @@ import { } from "@islandflow/types"; import { createClient } from "redis"; import { z } from "zod"; -import { LiveStateManager, shouldFanoutLiveEvent } from "./live"; +import { HOT_LIVE_REDIS_KEYS, LiveStateManager, shouldFanoutLiveEvent } from "./live"; const service = "api"; const logger = createLogger({ service }); @@ -138,13 +138,6 @@ const state = { shutdownPromise: null as Promise | null }; -const HOT_LIVE_REDIS_KEYS = { - options: "live:options", - equities: "live:equities", - flow: "live:flow", - nbbo: "live:nbbo" -} as const; - const getErrorMessage = (error: unknown): string => { return error instanceof Error ? error.message : String(error); }; @@ -908,6 +901,7 @@ const run = async () => { }; const liveStateMetricsTimer = setInterval(() => { const snapshot = liveState.getStatsSnapshot(); + const hotFeedHealth = liveState.getHotChannelHealth(); const hotFeedLagMs = { options: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.options] ?? null, equities: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.equities] ?? null, @@ -916,7 +910,12 @@ const run = async () => { }; logger.info("live cache metrics", { ...snapshot, - hotFeedLagMs + hotFeedLagMs, + hotFeedHealth, + snapshotSourceCounts: { + generic_cache_snapshot: snapshot.genericCacheSnapshots, + scoped_clickhouse_snapshot: snapshot.scopedClickHouseSnapshots + } }); warnLiveLag("options", hotFeedLagMs.options); warnLiveLag("equities", hotFeedLagMs.equities); @@ -1892,9 +1891,13 @@ const run = async () => { websocket: { open: (socket: any) => { if (socket.data.channel === "live") { - sendLiveMessage(socket, { op: "ready" }); + sendLiveMessage(socket, { op: "ready", channel_health: liveState.getHotChannelHealth() }); const heartbeat = setInterval(() => { - sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() }); + sendLiveMessage(socket, { + op: "heartbeat", + ts: Date.now(), + channel_health: liveState.getHotChannelHealth() + }); }, 15000); liveHeartbeats.set(socket, heartbeat); } else if (socket.data.channel === "options") { @@ -1935,7 +1938,11 @@ const run = async () => { : new TextDecoder().decode(message instanceof Uint8Array ? message : new Uint8Array(message)); const parsed = LiveClientMessageSchema.parse(JSON.parse(payload)); if (parsed.op === "ping") { - sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() }); + sendLiveMessage(socket, { + op: "heartbeat", + ts: Date.now(), + channel_health: liveState.getHotChannelHealth() + }); return; } diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 2907214..bd579da 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -25,7 +25,10 @@ import { FeedSnapshot, FlowPacketSchema, InferredDarkEventSchema, + LiveChannelHealth, LiveGenericChannel, + LiveHotChannel, + LiveHotChannelHealthMap, LiveSubscription, matchesFlowPacketFilters, matchesOptionPrintFilters, @@ -81,6 +84,13 @@ export const LIVE_FRESHNESS_THRESHOLDS: Partial; + export type GenericLiveLimits = Record; const parseGenericLimit = ( @@ -357,6 +367,8 @@ export class LiveStateManager { private readonly stats = { genericHydrateFromRedis: 0, genericHydrateFromClickHouse: 0, + genericCacheSnapshots: 0, + scopedClickHouseSnapshots: 0, trimOperations: 0, cacheDepthByKey: new Map(), freshnessAgeMsByKey: new Map() @@ -373,6 +385,8 @@ export class LiveStateManager { getStatsSnapshot(): { genericHydrateFromRedis: number; genericHydrateFromClickHouse: number; + genericCacheSnapshots: number; + scopedClickHouseSnapshots: number; trimOperations: number; cacheDepthByKey: Record; freshnessAgeMsByKey: Record; @@ -380,12 +394,37 @@ export class LiveStateManager { return { genericHydrateFromRedis: this.stats.genericHydrateFromRedis, genericHydrateFromClickHouse: this.stats.genericHydrateFromClickHouse, + genericCacheSnapshots: this.stats.genericCacheSnapshots, + scopedClickHouseSnapshots: this.stats.scopedClickHouseSnapshots, trimOperations: this.stats.trimOperations, cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey), freshnessAgeMsByKey: Object.fromEntries(this.stats.freshnessAgeMsByKey) }; } + getHotChannelHealth(): LiveHotChannelHealthMap { + return { + options: this.getChannelHealth("options"), + nbbo: this.getChannelHealth("nbbo"), + equities: this.getChannelHealth("equities"), + flow: this.getChannelHealth("flow") + }; + } + + private getChannelHealth(channel: LiveHotChannel): LiveChannelHealth { + const listKey = HOT_LIVE_REDIS_KEYS[channel]; + const thresholdMs = LIVE_FRESHNESS_THRESHOLDS[channel]; + const freshnessAgeMs = this.stats.freshnessAgeMsByKey.get(listKey) ?? null; + return { + freshness_age_ms: freshnessAgeMs, + healthy: + freshnessAgeMs !== null && + typeof thresholdMs === "number" && + Number.isFinite(freshnessAgeMs) && + freshnessAgeMs <= thresholdMs + }; + } + private updateFreshnessMetric(listKey: string, channel: LiveChannel, item: unknown, now = Date.now()): void { const ts = channel === "equity-candles" || channel === "equity-overlay" @@ -448,6 +487,7 @@ export class LiveStateManager { const scoped = Boolean(subscription.underlying_ids?.length) || Boolean(subscription.option_contract_id); if (subscription.filters?.view === "raw" || scoped) { + this.stats.scopedClickHouseSnapshots += 1; const limit = snapshotLimitFor(subscription, this.generic.options.limit); const storageFilters: OptionPrintQueryFilters = { view: subscription.filters?.view ?? "signal", @@ -476,6 +516,7 @@ export class LiveStateManager { } const config = this.generic.options; + this.stats.genericCacheSnapshots += 1; const limit = snapshotLimitFor(subscription, config.limit); const items = (this.genericItems.get("options") ?? []).filter((item) => matchesOptionPrintFilters(item, subscription.filters) @@ -489,6 +530,7 @@ export class LiveStateManager { } case "flow": { const config = this.generic.flow; + this.stats.genericCacheSnapshots += 1; const limit = snapshotLimitFor(subscription, config.limit); const items = (this.genericItems.get("flow") ?? []).filter((item) => matchesFlowPacketFilters(item, subscription.filters) @@ -504,6 +546,7 @@ export class LiveStateManager { const config = this.generic.equities; const limit = snapshotLimitFor(subscription, config.limit); if (subscription.underlying_ids?.length) { + this.stats.scopedClickHouseSnapshots += 1; const filters: EquityPrintQueryFilters = { underlyingIds: subscription.underlying_ids }; @@ -515,6 +558,7 @@ export class LiveStateManager { next_before: nextBeforeForItems(items, config.cursor) }; } + this.stats.genericCacheSnapshots += 1; const items = (this.genericItems.get("equities") ?? []).slice(0, limit); return { subscription, @@ -553,6 +597,7 @@ export class LiveStateManager { } default: { const config = this.generic[subscription.channel]; + this.stats.genericCacheSnapshots += 1; const limit = snapshotLimitFor(subscription, config.limit); const items = (this.genericItems.get(subscription.channel) ?? []).slice(0, limit); return { diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index 898d2fa..55232cc 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it } from "bun:test"; import type { ClickHouseClient } from "@islandflow/storage"; import { + HOT_LIVE_REDIS_KEYS, LiveStateManager, isLiveItemFresh, resolveGenericLiveLimits, @@ -729,6 +730,122 @@ describe("LiveStateManager", () => { expect(persisted).toHaveLength(1); }); + it("includes hot-channel health for options, nbbo, equities, and flow", async () => { + const manager = new LiveStateManager(makeClickHouse(), null); + const now = Date.now(); + + await manager.ingest("options", { + source_ts: now, + ingest_ts: now + 1, + seq: 1, + trace_id: "opt-health", + ts: now, + option_contract_id: "AAPL-2025-01-17-200-C", + price: 1, + size: 10, + exchange: "X" + }); + await manager.ingest("nbbo", { + source_ts: now, + ingest_ts: now + 1, + seq: 1, + trace_id: "nbbo-health", + ts: now, + option_contract_id: "AAPL-2025-01-17-200-C", + bid: 1, + ask: 1.1, + bidSize: 10, + askSize: 10 + }); + await manager.ingest("equities", { + source_ts: now, + ingest_ts: now + 1, + seq: 1, + trace_id: "eq-health", + ts: now, + underlying_id: "AAPL", + price: 100, + size: 10, + exchange: "X", + offExchangeFlag: false + }); + await manager.ingest("flow", { + source_ts: now, + ingest_ts: now + 1, + seq: 1, + trace_id: "flow-health", + id: "flow-health", + members: [], + features: {}, + join_quality: {} + }); + + const health = manager.getHotChannelHealth(); + expect(health.options.healthy).toBe(true); + expect(health.nbbo.healthy).toBe(true); + expect(health.equities.healthy).toBe(true); + expect(health.flow.healthy).toBe(true); + expect(health.options.freshness_age_ms).not.toBeNull(); + expect(health.nbbo.freshness_age_ms).not.toBeNull(); + expect(health.equities.freshness_age_ms).not.toBeNull(); + expect(health.flow.freshness_age_ms).not.toBeNull(); + }); + + it("tracks generic cache and scoped clickhouse snapshot sources separately", async () => { + const manager = new LiveStateManager(makeClickHouse(() => []), null); + const now = Date.now(); + + await manager.ingest("options", { + source_ts: now, + ingest_ts: now + 1, + seq: 1, + trace_id: "opt-snapshot", + ts: now, + option_contract_id: "SPY-2025-01-17-500-C", + price: 1, + size: 10, + exchange: "X" + }); + + await manager.getSnapshot({ channel: "options" }); + await manager.getSnapshot({ + channel: "options", + underlying_ids: ["QQQ"], + option_contract_id: "QQQ-2025-01-17-400-C" + }); + + const stats = manager.getStatsSnapshot(); + expect(stats.genericCacheSnapshots).toBe(1); + expect(stats.scopedClickHouseSnapshots).toBe(1); + }); + + it("keeps backend channel health healthy when a scoped query is quiet", async () => { + const manager = new LiveStateManager(makeClickHouse(() => []), null); + const now = Date.now(); + + await manager.ingest("options", { + source_ts: now, + ingest_ts: now + 1, + seq: 1, + trace_id: "opt-global", + ts: now, + option_contract_id: "SPY-2025-01-17-500-C", + price: 1, + size: 10, + exchange: "X" + }); + + const quietSnapshot = await manager.getSnapshot({ + channel: "options", + underlying_ids: ["QQQ"], + option_contract_id: "QQQ-2025-01-17-400-C" + }); + + expect(quietSnapshot.items).toEqual([]); + expect(manager.getHotChannelHealth().options.healthy).toBe(true); + expect(manager.getStatsSnapshot().freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.options]).toBeLessThanOrEqual(50); + }); + it("exposes freshness helper for feed status", () => { expect(isLiveItemFresh("options", { ts: 1000 }, 1010)).toBe(true); expect(isLiveItemFresh("options", { ts: 1000 }, 20_001)).toBe(false);