From 04188851b33689b8beb00b0390c412e05fe9f4aa Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sun, 11 Jan 2026 14:26:47 -0500 Subject: [PATCH] Sync replay source for options and NBBO --- apps/web/app/page.tsx | 75 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 69 insertions(+), 6 deletions(-) diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index ce8fe40..d496531 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -122,6 +122,29 @@ const extractTracePrefix = (item: T): string | null => { return inferTracePrefix(traceId); }; +const extractReplaySource = (item: T): string | null => { + const prefix = extractTracePrefix(item); + if (!prefix) { + return null; + } + + const normalized = prefix.toLowerCase(); + if (normalized.startsWith("synthetic")) { + return "synthetic"; + } + if (normalized.startsWith("databento")) { + return "databento"; + } + if (normalized.startsWith("alpaca")) { + return "alpaca"; + } + if (normalized.startsWith("ibkr")) { + return "ibkr"; + } + + return prefix; +}; + type SortableItem = { ts?: number; source_ts?: number; @@ -604,6 +627,8 @@ type TapeConfig = { onNewItems?: (count: number) => void; getItemTs?: (item: T) => number; getReplayKey?: (item: T) => string | null; + replaySourceKey?: string | null; + onReplaySourceKey?: (key: string | null) => void; }; const useTape = ( @@ -614,6 +639,8 @@ const useTape = ( const pollMs = config.pollMs ?? 1000; const getItemTs = config.getItemTs ?? extractSortTs; const getReplayKey = config.getReplayKey ?? extractTracePrefix; + const replaySourceKey = config.replaySourceKey ?? null; + const onReplaySourceKey = config.onReplaySourceKey; const [status, setStatus] = useState("connecting"); const [items, setItems] = useState([]); const [lastUpdate, setLastUpdate] = useState(null); @@ -627,6 +654,7 @@ const useTape = ( const replayEndRef = useRef(null); const replayCompleteRef = useRef(false); const replaySourceRef = useRef(null); + const replaySourceNotifiedRef = useRef(null); const emptyPollsRef = useRef(0); const pausedRef = useRef(paused); const pendingRef = useRef([]); @@ -690,6 +718,7 @@ const useTape = ( setReplayComplete(false); replayCompleteRef.current = false; replaySourceRef.current = null; + replaySourceNotifiedRef.current = null; emptyPollsRef.current = 0; setDropped(0); setStatus("connecting"); @@ -697,7 +726,7 @@ const useTape = ( pendingRef.current = []; pendingCountRef.current = 0; cancelFlush(); - }, [mode, cancelFlush]); + }, [mode, replaySourceKey, cancelFlush]); useEffect(() => { if (mode !== "replay" || !latestPath) { @@ -863,7 +892,12 @@ const useTape = ( const payload = (await response.json()) as ReplayResponse; let sourcePrefix = replaySourceRef.current; - if (!sourcePrefix) { + if (replaySourceKey) { + if (sourcePrefix !== replaySourceKey) { + sourcePrefix = replaySourceKey; + replaySourceRef.current = replaySourceKey; + } + } else if (!sourcePrefix) { const firstWithTrace = payload.data.find((item) => getReplayKey(item)); if (firstWithTrace) { sourcePrefix = getReplayKey(firstWithTrace); @@ -871,6 +905,11 @@ const useTape = ( } } + if (onReplaySourceKey && sourcePrefix && replaySourceNotifiedRef.current !== sourcePrefix) { + replaySourceNotifiedRef.current = sourcePrefix; + onReplaySourceKey(sourcePrefix); + } + const filtered = sourcePrefix ? payload.data.filter((item) => getReplayKey(item) === sourcePrefix) : payload.data; @@ -915,7 +954,7 @@ const useTape = ( await new Promise((resolve) => setTimeout(resolve, 0)); } - if (hasForeign) { + if (!replaySourceKey && hasForeign) { replayCompleteRef.current = true; setReplayComplete(true); setStatus("disconnected"); @@ -943,7 +982,18 @@ const useTape = ( window.clearInterval(interval); cancelFlush(); }; - }, [mode, replayPath, batchSize, pollMs, scheduleFlush, cancelFlush, getItemTs, getReplayKey]); + }, [ + mode, + replayPath, + batchSize, + pollMs, + scheduleFlush, + cancelFlush, + getItemTs, + getReplayKey, + replaySourceKey, + onReplaySourceKey + ]); return { status, @@ -1896,10 +1946,19 @@ const formatFlowMetric = (value: number, suffix?: string): string => { export default function HomePage() { const [mode, setMode] = useState("live"); + const [replaySource, setReplaySource] = useState(null); const [selectedAlert, setSelectedAlert] = useState(null); const [selectedDarkEvent, setSelectedDarkEvent] = useState(null); const [filterInput, setFilterInput] = useState(""); const [chartIntervalMs, setChartIntervalMs] = useState(CANDLE_INTERVALS[0].ms); + + const handleReplaySource = useCallback((value: string | null) => { + setReplaySource(value); + }, []); + + useEffect(() => { + setReplaySource(null); + }, [mode]); const optionsScroll = useListScroll(); const equitiesScroll = useListScroll(); const flowScroll = useListScroll(); @@ -1927,7 +1986,9 @@ export default function HomePage() { batchSize: mode === "replay" ? 120 : undefined, pollMs: mode === "replay" ? 200 : undefined, captureScroll: optionsAnchor.capture, - onNewItems: optionsScroll.onNewItems + onNewItems: optionsScroll.onNewItems, + getReplayKey: extractReplaySource, + onReplaySourceKey: handleReplaySource }); const equities = useTape({ @@ -1960,7 +2021,9 @@ export default function HomePage() { latestPath: "/nbbo/options", expectedType: "option-nbbo", batchSize: mode === "replay" ? 120 : undefined, - pollMs: mode === "replay" ? 200 : undefined + pollMs: mode === "replay" ? 200 : undefined, + getReplayKey: extractReplaySource, + replaySourceKey: replaySource }); const inferredDark = useTape({