From 6ba3c5343bf10f6ff9a6a120bbb5b61dc2d52548 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Wed, 6 May 2026 22:28:20 -0400 Subject: [PATCH] Implement live tape scroll-gated history --- apps/web/app/terminal.tsx | 395 ++++++++++++++++++++++++++--- packages/storage/src/clickhouse.ts | 98 +++++++ packages/types/src/live.ts | 15 +- services/api/src/index.ts | 53 ++++ services/api/src/live.ts | 25 +- 5 files changed, 533 insertions(+), 53 deletions(-) diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index 0a4bb56..e4c67a6 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -62,13 +62,25 @@ const parseBoundedInt = ( return Math.max(min, Math.min(max, Math.floor(parsed))); }; -const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, 2000, 100, 100000); +const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, 100, 1, 100000); const LIVE_HOT_WINDOW_OPTIONS = parseBoundedInt( process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW_OPTIONS, - 25000, 100, + 1, 100000 ); +const LIVE_HISTORY_SOFT_CAP = parseBoundedInt( + process.env.NEXT_PUBLIC_LIVE_HISTORY_SOFT_CAP, + 5000, + 100, + 50000 +); +const LIVE_HISTORY_BATCH = parseBoundedInt( + process.env.NEXT_PUBLIC_LIVE_HISTORY_BATCH, + 500, + 1, + 1000 +); const LIVE_OPTIONS_STALE_MS = 15_000; const LIVE_NBBO_STALE_MS = 15_000; const LIVE_EQUITIES_STALE_MS = 15_000; @@ -409,6 +421,16 @@ type PausableTapeData = { dropped: number; }; +type LiveHistoryBuffer = { + liveHead: T[]; + queuedLive: T[]; + historyTail: T[]; + nextBefore: Cursor | null; + historyLoading: boolean; + historyExhausted: boolean; + autoHydrating: boolean; +}; + export const reducePausableTapeData = ( current: PausableTapeData, incoming: T[], @@ -488,6 +510,37 @@ const EMPTY_PAUSABLE_TAPE = { dropped: 0 }; +const appendHistoryTail = ( + current: T[], + incoming: T[], + liveHead: T[], + cap = LIVE_HISTORY_SOFT_CAP +): T[] => { + if (incoming.length === 0) { + return current; + } + + const seen = new Set(); + for (const item of liveHead) { + seen.add(getTapeItemKey(item)); + } + for (const item of current) { + seen.add(getTapeItemKey(item)); + } + + const appended = [...current]; + for (const item of incoming) { + const key = getTapeItemKey(item); + if (seen.has(key)) { + continue; + } + seen.add(key); + appended.push(item); + } + + return cap > 0 ? appended.slice(0, cap) : appended; +}; + export const getLiveFeedStatus = ( sourceStatus: WsStatus, freshestTs: number | null, @@ -1315,6 +1368,40 @@ const useScrollAnchor = ( return { capture, apply }; }; +const useBottomHistoryGate = ( + listRef: React.RefObject, + enabled: boolean, + onLoadOlder: () => void +): void => { + const loadRef = useRef(onLoadOlder); + useEffect(() => { + loadRef.current = onLoadOlder; + }, [onLoadOlder]); + + useEffect(() => { + if (!enabled) { + return; + } + const element = listRef.current; + if (!element) { + return; + } + + const maybeLoad = () => { + const threshold = Math.max(240, element.clientHeight * 0.5); + if (element.scrollTop + element.clientHeight >= element.scrollHeight - threshold) { + loadRef.current(); + } + }; + + maybeLoad(); + element.addEventListener("scroll", maybeLoad); + return () => { + element.removeEventListener("scroll", maybeLoad); + }; + }, [enabled, listRef]); +}; + type VirtualListResult = { visibleItems: T[]; topSpacerHeight: number; @@ -1886,6 +1973,7 @@ type PausableTapeViewConfig = { enabled: boolean; sourceStatus: WsStatus; sourceItems: T[]; + historyTail?: T[]; lastUpdate: number | null; freshnessMs: number; onNewItems?: (count: number) => void; @@ -2011,10 +2099,14 @@ const usePausableTapeView = ( ) : "disconnected"; const projected = projectPausableTapeState(data.visible, status, config.lastUpdate); + const items = useMemo( + () => [...projected.items, ...(config.historyTail ?? [])], + [projected.items, config.historyTail] + ); return { status, - items: projected.items, + items, lastUpdate: projected.lastUpdate, replayTime: null, replayComplete: false, @@ -2269,6 +2361,15 @@ type LiveSessionState = { historyLoading: Partial>; historyErrors: Partial>; loadOlder: (channel: LiveSubscription["channel"]) => Promise; + optionsHistory: OptionPrint[]; + nbboHistory: OptionNBBO[]; + equitiesHistory: EquityPrint[]; + equityJoinsHistory: EquityPrintJoin[]; + flowHistory: FlowPacket[]; + smartMoneyHistory: SmartMoneyEvent[]; + classifierHitsHistory: ClassifierHitEvent[]; + alertsHistory: AlertEvent[]; + inferredDarkHistory: InferredDarkEvent[]; options: OptionPrint[]; nbbo: OptionNBBO[]; equities: EquityPrint[]; @@ -2360,14 +2461,23 @@ export const getLiveManifest = ( ]; if (pathname === "/tape") { - return dedupeLiveSubscriptions([ - ...baselineSubs, - { channel: "nbbo" }, - { channel: "equities", ...equityScope }, - { channel: "flow", filters: flowFilters }, - { channel: "smart-money" }, - { channel: "classifier-hits" } - ]); + const optionsSub: Extract = { + channel: "options", + filters: flowFilters, + ...optionScope, + snapshot_limit: LIVE_HOT_WINDOW_OPTIONS + }; + const tapeSubs: LiveSubscription[] = [ + optionsSub, + { channel: "nbbo", snapshot_limit: LIVE_HOT_WINDOW }, + { channel: "equities", ...equityScope, snapshot_limit: LIVE_HOT_WINDOW }, + { channel: "flow", filters: flowFilters, snapshot_limit: LIVE_HOT_WINDOW }, + { channel: "smart-money", snapshot_limit: LIVE_HOT_WINDOW }, + { channel: "classifier-hits", snapshot_limit: LIVE_HOT_WINDOW }, + { channel: "alerts", snapshot_limit: LIVE_HOT_WINDOW }, + { channel: "inferred-dark", snapshot_limit: LIVE_HOT_WINDOW } + ]; + return dedupeLiveSubscriptions(tapeSubs); } return dedupeLiveSubscriptions([ @@ -2410,6 +2520,15 @@ const useLiveSession = ( const [classifierHits, setClassifierHits] = useState([]); const [alerts, setAlerts] = useState([]); const [inferredDark, setInferredDark] = useState([]); + const [optionsHistory, setOptionsHistory] = useState([]); + const [nbboHistory, setNbboHistory] = useState([]); + const [equitiesHistory, setEquitiesHistory] = useState([]); + const [equityJoinsHistory, setEquityJoinsHistory] = useState([]); + const [flowHistory, setFlowHistory] = useState([]); + const [smartMoneyHistory, setSmartMoneyHistory] = useState([]); + const [classifierHitsHistory, setClassifierHitsHistory] = useState([]); + const [alertsHistory, setAlertsHistory] = useState([]); + const [inferredDarkHistory, setInferredDarkHistory] = useState([]); const [chartCandles, setChartCandles] = useState([]); const [chartOverlay, setChartOverlay] = useState([]); const socketRef = useRef(null); @@ -2443,6 +2562,15 @@ const useLiveSession = ( setClassifierHits([]); setAlerts([]); setInferredDark([]); + setOptionsHistory([]); + setNbboHistory([]); + setEquitiesHistory([]); + setEquityJoinsHistory([]); + setFlowHistory([]); + setSmartMoneyHistory([]); + setClassifierHitsHistory([]); + setAlertsHistory([]); + setInferredDarkHistory([]); setChartCandles([]); setChartOverlay([]); subscribedKeysRef.current = new Set(); @@ -2699,9 +2827,11 @@ const useLiveSession = ( ); if (resetScopedChannels.has("options")) { setOptions([]); + setOptionsHistory([]); } if (resetScopedChannels.has("equities")) { setEquities([]); + setEquitiesHistory([]); } if (resetScopedChannels.size > 0) { setHistoryCursors((current) => { @@ -2765,7 +2895,7 @@ const useLiveSession = ( const params = new URLSearchParams({ before_ts: String(cursor.ts), before_seq: String(cursor.seq), - limit: String(subscription.channel === "options" ? 500 : 200) + limit: String(subscription.channel === "options" ? LIVE_HISTORY_BATCH : 200) }); if (subscription.channel === "options" || subscription.channel === "flow") { appendOptionFlowFilters(params, subscription.filters); @@ -2783,45 +2913,49 @@ const useLiveSession = ( const mergeOlder = ( setter: Dispatch>, - limit: number + liveHead: T[], + cap = LIVE_HISTORY_SOFT_CAP ) => { - setter((prev) => - mergeNewest(older as T[], prev, limit, (evicted) => - incrementRetentionMetric("hotWindowEvictions", evicted) - ) - ); + setter((prev) => appendHistoryTail(prev, older as T[], liveHead, cap)); }; switch (subscription.channel) { case "options": - mergeOlder(setOptions, LIVE_HOT_WINDOW_OPTIONS); + mergeOlder( + setOptionsHistory, + options, + subscription.underlying_ids?.length || subscription.option_contract_id ? 0 : LIVE_HISTORY_SOFT_CAP + ); break; case "nbbo": - mergeOlder(setNbbo, LIVE_HOT_WINDOW); + mergeOlder(setNbboHistory, nbbo); break; case "equities": - mergeOlder(setEquities, LIVE_HOT_WINDOW); + mergeOlder( + setEquitiesHistory, + equities, + subscription.underlying_ids?.length ? 0 : LIVE_HISTORY_SOFT_CAP + ); break; case "equity-quotes": - mergeOlder(setEquityQuotes, LIVE_HOT_WINDOW); break; case "equity-joins": - mergeOlder(setEquityJoins, LIVE_HOT_WINDOW); + mergeOlder(setEquityJoinsHistory, equityJoins); break; case "flow": - mergeOlder(setFlow, LIVE_HOT_WINDOW); + mergeOlder(setFlowHistory, flow); break; case "smart-money": - mergeOlder(setSmartMoney, LIVE_HOT_WINDOW); + mergeOlder(setSmartMoneyHistory, smartMoney); break; case "classifier-hits": - mergeOlder(setClassifierHits, LIVE_HOT_WINDOW); + mergeOlder(setClassifierHitsHistory, classifierHits); break; case "alerts": - mergeOlder(setAlerts, LIVE_HOT_WINDOW); + mergeOlder(setAlertsHistory, alerts); break; case "inferred-dark": - mergeOlder(setInferredDark, LIVE_HOT_WINDOW); + mergeOlder(setInferredDarkHistory, inferredDark); break; } @@ -2839,9 +2973,44 @@ const useLiveSession = ( setHistoryLoading((current) => ({ ...current, [key]: false })); } }, - [enabled, manifest, historyCursors, historyLoading] + [ + enabled, + manifest, + historyCursors, + historyLoading, + options, + nbbo, + equities, + equityJoins, + flow, + smartMoney, + classifierHits, + alerts, + inferredDark + ] ); + useEffect(() => { + if (!enabled || pathname !== "/tape") { + return; + } + const scoped = manifest.filter( + (subscription) => + (subscription.channel === "options" && + (subscription.underlying_ids?.length || subscription.option_contract_id)) || + (subscription.channel === "equities" && subscription.underlying_ids?.length) + ); + if (scoped.length === 0) { + return; + } + for (const subscription of scoped) { + const key = getLiveSubscriptionKey(subscription); + if (historyCursors[key] && !historyLoading[key]) { + void loadOlder(subscription.channel); + } + } + }, [enabled, pathname, manifest, historyCursors, historyLoading, loadOlder]); + return { status, connectedAt, @@ -2852,6 +3021,15 @@ const useLiveSession = ( historyLoading, historyErrors, loadOlder, + optionsHistory, + nbboHistory, + equitiesHistory, + equityJoinsHistory, + flowHistory, + smartMoneyHistory, + classifierHitsHistory, + alertsHistory, + inferredDarkHistory, options, nbbo, equities, @@ -4435,6 +4613,7 @@ const useTerminalState = () => { enabled: mode === "live", sourceStatus: liveSession.status, sourceItems: liveSession.options, + historyTail: liveSession.optionsHistory, lastUpdate: liveSession.lastUpdate, freshnessMs: LIVE_OPTIONS_STALE_MS, retentionLimit: LIVE_HOT_WINDOW_OPTIONS, @@ -4447,6 +4626,7 @@ const useTerminalState = () => { enabled: mode === "live", sourceStatus: liveSession.status, sourceItems: liveSession.equities, + historyTail: liveSession.equitiesHistory, lastUpdate: liveSession.lastUpdate, freshnessMs: LIVE_EQUITIES_STALE_MS, captureScroll: equitiesAnchor.capture, @@ -4458,6 +4638,7 @@ const useTerminalState = () => { enabled: mode === "live", sourceStatus: liveSession.status, sourceItems: liveSession.flow, + historyTail: liveSession.flowHistory, lastUpdate: liveSession.lastUpdate, freshnessMs: LIVE_FLOW_STALE_MS, captureScroll: flowAnchor.capture, @@ -4469,26 +4650,26 @@ const useTerminalState = () => { const optionsFeed = mode === "live" ? liveOptions : options; const nbboFeed = - mode === "live" ? toStaticTapeState(liveSession.status, liveSession.nbbo, liveSession.lastUpdate) : nbbo; + mode === "live" ? toStaticTapeState(liveSession.status, [...liveSession.nbbo, ...liveSession.nbboHistory], liveSession.lastUpdate) : nbbo; const equitiesFeed = mode === "live" ? liveEquities : equities; const equityJoinsFeed = mode === "live" - ? toStaticTapeState(liveSession.status, liveSession.equityJoins, liveSession.lastUpdate) + ? toStaticTapeState(liveSession.status, [...liveSession.equityJoins, ...liveSession.equityJoinsHistory], liveSession.lastUpdate) : equityJoins; const flowFeed = mode === "live" ? liveFlow : flow; const alertsFeed = - mode === "live" ? toStaticTapeState(liveSession.status, liveSession.alerts, liveSession.lastUpdate) : alerts; + mode === "live" ? toStaticTapeState(liveSession.status, [...liveSession.alerts, ...liveSession.alertsHistory], liveSession.lastUpdate) : alerts; const classifierHitsFeed = mode === "live" - ? toStaticTapeState(liveSession.status, liveSession.classifierHits, liveSession.lastUpdate) + ? toStaticTapeState(liveSession.status, [...liveSession.classifierHits, ...liveSession.classifierHitsHistory], liveSession.lastUpdate) : classifierHits; const smartMoneyFeed = mode === "live" - ? toStaticTapeState(liveSession.status, liveSession.smartMoney, liveSession.lastUpdate) + ? toStaticTapeState(liveSession.status, [...liveSession.smartMoney, ...liveSession.smartMoneyHistory], liveSession.lastUpdate) : smartMoney; const inferredDarkFeed = mode === "live" - ? toStaticTapeState(liveSession.status, liveSession.inferredDark, liveSession.lastUpdate) + ? toStaticTapeState(liveSession.status, [...liveSession.inferredDark, ...liveSession.inferredDarkHistory], liveSession.lastUpdate) : inferredDark; useLayoutEffect(() => { @@ -4575,6 +4756,11 @@ const useTerminalState = () => { const [pinnedEquityJoinMap, setPinnedEquityJoinMap] = useState< Map> >(() => new Map()); + const [optionSupportSmartMoney, setOptionSupportSmartMoney] = useState([]); + const [optionSupportClassifierHits, setOptionSupportClassifierHits] = useState([]); + const [historicalNbboByTraceId, setHistoricalNbboByTraceId] = useState>( + () => new Map() + ); const resolvedOptionPrintMap = useMemo(() => { const merged = new Map(); @@ -4809,7 +4995,7 @@ const useTerminalState = () => { const classifierHitsByPacketId = useMemo(() => { const map = new Map(); - for (const hit of classifierHitsFeed.items) { + for (const hit of [...classifierHitsFeed.items, ...optionSupportClassifierHits]) { const packetId = extractPacketIdFromClassifierHitTrace(hit.trace_id); if (!packetId) { continue; @@ -4817,11 +5003,11 @@ const useTerminalState = () => { map.set(packetId, [...(map.get(packetId) ?? []), hit]); } return map; - }, [classifierHitsFeed.items, extractPacketIdFromClassifierHitTrace]); + }, [classifierHitsFeed.items, optionSupportClassifierHits, extractPacketIdFromClassifierHitTrace]); const smartMoneyByPacketId = useMemo(() => { const map = new Map(); - for (const event of smartMoneyFeed.items) { + for (const event of [...smartMoneyFeed.items, ...optionSupportSmartMoney]) { for (const packetId of event.packet_ids) { const existing = map.get(packetId); if (!existing || event.source_ts > existing.source_ts || event.seq > existing.seq) { @@ -4830,17 +5016,17 @@ const useTerminalState = () => { } } return map; - }, [smartMoneyFeed.items]); + }, [smartMoneyFeed.items, optionSupportSmartMoney]); const packetIdByOptionTraceId = useMemo(() => { const map = new Map(); - for (const packet of flowFeed.items) { + for (const packet of resolvedFlowPacketMap.values()) { for (const member of packet.members) { map.set(member, packet.id); } } return map; - }, [flowFeed.items]); + }, [resolvedFlowPacketMap]); const classifierDecorByOptionTraceId = useMemo(() => { const map = new Map(); @@ -4858,6 +5044,111 @@ const useTerminalState = () => { return map; }, [classifierHitsByPacketId, packetIdByOptionTraceId, smartMoneyByPacketId]); + useEffect(() => { + if (mode !== "live" || optionsFeed.items.length === 0) { + return; + } + + const traceIds: string[] = []; + const nbboContext: Array<{ trace_id: string; option_contract_id: string; ts: number }> = []; + for (const print of optionsFeed.items.slice(0, 1000)) { + if (!print.trace_id || classifierDecorByOptionTraceId.has(print.trace_id)) { + continue; + } + if (!packetIdByOptionTraceId.has(print.trace_id)) { + traceIds.push(print.trace_id); + } + const missingPreservedNbbo = + typeof print.execution_nbbo_side !== "string" && + typeof print.nbbo_side !== "string" && + !historicalNbboByTraceId.has(print.trace_id); + if (missingPreservedNbbo) { + nbboContext.push({ + trace_id: print.trace_id, + option_contract_id: print.option_contract_id, + ts: print.ts + }); + } + if (traceIds.length >= 250 && nbboContext.length >= 250) { + break; + } + } + + const uniqueTraceIds = Array.from(new Set(traceIds)).slice(0, 250); + const uniqueNbboContext = Array.from( + new Map(nbboContext.map((item) => [item.trace_id, item])).values() + ).slice(0, 250); + if (uniqueTraceIds.length === 0 && uniqueNbboContext.length === 0) { + return; + } + + let cancelled = false; + void fetch(buildApiUrl("/lookup/options-support"), { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + trace_ids: uniqueTraceIds, + nbbo_context: uniqueNbboContext + }) + }) + .then(async (response) => { + if (!response.ok) { + throw new Error(await readErrorDetail(response)); + } + return response.json() as Promise<{ + packets?: FlowPacket[]; + smart_money?: SmartMoneyEvent[]; + classifier_hits?: ClassifierHitEvent[]; + nbbo_by_trace_id?: Record; + }>; + }) + .then((payload) => { + if (cancelled) { + return; + } + const now = Date.now(); + const packetMap = new Map(); + for (const packet of payload.packets ?? []) { + packetMap.set(packet.id, packet); + } + if (packetMap.size > 0) { + setPinnedFlowPacketMap((prev) => upsertPinnedEntries(prev, packetMap, now)); + } + if (payload.smart_money?.length) { + setOptionSupportSmartMoney((prev) => + mergeNewest(payload.smart_money ?? [], prev, PINNED_EVIDENCE_MAX_ITEMS) + ); + } + if (payload.classifier_hits?.length) { + setOptionSupportClassifierHits((prev) => + mergeNewest(payload.classifier_hits ?? [], prev, PINNED_EVIDENCE_MAX_ITEMS) + ); + } + if (payload.nbbo_by_trace_id) { + setHistoricalNbboByTraceId((prev) => { + const next = new Map(prev); + for (const [traceId, quote] of Object.entries(payload.nbbo_by_trace_id ?? {})) { + next.set(traceId, quote); + } + return next; + }); + } + }) + .catch((error) => { + console.warn("Failed to hydrate option row support", error); + }); + + return () => { + cancelled = true; + }; + }, [ + mode, + optionsFeed.items, + classifierDecorByOptionTraceId, + packetIdByOptionTraceId, + historicalNbboByTraceId + ]); + const selectedClassifierPacketId = useMemo(() => { if (!selectedClassifierHit) { return null; @@ -5456,6 +5747,7 @@ const useTerminalState = () => { tickerSet, chartTicker, nbboMap, + historicalNbboByTraceId, optionPrintMap: resolvedOptionPrintMap, equityPrintMap, equityJoinMap: resolvedEquityJoinMap, @@ -5808,6 +6100,9 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredOptions.slice(0, limit) : state.filteredOptions; const virtual = useVirtualList(items, state.optionsScroll.listRef, !limit, 36); + useBottomHistoryGate(state.optionsScroll.listRef, state.mode === "live" && !limit, () => + void state.liveSession.loadOlder("options") + ); return ( { const contractId = normalizeContractId(print.option_contract_id); const parsed = parseOptionContractId(contractId); const contractDisplay = formatOptionContractLabel(contractId); - const quote = state.nbboMap.get(contractId); + const quote = state.historicalNbboByTraceId.get(print.trace_id) ?? state.nbboMap.get(contractId); const hasPreservedNbbo = typeof print.execution_nbbo_side === "string"; const nbboSide = print.execution_nbbo_side ?? @@ -5982,6 +6277,9 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredEquities.slice(0, limit) : state.filteredEquities; const virtual = useVirtualList(items, state.equitiesScroll.listRef, !limit, 36); + useBottomHistoryGate(state.equitiesScroll.listRef, state.mode === "live" && !limit, () => + void state.liveSession.loadOlder("equities") + ); return ( { const state = useTerminal(); const items = limit ? state.filteredFlow.slice(0, limit) : state.filteredFlow; const virtual = useVirtualList(items, state.flowScroll.listRef, !limit, 44); + useBottomHistoryGate(state.flowScroll.listRef, state.mode === "live" && !limit, () => + void state.liveSession.loadOlder("flow") + ); return ( const state = useTerminal(); const items = limit ? state.filteredAlerts.slice(0, limit) : state.filteredAlerts; const virtual = useVirtualList(items, state.alertsScroll.listRef, !limit, 46); + useBottomHistoryGate(state.alertsScroll.listRef, state.mode === "live" && !limit, () => + void state.liveSession.loadOlder("alerts") + ); return ( { const state = useTerminal(); + useBottomHistoryGate(state.classifierScroll.listRef, state.mode === "live" && !limit, () => { + void state.liveSession.loadOlder("smart-money"); + void state.liveSession.loadOlder("classifier-hits"); + }); const smartMoneyItems = limit ? state.filteredSmartMoneyEvents.slice(0, limit) : state.filteredSmartMoneyEvents; const legacyItems = smartMoneyItems.length === 0 @@ -6438,6 +6746,9 @@ const DarkPane = ({ limit, className }: DarkPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredInferredDark.slice(0, limit) : state.filteredInferredDark; const virtual = useVirtualList(items, state.darkScroll.listRef, !limit, 44); + useBottomHistoryGate(state.darkScroll.listRef, state.mode === "live" && !limit, () => + void state.liveSession.loadOlder("inferred-dark") + ); return ( => { + const ids = Array.from(new Set(traceIds.map((id) => id.trim()).filter(Boolean))); + if (ids.length === 0) { + return []; + } + + const memberPredicates = ids.map((id) => `has(members, ${quoteString(id)})`); + const result = await client.query({ + query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE ${memberPredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeFlowPacketRow) + .filter((record): record is FlowPacketRecord => record !== null); + return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord)); +}; + +export const fetchSmartMoneyEventsByPacketIds = async ( + client: ClickHouseClient, + packetIds: string[] +): Promise => { + const ids = Array.from(new Set(packetIds.map((id) => id.trim()).filter(Boolean))); + if (ids.length === 0) { + return []; + } + + const packetPredicates = ids.map((id) => `has(packet_ids, ${quoteString(id)})`); + const result = await client.query({ + query: `SELECT * FROM ${SMART_MONEY_EVENTS_TABLE} WHERE ${packetPredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeSmartMoneyEventRow) + .filter((record): record is SmartMoneyEventRecord => record !== null); + return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord)); +}; + +export const fetchClassifierHitsByPacketIds = async ( + client: ClickHouseClient, + packetIds: string[] +): Promise => { + const ids = Array.from(new Set(packetIds.map((id) => id.trim()).filter(Boolean))); + if (ids.length === 0) { + return []; + } + + const tracePredicates = ids.map((id) => `position(trace_id, ${quoteString(id)}) > 0`); + const result = await client.query({ + query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE ${tracePredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeClassifierHitRow) + .filter((record): record is ClassifierHitRecord => record !== null); + return ClassifierHitEventSchema.array().parse(records.map(fromClassifierHitRecord)); +}; + +export const fetchNearestOptionNBBOForPrints = async ( + client: ClickHouseClient, + inputs: Array<{ trace_id: string; option_contract_id: string; ts: number }> +): Promise> => { + const normalized = inputs + .map((item) => ({ + trace_id: item.trace_id.trim(), + option_contract_id: item.option_contract_id.trim(), + ts: clampCursor(item.ts) + })) + .filter((item) => item.trace_id && item.option_contract_id); + if (normalized.length === 0) { + return {}; + } + + const byTraceId: Record = Object.fromEntries( + normalized.map((item) => [item.trace_id, null]) + ); + await Promise.all( + normalized.map(async (item) => { + const result = await client.query({ + query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE option_contract_id = ${quoteString(item.option_contract_id)} AND ts <= ${item.ts} ORDER BY ts DESC, seq DESC LIMIT 1`, + format: "JSONEachRow" + }); + const rows = await result.json(); + const quote = OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow))[0] ?? null; + byTraceId[item.trace_id] = quote; + }) + ); + return byTraceId; +}; + export const fetchOptionPrintsByTraceIds = async ( client: ClickHouseClient, traceIds: string[] diff --git a/packages/types/src/live.ts b/packages/types/src/live.ts index 37fe7c8..01fe4af 100644 --- a/packages/types/src/live.ts +++ b/packages/types/src/live.ts @@ -60,21 +60,26 @@ export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [ channel: z.literal("options"), filters: OptionFlowFiltersSchema.optional(), underlying_ids: z.array(z.string().min(1)).optional(), - option_contract_id: z.string().min(1).optional() + option_contract_id: z.string().min(1).optional(), + snapshot_limit: z.number().int().positive().optional() }), z.object({ channel: z.literal("flow"), - filters: OptionFlowFiltersSchema.optional() + filters: OptionFlowFiltersSchema.optional(), + snapshot_limit: z.number().int().positive().optional() }), z.object({ - channel: z.literal("smart-money") + channel: z.literal("smart-money"), + snapshot_limit: z.number().int().positive().optional() }), z.object({ - channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]) + channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]), + snapshot_limit: z.number().int().positive().optional() }), z.object({ channel: z.literal("equities"), - underlying_ids: z.array(z.string().min(1)).optional() + underlying_ids: z.array(z.string().min(1)).optional(), + snapshot_limit: z.number().int().positive().optional() }), z.object({ channel: z.literal("equity-candles"), diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 031da57..c450ea7 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -49,6 +49,7 @@ import { fetchSmartMoneyEventsBefore, fetchFlowPacketsAfter, fetchFlowPacketById, + fetchFlowPacketsByMemberTraceIds, fetchFlowPacketsBefore, fetchRecentAlerts, fetchRecentClassifierHits, @@ -76,6 +77,9 @@ import { fetchOptionPrintsBefore, fetchOptionPrintsAfter, fetchOptionPrintsByTraceIds, + fetchNearestOptionNBBOForPrints, + fetchSmartMoneyEventsByPacketIds, + fetchClassifierHitsByPacketIds, fetchRecentOptionPrints } from "@islandflow/storage"; import type { EquityPrintQueryFilters, OptionPrintQueryFilters } from "@islandflow/storage"; @@ -303,6 +307,28 @@ const jsonResponse = (body: unknown, status = 200): Response => { }); }; +const readJsonBody = async (req: Request): Promise => { + const text = await req.text(); + if (!text.trim()) { + return {}; + } + return JSON.parse(text); +}; + +const optionsSupportLookupSchema = z.object({ + trace_ids: z.array(z.string().min(1)).default([]), + nbbo_context: z + .array( + z.object({ + trace_id: z.string().min(1), + option_contract_id: z.string().min(1), + ts: z.number().int().nonnegative() + }) + ) + .optional() + .default([]) +}); + const parseLimit = (value: string | null): number => { if (value === null) { return env.REST_DEFAULT_LIMIT; @@ -1608,6 +1634,33 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "POST" && url.pathname === "/lookup/options-support") { + try { + const body = optionsSupportLookupSchema.parse(await readJsonBody(req)); + const packets = await fetchFlowPacketsByMemberTraceIds(clickhouse, body.trace_ids); + const packetIds = packets.map((packet) => packet.id); + const [smartMoney, classifierHits, nbboByTraceId] = await Promise.all([ + fetchSmartMoneyEventsByPacketIds(clickhouse, packetIds), + fetchClassifierHitsByPacketIds(clickhouse, packetIds), + fetchNearestOptionNBBOForPrints(clickhouse, body.nbbo_context) + ]); + return jsonResponse({ + packets, + smart_money: smartMoney, + classifier_hits: classifierHits, + nbbo_by_trace_id: nbboByTraceId + }); + } catch (error) { + return jsonResponse( + { + error: "invalid options support lookup", + detail: error instanceof Error ? error.message : String(error) + }, + 400 + ); + } + } + if (req.method === "GET" && url.pathname === "/equity-joins/by-id") { const ids = url.searchParams.getAll("id"); const data = await fetchEquityPrintJoinsByIds(clickhouse, ids); diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 74276ec..aa4281c 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -327,6 +327,14 @@ const nextBeforeForItems = (items: T[], cursorOf: (item: T) => Cursor): Curso return last ? cursorOf(last) : null; }; +const snapshotLimitFor = (subscription: LiveSubscription, configuredLimit: number): number => { + const requested = "snapshot_limit" in subscription ? subscription.snapshot_limit : undefined; + if (!requested) { + return configuredLimit; + } + return Math.max(1, Math.min(configuredLimit, Math.floor(requested))); +}; + const candleRedisKey = (underlyingId: string, intervalMs: number): string => `live:equity-candles:${underlyingId}:${intervalMs}`; @@ -448,6 +456,7 @@ export class LiveStateManager { const scoped = Boolean(subscription.underlying_ids?.length) || Boolean(subscription.option_contract_id); if (subscription.filters?.view === "raw" || scoped) { + const limit = snapshotLimitFor(subscription, this.generic.options.limit); const storageFilters: OptionPrintQueryFilters = { view: subscription.filters?.view ?? "signal", security: @@ -463,7 +472,7 @@ export class LiveStateManager { }; const items = await fetchRecentOptionPrints( this.clickhouse, - this.generic.options.limit, + limit, undefined, storageFilters ); @@ -476,10 +485,11 @@ export class LiveStateManager { } const config = this.generic.options; + const limit = snapshotLimitFor(subscription, config.limit); const items = (this.genericItems.get("options") ?? []).filter((item) => isWithinLiveFeedLookback("options", item) && matchesOptionPrintFilters(item, subscription.filters) - ); + ).slice(0, limit); return { subscription, items, @@ -489,10 +499,11 @@ export class LiveStateManager { } case "flow": { const config = this.generic.flow; + const limit = snapshotLimitFor(subscription, config.limit); const items = (this.genericItems.get("flow") ?? []).filter((item) => isWithinLiveFeedLookback("flow", item) && matchesFlowPacketFilters(item, subscription.filters) - ); + ).slice(0, limit); return { subscription, items, @@ -502,12 +513,13 @@ export class LiveStateManager { } case "equities": { const config = this.generic.equities; + const limit = snapshotLimitFor(subscription, config.limit); if (subscription.underlying_ids?.length) { const filters: EquityPrintQueryFilters = { underlyingIds: subscription.underlying_ids, sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS }; - const items = await fetchRecentEquityPrints(this.clickhouse, config.limit, filters); + const items = await fetchRecentEquityPrints(this.clickhouse, limit, filters); return { subscription, items, @@ -517,7 +529,7 @@ export class LiveStateManager { } const items = (this.genericItems.get("equities") ?? []).filter((item) => isWithinLiveFeedLookback("equities", item) - ); + ).slice(0, limit); return { subscription, items, @@ -555,9 +567,10 @@ export class LiveStateManager { } default: { const config = this.generic[subscription.channel]; + const limit = snapshotLimitFor(subscription, config.limit); const items = (this.genericItems.get(subscription.channel) ?? []).filter((item) => isWithinLiveFeedLookback(subscription.channel, item) - ); + ).slice(0, limit); return { subscription, items,