diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index 1b19394..7f3b242 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -16,11 +16,14 @@ import { import type { AlertEvent, ClassifierHitEvent, + Cursor, EquityCandle, EquityPrint, EquityPrintJoin, FlowPacket, InferredDarkEvent, + LiveServerMessage, + LiveSubscription, OptionNBBO, OptionPrint } from "@islandflow/types"; @@ -692,6 +695,7 @@ type TapeConfig = { wsPath: string; replayPath: string; latestPath?: string; + liveEnabled?: boolean; expectedType: MessageType; batchSize?: number; pollMs?: number; @@ -841,7 +845,7 @@ const useTape = ( }, [mode, latestPath, getItemTs, replaySourceKey]); useEffect(() => { - if (mode !== "live") { + if (mode !== "live" || config.liveEnabled === false) { return; } @@ -1086,6 +1090,21 @@ const useTape = ( }; }; +const toStaticTapeState = ( + status: WsStatus, + items: T[], + lastUpdate: number | null +): TapeState => ({ + status, + items, + lastUpdate, + replayTime: null, + replayComplete: false, + paused: false, + dropped: 0, + togglePause: () => {} +}); + const useLiveStream = ( config: { enabled: boolean; @@ -1311,6 +1330,310 @@ const useFlowStream = ( }); }; +type LiveSessionState = { + status: WsStatus; + lastUpdate: number | null; + options: OptionPrint[]; + nbbo: OptionNBBO[]; + equities: EquityPrint[]; + equityJoins: EquityPrintJoin[]; + flow: FlowPacket[]; + classifierHits: ClassifierHitEvent[]; + alerts: AlertEvent[]; + inferredDark: InferredDarkEvent[]; + chartCandles: EquityCandle[]; + chartOverlay: EquityPrint[]; +}; + +const getLiveSubscriptionKey = (subscription: LiveSubscription): string => { + switch (subscription.channel) { + case "equity-candles": + return `${subscription.channel}|${subscription.underlying_id}|${subscription.interval_ms}`; + case "equity-overlay": + return `${subscription.channel}|${subscription.underlying_id}`; + default: + return subscription.channel; + } +}; + +const getLiveManifest = ( + pathname: string, + chartTicker: string, + chartIntervalMs: number +): LiveSubscription[] => { + const chartSubs: LiveSubscription[] = [ + { channel: "equity-candles", underlying_id: chartTicker, interval_ms: chartIntervalMs }, + { channel: "equity-overlay", underlying_id: chartTicker } + ]; + + if (pathname === "/tape") { + return [ + { channel: "options" }, + { channel: "nbbo" }, + { channel: "equities" }, + { channel: "flow" } + ]; + } + + if (pathname === "/signals") { + return [{ channel: "alerts" }, { channel: "classifier-hits" }, { channel: "inferred-dark" }]; + } + + if (pathname === "/charts") { + return [...chartSubs, { channel: "classifier-hits" }, { channel: "inferred-dark" }]; + } + + if (pathname === "/replay") { + return []; + } + + return [ + { channel: "equities" }, + { channel: "flow" }, + { channel: "alerts" }, + { channel: "classifier-hits" }, + { channel: "inferred-dark" }, + ...chartSubs + ]; +}; + +const useLiveSession = ( + enabled: boolean, + pathname: string, + chartTicker: string, + chartIntervalMs: number +): LiveSessionState => { + const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); + const [lastUpdate, setLastUpdate] = useState(null); + const [options, setOptions] = useState([]); + const [nbbo, setNbbo] = useState([]); + const [equities, setEquities] = useState([]); + const [equityJoins, setEquityJoins] = useState([]); + const [flow, setFlow] = useState([]); + const [classifierHits, setClassifierHits] = useState([]); + const [alerts, setAlerts] = useState([]); + const [inferredDark, setInferredDark] = useState([]); + const [chartCandles, setChartCandles] = useState([]); + const [chartOverlay, setChartOverlay] = useState([]); + const socketRef = useRef(null); + const reconnectRef = useRef(null); + const subscribedKeysRef = useRef>(new Set()); + const subscribedMapRef = useRef>(new Map()); + const manifest = useMemo( + () => getLiveManifest(pathname, chartTicker.toUpperCase(), chartIntervalMs), + [pathname, chartTicker, chartIntervalMs] + ); + + useEffect(() => { + if (!enabled) { + setStatus("disconnected"); + setLastUpdate(null); + setOptions([]); + setNbbo([]); + setEquities([]); + setEquityJoins([]); + setFlow([]); + setClassifierHits([]); + setAlerts([]); + setInferredDark([]); + setChartCandles([]); + setChartOverlay([]); + subscribedKeysRef.current = new Set(); + subscribedMapRef.current = new Map(); + if (socketRef.current) { + socketRef.current.close(); + socketRef.current = null; + } + if (reconnectRef.current !== null) { + window.clearTimeout(reconnectRef.current); + reconnectRef.current = null; + } + return; + } + + let active = true; + + const syncSubscriptions = (socket: WebSocket) => { + const nextKeys = new Set(manifest.map(getLiveSubscriptionKey)); + const nextMap = new Map(manifest.map((sub) => [getLiveSubscriptionKey(sub), sub])); + const currentKeys = subscribedKeysRef.current; + const toSubscribe = manifest.filter((sub) => !currentKeys.has(getLiveSubscriptionKey(sub))); + const toUnsubscribe = Array.from(currentKeys) + .filter((key) => !nextKeys.has(key)) + .map((key) => subscribedMapRef.current.get(key) ?? null) + .filter((sub): sub is LiveSubscription => sub !== null); + + if (toUnsubscribe.length > 0) { + socket.send(JSON.stringify({ op: "unsubscribe", subscriptions: toUnsubscribe })); + } + if (toSubscribe.length > 0) { + socket.send(JSON.stringify({ op: "subscribe", subscriptions: toSubscribe })); + } + subscribedKeysRef.current = nextKeys; + subscribedMapRef.current = nextMap; + }; + + const handleMessage = (message: LiveServerMessage) => { + if (message.op === "ready" || message.op === "heartbeat") { + return; + } + if (message.op === "error") { + console.warn("Live socket error", message.message); + return; + } + + const subscription = message.op === "snapshot" ? message.snapshot.subscription : message.subscription; + const items = message.op === "snapshot" ? message.snapshot.items : [message.item]; + const updateAt = Date.now(); + + const mergeItems = ( + setter: React.Dispatch>, + nextItems: T[] + ) => { + setter((prev) => + message.op === "snapshot" ? (nextItems as T[]) : mergeNewest(nextItems as T[], prev) + ); + }; + + switch (subscription.channel) { + case "options": + mergeItems(setOptions, items as OptionPrint[]); + break; + case "nbbo": + mergeItems(setNbbo, items as OptionNBBO[]); + break; + case "equities": + mergeItems(setEquities, items as EquityPrint[]); + break; + case "equity-joins": + mergeItems(setEquityJoins, items as EquityPrintJoin[]); + break; + case "flow": + mergeItems(setFlow, items as FlowPacket[]); + break; + case "classifier-hits": + mergeItems(setClassifierHits, items as ClassifierHitEvent[]); + break; + case "alerts": + mergeItems(setAlerts, items as AlertEvent[]); + break; + case "inferred-dark": + mergeItems(setInferredDark, items as InferredDarkEvent[]); + break; + case "equity-candles": + mergeItems(setChartCandles, items as EquityCandle[]); + break; + case "equity-overlay": + mergeItems(setChartOverlay, items as EquityPrint[]); + break; + } + + setLastUpdate(updateAt); + }; + + const connect = () => { + if (!active) { + return; + } + setStatus("connecting"); + const socket = new WebSocket(buildWsUrl("/ws/live")); + socketRef.current = socket; + + socket.onopen = () => { + if (!active) { + return; + } + setStatus("connected"); + syncSubscriptions(socket); + }; + + socket.onmessage = (event) => { + if (!active) { + return; + } + try { + const parsed = JSON.parse(event.data) as LiveServerMessage; + handleMessage(parsed); + } catch (error) { + console.warn("Failed to parse live session payload", error); + } + }; + + socket.onclose = () => { + if (!active) { + return; + } + setStatus("disconnected"); + subscribedKeysRef.current = new Set(); + subscribedMapRef.current = new Map(); + reconnectRef.current = window.setTimeout(connect, 1000); + }; + + socket.onerror = () => { + if (!active) { + return; + } + setStatus("disconnected"); + socket.close(); + }; + }; + + connect(); + + return () => { + active = false; + if (reconnectRef.current !== null) { + window.clearTimeout(reconnectRef.current); + } + if (socketRef.current) { + socketRef.current.close(); + } + }; + }, [enabled]); + + useEffect(() => { + const socket = socketRef.current; + if (!enabled || !socket || socket.readyState !== WebSocket.OPEN) { + return; + } + + const nextKeys = new Set(manifest.map(getLiveSubscriptionKey)); + const nextMap = new Map(manifest.map((sub) => [getLiveSubscriptionKey(sub), sub])); + const currentKeys = subscribedKeysRef.current; + const toSubscribe = manifest.filter((sub) => !currentKeys.has(getLiveSubscriptionKey(sub))); + const removedKeys = Array.from(currentKeys).filter((key) => !nextKeys.has(key)); + + if (removedKeys.length > 0) { + const removedSubs = removedKeys + .map((key) => subscribedMapRef.current.get(key) ?? null) + .filter((sub): sub is LiveSubscription => sub !== null); + if (removedSubs.length > 0) { + socket.send(JSON.stringify({ op: "unsubscribe", subscriptions: removedSubs })); + } + } + if (toSubscribe.length > 0) { + socket.send(JSON.stringify({ op: "subscribe", subscriptions: toSubscribe })); + } + subscribedKeysRef.current = nextKeys; + subscribedMapRef.current = nextMap; + }, [enabled, manifest]); + + return { + status, + lastUpdate, + options, + nbbo, + equities, + equityJoins, + flow, + classifierHits, + alerts, + inferredDark, + chartCandles, + chartOverlay + }; +}; + type TapeStatusProps = { status: WsStatus; lastUpdate: number | null; @@ -1377,6 +1700,8 @@ type CandleChartProps = { intervalMs: number; mode: TapeMode; replayTime?: number | null; + liveCandles?: EquityCandle[]; + liveOverlayPrints?: EquityPrint[]; classifierHits: ClassifierHitEvent[]; inferredDark: InferredDarkEvent[]; onClassifierHitClick: (hit: ClassifierHitEvent) => void; @@ -1392,6 +1717,8 @@ const CandleChart = ({ intervalMs, mode, replayTime = null, + liveCandles = [], + liveOverlayPrints = [], classifierHits, inferredDark, onClassifierHitClick, @@ -1985,156 +2312,30 @@ const CandleChart = ({ return; } - let active = true; + if (mode !== "live" || !seriesRef.current) { + return; + } - const connect = () => { - if (!active) { - return; - } - - setStatus("connecting"); - const socket = new WebSocket(buildWsUrl("/ws/equity-candles")); - socketRef.current = socket; - - socket.onopen = () => { - if (!active) { - return; - } + const sortedCandles = [...liveCandles].sort((a, b) => (a.ts - b.ts) || (a.seq - b.seq)); + if (sortedCandles.length > 0) { + seriesRef.current.setData(sortedCandles.map(toChartCandle)); + const last = sortedCandles.at(-1); + if (last) { + lastCandleRef.current = { time: toChartTime(last.ts), seq: last.seq }; + setHasData(true); + setLastUpdate(last.ingest_ts ?? last.ts); setStatus("connected"); - }; - - socket.onmessage = (event) => { - if (!active || !seriesRef.current) { - return; - } - - try { - const message = JSON.parse(event.data) as StreamMessage; - if (!message || message.type !== "equity-candle") { - return; - } - - const candle = message.payload; - if (candle.underlying_id !== ticker || candle.interval_ms !== intervalMs) { - return; - } - - const chartCandle = toChartCandle(candle); - const last = lastCandleRef.current; - if (last) { - if (chartCandle.time < last.time) { - return; - } - if (chartCandle.time === last.time && candle.seq <= last.seq) { - return; - } - } - - seriesRef.current.update(chartCandle); - lastCandleRef.current = { time: chartCandle.time, seq: candle.seq }; - setHasData(true); - setLastUpdate(candle.ingest_ts ?? candle.ts); - drawOverlay([...overlayDataRef.current, ...overlayLiveRef.current]); - } catch (error) { - console.warn("Failed to parse candle payload", error); - } - }; - - socket.onclose = () => { - if (!active) { - return; - } - setStatus("disconnected"); - reconnectRef.current = window.setTimeout(connect, 1000); - }; - - socket.onerror = () => { - if (!active) { - return; - } - setStatus("disconnected"); - socket.close(); - }; - }; - - const connectOverlay = () => { - if (!active) { - return; } + } - const socket = new WebSocket(buildWsUrl("/ws/equities")); - overlaySocketRef.current = socket; - - socket.onmessage = (event) => { - if (!active) { - return; - } - - try { - const message = JSON.parse(event.data) as StreamMessage; - if (!message || message.type !== "equity-print") { - return; - } - - const print = message.payload; - if (print.underlying_id !== ticker) { - return; - } - - overlayLiveRef.current.push({ - ts: print.ts, - price: print.price, - size: print.size, - offExchangeFlag: print.offExchangeFlag - }); - - if (overlayLiveRef.current.length > 1500) { - overlayLiveRef.current = overlayLiveRef.current.slice(-1500); - } - - drawOverlay([...overlayDataRef.current, ...overlayLiveRef.current]); - } catch (error) { - console.warn("Failed to parse equity print payload", error); - } - }; - - socket.onclose = () => { - if (!active) { - return; - } - overlayReconnectRef.current = window.setTimeout(connectOverlay, 1500); - }; - - socket.onerror = () => { - if (!active) { - return; - } - socket.close(); - }; - }; - - connect(); - connectOverlay(); - - return () => { - active = false; - if (reconnectRef.current !== null) { - window.clearTimeout(reconnectRef.current); - reconnectRef.current = null; - } - if (socketRef.current) { - socketRef.current.close(); - } - - if (overlayReconnectRef.current !== null) { - window.clearTimeout(overlayReconnectRef.current); - overlayReconnectRef.current = null; - } - if (overlaySocketRef.current) { - overlaySocketRef.current.close(); - } - }; - }, [ready, mode, ticker, intervalMs, drawOverlay]); + overlayLiveRef.current = liveOverlayPrints.map((print) => ({ + ts: print.ts, + price: print.price, + size: print.size, + offExchangeFlag: print.offExchangeFlag + })); + drawOverlay([...overlayDataRef.current, ...overlayLiveRef.current]); + }, [ready, mode, liveCandles, liveOverlayPrints, drawOverlay]); useEffect(() => { if (!chartRef.current) { @@ -2623,6 +2824,7 @@ const formatFlowMetric = (value: number, suffix?: string): string => { }; const useTerminalState = () => { + const pathname = usePathname(); const [mode, setMode] = useState("live"); const [replaySource, setReplaySource] = useState(null); const [selectedAlert, setSelectedAlert] = useState(null); @@ -2630,6 +2832,16 @@ const useTerminalState = () => { const [selectedClassifierHit, setSelectedClassifierHit] = useState(null); const [filterInput, setFilterInput] = useState(""); const [chartIntervalMs, setChartIntervalMs] = useState(CANDLE_INTERVALS[0].ms); + const activeTickers = useMemo(() => { + const parts = filterInput + .split(/[,\s]+/) + .map((value) => value.trim().toUpperCase()) + .filter(Boolean); + return Array.from(new Set(parts)); + }, [filterInput]); + const tickerSet = useMemo(() => new Set(activeTickers), [activeTickers]); + const chartTicker = useMemo(() => activeTickers[0] ?? "SPY", [activeTickers]); + const liveSession = useLiveSession(mode === "live", pathname, chartTicker, chartIntervalMs); const handleReplaySource = useCallback((value: string | null) => { setReplaySource(value); @@ -2658,6 +2870,7 @@ const useTerminalState = () => { const options = useTape({ mode, + liveEnabled: false, wsPath: "/ws/options", replayPath: "/replay/options", latestPath: "/prints/options", @@ -2672,6 +2885,7 @@ const useTerminalState = () => { const equities = useTape({ mode, + liveEnabled: false, wsPath: "/ws/equities", replayPath: "/replay/equities", latestPath: "/prints/equities", @@ -2684,6 +2898,7 @@ const useTerminalState = () => { const equityJoins = useTape({ mode, + liveEnabled: false, wsPath: "/ws/equity-joins", replayPath: "/replay/equity-joins", latestPath: "/joins/equities", @@ -2695,6 +2910,7 @@ const useTerminalState = () => { const nbbo = useTape({ mode, + liveEnabled: false, wsPath: "/ws/options-nbbo", replayPath: "/replay/nbbo", latestPath: "/nbbo/options", @@ -2707,6 +2923,7 @@ const useTerminalState = () => { const inferredDark = useTape({ mode, + liveEnabled: false, wsPath: "/ws/inferred-dark", replayPath: "/replay/inferred-dark", latestPath: "/dark/inferred", @@ -2720,6 +2937,7 @@ const useTerminalState = () => { const flow = useTape({ mode, + liveEnabled: false, wsPath: "/ws/flow", replayPath: "/replay/flow", latestPath: "/flow/packets", @@ -2732,6 +2950,7 @@ const useTerminalState = () => { }); const alerts = useTape({ mode, + liveEnabled: false, wsPath: "/ws/alerts", replayPath: "/replay/alerts", latestPath: "/flow/alerts", @@ -2744,6 +2963,7 @@ const useTerminalState = () => { }); const classifierHits = useTape({ mode, + liveEnabled: false, wsPath: "/ws/classifier-hits", replayPath: "/replay/classifier-hits", latestPath: "/flow/classifier-hits", @@ -2755,44 +2975,60 @@ const useTerminalState = () => { getReplayKey: disableReplayGrouping }); + const optionsFeed = + mode === "live" + ? toStaticTapeState(liveSession.status, liveSession.options, liveSession.lastUpdate) + : options; + const nbboFeed = + mode === "live" ? toStaticTapeState(liveSession.status, liveSession.nbbo, liveSession.lastUpdate) : nbbo; + const equitiesFeed = + mode === "live" + ? toStaticTapeState(liveSession.status, liveSession.equities, liveSession.lastUpdate) + : equities; + const equityJoinsFeed = + mode === "live" + ? toStaticTapeState(liveSession.status, liveSession.equityJoins, liveSession.lastUpdate) + : equityJoins; + const flowFeed = + mode === "live" ? toStaticTapeState(liveSession.status, liveSession.flow, liveSession.lastUpdate) : flow; + const alertsFeed = + mode === "live" ? toStaticTapeState(liveSession.status, liveSession.alerts, liveSession.lastUpdate) : alerts; + const classifierHitsFeed = + mode === "live" + ? toStaticTapeState(liveSession.status, liveSession.classifierHits, liveSession.lastUpdate) + : classifierHits; + const inferredDarkFeed = + mode === "live" + ? toStaticTapeState(liveSession.status, liveSession.inferredDark, liveSession.lastUpdate) + : inferredDark; + useLayoutEffect(() => { optionsAnchor.apply(); - }, [options.items, optionsAnchor.apply]); + }, [optionsFeed.items, optionsAnchor.apply]); useLayoutEffect(() => { equitiesAnchor.apply(); - }, [equities.items, equitiesAnchor.apply]); + }, [equitiesFeed.items, equitiesAnchor.apply]); useLayoutEffect(() => { flowAnchor.apply(); - }, [flow.items, flowAnchor.apply]); + }, [flowFeed.items, flowAnchor.apply]); useLayoutEffect(() => { darkAnchor.apply(); - }, [inferredDark.items, darkAnchor.apply]); + }, [inferredDarkFeed.items, darkAnchor.apply]); useLayoutEffect(() => { alertsAnchor.apply(); - }, [alerts.items, alertsAnchor.apply]); + }, [alertsFeed.items, alertsAnchor.apply]); useLayoutEffect(() => { classifierAnchor.apply(); - }, [classifierHits.items, classifierAnchor.apply]); - - const activeTickers = useMemo(() => { - const parts = filterInput - .split(/[,\s]+/) - .map((value) => value.trim().toUpperCase()) - .filter(Boolean); - return Array.from(new Set(parts)); - }, [filterInput]); - - const tickerSet = useMemo(() => new Set(activeTickers), [activeTickers]); - const chartTicker = useMemo(() => activeTickers[0] ?? "SPY", [activeTickers]); + }, [classifierHitsFeed.items, classifierAnchor.apply]); const nbboMap = useMemo(() => { const map = new Map(); - for (const quote of nbbo.items) { + for (const quote of nbboFeed.items) { const contractId = normalizeContractId(quote.option_contract_id); const existing = map.get(contractId); if ( @@ -2804,43 +3040,142 @@ const useTerminalState = () => { } } return map; - }, [nbbo.items]); + }, [nbboFeed.items]); const optionPrintMap = useMemo(() => { const map = new Map(); - for (const print of options.items) { + for (const print of optionsFeed.items) { if (print.trace_id) { map.set(print.trace_id, print); } } return map; - }, [options.items]); + }, [optionsFeed.items]); const equityPrintMap = useMemo(() => { const map = new Map(); - for (const print of equities.items) { + for (const print of equitiesFeed.items) { if (print.trace_id) { map.set(print.trace_id, print); } } return map; - }, [equities.items]); + }, [equitiesFeed.items]); const equityJoinMap = useMemo(() => { const map = new Map(); - for (const join of equityJoins.items) { + for (const join of equityJoinsFeed.items) { map.set(join.id, join); } return map; - }, [equityJoins.items]); + }, [equityJoinsFeed.items]); const flowPacketMap = useMemo(() => { const map = new Map(); - for (const packet of flow.items) { + for (const packet of flowFeed.items) { map.set(packet.id, packet); } return map; - }, [flow.items]); + }, [flowFeed.items]); + const [fetchedOptionPrintMap, setFetchedOptionPrintMap] = useState>( + () => new Map() + ); + const [fetchedFlowPacketMap, setFetchedFlowPacketMap] = useState>( + () => new Map() + ); + const [fetchedEquityJoinMap, setFetchedEquityJoinMap] = useState>( + () => new Map() + ); + const mergedOptionPrintMap = useMemo(() => { + const merged = new Map(optionPrintMap); + for (const [key, value] of fetchedOptionPrintMap) { + merged.set(key, value); + } + return merged; + }, [optionPrintMap, fetchedOptionPrintMap]); + const mergedFlowPacketMap = useMemo(() => { + const merged = new Map(flowPacketMap); + for (const [key, value] of fetchedFlowPacketMap) { + merged.set(key, value); + } + return merged; + }, [flowPacketMap, fetchedFlowPacketMap]); + const mergedEquityJoinMap = useMemo(() => { + const merged = new Map(equityJoinMap); + for (const [key, value] of fetchedEquityJoinMap) { + merged.set(key, value); + } + return merged; + }, [equityJoinMap, fetchedEquityJoinMap]); + + useEffect(() => { + if (!selectedAlert || mode !== "live") { + return; + } + + const packetId = selectedAlert.evidence_refs[0]; + if (packetId && !mergedFlowPacketMap.has(packetId)) { + void fetch(buildApiUrl(`/flow/packets/${encodeURIComponent(packetId)}`)) + .then((response) => response.json()) + .then((payload: { data?: FlowPacket | null }) => { + if (!payload.data) { + return; + } + setFetchedFlowPacketMap((prev) => new Map(prev).set(payload.data!.id, payload.data!)); + }) + .catch((error) => console.warn("Failed to fetch flow packet evidence", error)); + } + + const missingPrintIds = selectedAlert.evidence_refs.filter( + (id) => !mergedFlowPacketMap.has(id) && !mergedOptionPrintMap.has(id) + ); + if (missingPrintIds.length > 0) { + const url = new URL(buildApiUrl("/option-prints/by-trace")); + for (const traceId of missingPrintIds) { + url.searchParams.append("trace_id", traceId); + } + void fetch(url.toString()) + .then((response) => response.json()) + .then((payload: { data?: OptionPrint[] }) => { + const next = new Map(); + for (const item of payload.data ?? []) { + next.set(item.trace_id, item); + } + if (next.size > 0) { + setFetchedOptionPrintMap((prev) => new Map([...prev, ...next])); + } + }) + .catch((error) => console.warn("Failed to fetch option print evidence", error)); + } + }, [selectedAlert, mode, mergedFlowPacketMap, mergedOptionPrintMap]); + + useEffect(() => { + if (!selectedDarkEvent || mode !== "live") { + return; + } + + const missingIds = selectedDarkEvent.evidence_refs.filter((id) => !mergedEquityJoinMap.has(id)); + if (missingIds.length === 0) { + return; + } + + const url = new URL(buildApiUrl("/equity-joins/by-id")); + for (const id of missingIds) { + url.searchParams.append("id", id); + } + void fetch(url.toString()) + .then((response) => response.json()) + .then((payload: { data?: EquityPrintJoin[] }) => { + const next = new Map(); + for (const item of payload.data ?? []) { + next.set(item.id, item); + } + if (next.size > 0) { + setFetchedEquityJoinMap((prev) => new Map([...prev, ...next])); + } + }) + .catch((error) => console.warn("Failed to fetch dark evidence joins", error)); + }, [selectedDarkEvent, mode, mergedEquityJoinMap]); const selectedEvidence = useMemo((): EvidenceItem[] => { if (!selectedAlert) { @@ -2848,25 +3183,25 @@ const useTerminalState = () => { } return selectedAlert.evidence_refs.map((id) => { - const packet = flowPacketMap.get(id); + const packet = mergedFlowPacketMap.get(id); if (packet) { return { kind: "flow", id, packet }; } - const print = optionPrintMap.get(id); + const print = mergedOptionPrintMap.get(id); if (print) { return { kind: "print", id, print }; } return { kind: "unknown", id }; }); - }, [selectedAlert, flowPacketMap, optionPrintMap]); + }, [selectedAlert, mergedFlowPacketMap, mergedOptionPrintMap]); const selectedFlowPacket = useMemo(() => { if (!selectedAlert) { return null; } const packetId = selectedAlert.evidence_refs[0]; - return packetId ? flowPacketMap.get(packetId) ?? null : null; - }, [selectedAlert, flowPacketMap]); + return packetId ? mergedFlowPacketMap.get(packetId) ?? null : null; + }, [selectedAlert, mergedFlowPacketMap]); const selectedDarkEvidence = useMemo((): DarkEvidenceItem[] => { if (!selectedDarkEvent) { @@ -2874,20 +3209,20 @@ const useTerminalState = () => { } return selectedDarkEvent.evidence_refs.map((id) => { - const join = equityJoinMap.get(id); + const join = mergedEquityJoinMap.get(id); if (join) { return { kind: "join", id, join }; } return { kind: "unknown", id }; }); - }, [selectedDarkEvent, equityJoinMap]); + }, [selectedDarkEvent, mergedEquityJoinMap]); const selectedDarkUnderlying = useMemo(() => { if (!selectedDarkEvent) { return null; } - return inferDarkUnderlying(selectedDarkEvent, equityPrintMap, equityJoinMap); - }, [selectedDarkEvent, equityJoinMap, equityPrintMap]); + return inferDarkUnderlying(selectedDarkEvent, equityPrintMap, mergedEquityJoinMap); + }, [selectedDarkEvent, mergedEquityJoinMap, equityPrintMap]); useEffect(() => { if (mode !== "live") { @@ -2929,12 +3264,30 @@ const useTerminalState = () => { return extractPacketIdFromClassifierHitTrace(selectedClassifierHit.trace_id); }, [extractPacketIdFromClassifierHitTrace, selectedClassifierHit]); + useEffect(() => { + if (!selectedClassifierPacketId || mode !== "live") { + return; + } + + if (!mergedFlowPacketMap.has(selectedClassifierPacketId)) { + void fetch(buildApiUrl(`/flow/packets/${encodeURIComponent(selectedClassifierPacketId)}`)) + .then((response) => response.json()) + .then((payload: { data?: FlowPacket | null }) => { + if (!payload.data) { + return; + } + setFetchedFlowPacketMap((prev) => new Map(prev).set(payload.data!.id, payload.data!)); + }) + .catch((error) => console.warn("Failed to fetch classifier flow packet", error)); + } + }, [selectedClassifierPacketId, mode, mergedFlowPacketMap]); + const selectedClassifierFlowPacket = useMemo(() => { if (!selectedClassifierPacketId) { return null; } - return flowPacketMap.get(selectedClassifierPacketId) ?? null; - }, [flowPacketMap, selectedClassifierPacketId]); + return mergedFlowPacketMap.get(selectedClassifierPacketId) ?? null; + }, [mergedFlowPacketMap, selectedClassifierPacketId]); const selectedClassifierEvidence = useMemo((): EvidenceItem[] => { if (!selectedClassifierHit) { @@ -2945,19 +3298,19 @@ const useTerminalState = () => { return []; } - const packet = flowPacketMap.get(selectedClassifierPacketId); + const packet = mergedFlowPacketMap.get(selectedClassifierPacketId); if (!packet) { return []; } return packet.members.map((id) => { - const print = optionPrintMap.get(id); + const print = mergedOptionPrintMap.get(id); if (print) { return { kind: "print", id, print }; } return { kind: "unknown", id }; }); - }, [flowPacketMap, optionPrintMap, selectedClassifierHit, selectedClassifierPacketId]); + }, [mergedFlowPacketMap, mergedOptionPrintMap, selectedClassifierHit, selectedClassifierPacketId]); const inferAlertUnderlying = useCallback( (alert: AlertEvent): string | null => { @@ -2968,14 +3321,14 @@ const useTerminalState = () => { const packetId = alert.evidence_refs[0]; if (packetId) { - const packet = flowPacketMap.get(packetId); + const packet = mergedFlowPacketMap.get(packetId); if (packet) { return extractUnderlying(extractPacketContract(packet)); } } for (const ref of alert.evidence_refs) { - const print = optionPrintMap.get(ref); + const print = mergedOptionPrintMap.get(ref); if (print) { return extractUnderlying(print.option_contract_id); } @@ -2983,7 +3336,7 @@ const useTerminalState = () => { return null; }, - [extractPacketContract, extractUnderlyingFromTrace, flowPacketMap, optionPrintMap] + [extractPacketContract, extractUnderlyingFromTrace, mergedFlowPacketMap, mergedOptionPrintMap] ); const matchesTicker = useCallback( @@ -3001,59 +3354,59 @@ const useTerminalState = () => { const filteredOptions = useMemo(() => { if (tickerSet.size === 0) { - return options.items; + return optionsFeed.items; } - return options.items.filter((print) => + return optionsFeed.items.filter((print) => matchesTicker(extractUnderlying(normalizeContractId(print.option_contract_id))) ); - }, [options.items, matchesTicker, tickerSet]); + }, [optionsFeed.items, matchesTicker, tickerSet]); const filteredEquities = useMemo(() => { if (tickerSet.size === 0) { - return equities.items; + return equitiesFeed.items; } - return equities.items.filter((print) => matchesTicker(print.underlying_id)); - }, [equities.items, matchesTicker, tickerSet]); + return equitiesFeed.items.filter((print) => matchesTicker(print.underlying_id)); + }, [equitiesFeed.items, matchesTicker, tickerSet]); const filteredInferredDark = useMemo(() => { if (tickerSet.size === 0) { - return inferredDark.items; + return inferredDarkFeed.items; } - return inferredDark.items.filter((event) => { - const underlying = inferDarkUnderlying(event, equityPrintMap, equityJoinMap); + return inferredDarkFeed.items.filter((event) => { + const underlying = inferDarkUnderlying(event, equityPrintMap, mergedEquityJoinMap); return matchesTicker(underlying); }); - }, [equityJoinMap, equityPrintMap, inferredDark.items, matchesTicker, tickerSet]); + }, [mergedEquityJoinMap, equityPrintMap, inferredDarkFeed.items, matchesTicker, tickerSet]); const filteredFlow = useMemo(() => { if (tickerSet.size === 0) { - return flow.items; + return flowFeed.items; } - return flow.items.filter((packet) => + return flowFeed.items.filter((packet) => matchesTicker(extractUnderlying(extractPacketContract(packet))) ); - }, [flow.items, extractPacketContract, matchesTicker, tickerSet]); + }, [flowFeed.items, extractPacketContract, matchesTicker, tickerSet]); const filteredAlerts = useMemo(() => { if (tickerSet.size === 0) { - return alerts.items; + return alertsFeed.items; } - return alerts.items.filter((alert) => matchesTicker(inferAlertUnderlying(alert))); - }, [alerts.items, inferAlertUnderlying, matchesTicker, tickerSet]); + return alertsFeed.items.filter((alert) => matchesTicker(inferAlertUnderlying(alert))); + }, [alertsFeed.items, inferAlertUnderlying, matchesTicker, tickerSet]); const filteredClassifierHits = useMemo(() => { if (tickerSet.size === 0) { - return classifierHits.items; + return classifierHitsFeed.items; } - return classifierHits.items.filter((hit) => { + return classifierHitsFeed.items.filter((hit) => { const underlying = extractUnderlyingFromTrace(hit.trace_id); return matchesTicker(underlying); }); - }, [classifierHits.items, extractUnderlyingFromTrace, matchesTicker, tickerSet]); + }, [classifierHitsFeed.items, extractUnderlyingFromTrace, matchesTicker, tickerSet]); const chartClassifierHits = useMemo(() => { const desired = chartTicker.toUpperCase(); - return classifierHits.items + return classifierHitsFeed.items .filter((hit) => extractUnderlyingFromTrace(hit.trace_id) === desired) .sort((a, b) => { const delta = a.source_ts - b.source_ts; @@ -3062,12 +3415,12 @@ const useTerminalState = () => { } return a.seq - b.seq; }); - }, [chartTicker, classifierHits.items, extractUnderlyingFromTrace]); + }, [chartTicker, classifierHitsFeed.items, extractUnderlyingFromTrace]); const chartInferredDark = useMemo(() => { const desired = chartTicker.toUpperCase(); - return inferredDark.items - .filter((event) => inferDarkUnderlying(event, equityPrintMap, equityJoinMap) === desired) + return inferredDarkFeed.items + .filter((event) => inferDarkUnderlying(event, equityPrintMap, mergedEquityJoinMap) === desired) .sort((a, b) => { const delta = a.source_ts - b.source_ts; if (delta !== 0) { @@ -3075,7 +3428,7 @@ const useTerminalState = () => { } return a.seq - b.seq; }); - }, [chartTicker, inferredDark.items, equityJoinMap, equityPrintMap]); + }, [chartTicker, inferredDarkFeed.items, mergedEquityJoinMap, equityPrintMap]); const findAlertForClassifierHit = useCallback( (hit: ClassifierHitEvent): AlertEvent | null => { @@ -3086,12 +3439,12 @@ const useTerminalState = () => { const desiredTrace = `alert:${packetId}`; return ( - alerts.items.find( + alertsFeed.items.find( (item) => item.trace_id === desiredTrace || item.evidence_refs[0] === packetId ) ?? null ); }, - [alerts.items, extractPacketIdFromClassifierHitTrace] + [alertsFeed.items, extractPacketIdFromClassifierHitTrace] ); const openFromClassifierHit = useCallback( @@ -3126,22 +3479,22 @@ const useTerminalState = () => { const lastSeen = useMemo(() => { return [ - options.lastUpdate, - equities.lastUpdate, - inferredDark.lastUpdate, - flow.lastUpdate, - alerts.lastUpdate, - classifierHits.lastUpdate + optionsFeed.lastUpdate, + equitiesFeed.lastUpdate, + inferredDarkFeed.lastUpdate, + flowFeed.lastUpdate, + alertsFeed.lastUpdate, + classifierHitsFeed.lastUpdate ] .filter((value): value is number => value !== null) .sort((a, b) => b - a)[0] ?? null; }, [ - options.lastUpdate, - equities.lastUpdate, - inferredDark.lastUpdate, - flow.lastUpdate, - alerts.lastUpdate, - classifierHits.lastUpdate + optionsFeed.lastUpdate, + equitiesFeed.lastUpdate, + inferredDarkFeed.lastUpdate, + flowFeed.lastUpdate, + alertsFeed.lastUpdate, + classifierHitsFeed.lastUpdate ]); return { @@ -3165,22 +3518,23 @@ const useTerminalState = () => { darkScroll, alertsScroll, classifierScroll, - options, - equities, - equityJoins, - nbbo, - inferredDark, - flow, - alerts, - classifierHits, + options: optionsFeed, + equities: equitiesFeed, + equityJoins: equityJoinsFeed, + nbbo: nbboFeed, + inferredDark: inferredDarkFeed, + flow: flowFeed, + alerts: alertsFeed, + classifierHits: classifierHitsFeed, + liveSession, activeTickers, tickerSet, chartTicker, nbboMap, - optionPrintMap, + optionPrintMap: mergedOptionPrintMap, equityPrintMap, - equityJoinMap, - flowPacketMap, + equityJoinMap: mergedEquityJoinMap, + flowPacketMap: mergedFlowPacketMap, selectedEvidence, selectedFlowPacket, selectedDarkEvidence, @@ -3921,6 +4275,8 @@ const ChartPane = ({ title = "Chart" }: ChartPaneProps) => { intervalMs={state.chartIntervalMs} mode={state.mode} replayTime={state.equities.replayTime} + liveCandles={state.liveSession.chartCandles} + liveOverlayPrints={state.liveSession.chartOverlay} classifierHits={state.chartClassifierHits} inferredDark={state.chartInferredDark} onClassifierHitClick={state.handleClassifierMarkerClick} diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 1f72299..6c08623 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -418,6 +418,14 @@ const clampLimit = (limit: number): number => { return Math.max(1, Math.min(1000, Math.floor(limit))); }; +const clampLookupLimit = (limit: number): number => { + if (!Number.isFinite(limit)) { + return 100; + } + + return Math.max(1, Math.min(5000, Math.floor(limit))); +}; + const clampPositiveInt = (value: number, fallback = 1): number => { if (!Number.isFinite(value)) { return fallback; @@ -450,6 +458,10 @@ const quoteString = (value: string): string => { return `'${escaped}'`; }; +const buildStringList = (values: string[]): string => { + return values.map((value) => quoteString(value)).join(", "); +}; + const buildTracePrefixCondition = (tracePrefix: string | undefined): string | null => { if (!tracePrefix) { return null; @@ -461,6 +473,15 @@ const buildTracePrefixCondition = (tracePrefix: string | undefined): string | nu return `startsWith(trace_id, ${quoteString(normalized)})`; }; +const buildBeforeTupleCondition = ( + tsColumn: string, + seqColumn: string, + beforeTs: number, + beforeSeq: number +): string => { + return `(${tsColumn}, ${seqColumn}) < (${clampCursor(beforeTs)}, ${clampCursor(beforeSeq)})`; +}; + const normalizeNumericFields = ( row: Record, fields: string[] @@ -1095,3 +1116,215 @@ export const fetchAlertsAfter = async ( const alerts = records.map(fromAlertRecord); return AlertEventSchema.array().parse(alerts); }; + +export const fetchOptionPrintsBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number, + tracePrefix?: string +): Promise => { + const safeLimit = clampLimit(limit); + const conditions = [buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)]; + const traceCondition = buildTracePrefixCondition(tracePrefix); + if (traceCondition) { + conditions.push(traceCondition); + } + + const result = await client.query({ + query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow)); +}; + +export const fetchOptionNBBOBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number, + tracePrefix?: string +): Promise => { + const safeLimit = clampLimit(limit); + const conditions = [buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)]; + const traceCondition = buildTracePrefixCondition(tracePrefix); + if (traceCondition) { + conditions.push(traceCondition); + } + + const result = await client.query({ + query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow)); +}; + +export const fetchEquityPrintsBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_PRINTS_TABLE} WHERE ${buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); +}; + +export const fetchEquityPrintJoinsBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeEquityPrintJoinRow) + .filter((record): record is EquityPrintJoinRecord => record !== null); + return EquityPrintJoinSchema.array().parse(records.map(fromEquityPrintJoinRecord)); +}; + +export const fetchFlowPacketsBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeFlowPacketRow) + .filter((record): record is FlowPacketRecord => record !== null); + return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord)); +}; + +export const fetchClassifierHitsBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeClassifierHitRow) + .filter((record): record is ClassifierHitRecord => record !== null); + return ClassifierHitEventSchema.array().parse(records.map(fromClassifierHitRecord)); +}; + +export const fetchAlertsBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${ALERTS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeAlertRow) + .filter((record): record is AlertRecord => record !== null); + return AlertEventSchema.array().parse(records.map(fromAlertRecord)); +}; + +export const fetchInferredDarkBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${INFERRED_DARK_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeInferredDarkRow) + .filter((record): record is InferredDarkRecord => record !== null); + return InferredDarkEventSchema.array().parse(records.map(fromInferredDarkRecord)); +}; + +export const fetchFlowPacketById = async ( + client: ClickHouseClient, + id: string +): Promise => { + const result = await client.query({ + query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE id = ${quoteString(id)} ORDER BY source_ts DESC, seq DESC LIMIT 1`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const record = rows + .map(normalizeFlowPacketRow) + .find((row): row is FlowPacketRecord => row !== null); + return record ? FlowPacketSchema.parse(fromFlowPacketRecord(record)) : null; +}; + +export const fetchOptionPrintsByTraceIds = async ( + client: ClickHouseClient, + traceIds: string[] +): Promise => { + const ids = Array.from(new Set(traceIds.map((id) => id.trim()).filter(Boolean))); + if (ids.length === 0) { + return []; + } + + const result = await client.query({ + query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE trace_id IN (${buildStringList(ids)}) ORDER BY ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length)}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow)); +}; + +export const fetchEquityPrintJoinsByIds = async ( + client: ClickHouseClient, + ids: string[] +): Promise => { + const uniqueIds = Array.from(new Set(ids.map((id) => id.trim()).filter(Boolean))); + if (uniqueIds.length === 0) { + return []; + } + + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} WHERE id IN (${buildStringList(uniqueIds)}) ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(uniqueIds.length)}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeEquityPrintJoinRow) + .filter((record): record is EquityPrintJoinRecord => record !== null); + return EquityPrintJoinSchema.array().parse(records.map(fromEquityPrintJoinRecord)); +}; diff --git a/packages/storage/tests/flow-packets.test.ts b/packages/storage/tests/flow-packets.test.ts index 8660625..f31928b 100644 --- a/packages/storage/tests/flow-packets.test.ts +++ b/packages/storage/tests/flow-packets.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "bun:test"; +import { createClickHouseClient, fetchFlowPacketById, fetchFlowPacketsBefore } from "../src/clickhouse"; import { flowPacketsTableDDL, FLOW_PACKETS_TABLE, @@ -36,4 +37,24 @@ describe("flow-packets storage helpers", () => { expect(restored.features).toEqual(packet.features); expect(restored.join_quality).toEqual(packet.join_quality); }); + + it("builds before-history and id lookup queries", async () => { + const queries: string[] = []; + const client = createClickHouseClient({ url: "http://127.0.0.1:8123" }); + client.query = async ({ query }) => { + queries.push(query); + return { + async json() { + return [] as T; + } + }; + }; + + await fetchFlowPacketsBefore(client, 200, 3, 15); + await fetchFlowPacketById(client, "fp-1"); + + expect(queries[0]).toContain("(source_ts, seq) < (200, 3)"); + expect(queries[0]).toContain("ORDER BY source_ts DESC, seq DESC LIMIT 15"); + expect(queries[1]).toContain("WHERE id = 'fp-1'"); + }); }); diff --git a/packages/storage/tests/option-prints.test.ts b/packages/storage/tests/option-prints.test.ts index debbf30..81c50c2 100644 --- a/packages/storage/tests/option-prints.test.ts +++ b/packages/storage/tests/option-prints.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "bun:test"; +import { createClickHouseClient, fetchOptionPrintsBefore, fetchOptionPrintsByTraceIds } from "../src/clickhouse"; import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "../src/option-prints"; const basePrint = { @@ -24,4 +25,25 @@ describe("option-prints storage helpers", () => { expect(ddl).toContain(OPTION_PRINTS_TABLE); expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); }); + + it("builds before/history and trace lookup queries", async () => { + const queries: string[] = []; + const client = createClickHouseClient({ url: "http://127.0.0.1:8123" }); + client.query = async ({ query }) => { + queries.push(query); + return { + async json() { + return [] as T; + } + }; + }; + + await fetchOptionPrintsBefore(client, 100, 5, 20, "alpaca"); + await fetchOptionPrintsByTraceIds(client, ["trace-1", "trace-2"]); + + expect(queries[0]).toContain("(ts, seq) < (100, 5)"); + expect(queries[0]).toContain("startsWith(trace_id, 'alpaca')"); + expect(queries[0]).toContain("ORDER BY ts DESC, seq DESC LIMIT 20"); + expect(queries[1]).toContain("trace_id IN ('trace-1', 'trace-2')"); + }); }); diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 08ba2d2..44f18f5 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -1,2 +1,3 @@ export * from "./events"; +export * from "./live"; export * from "./sp500"; diff --git a/packages/types/src/live.ts b/packages/types/src/live.ts new file mode 100644 index 0000000..c5fc399 --- /dev/null +++ b/packages/types/src/live.ts @@ -0,0 +1,182 @@ +import { z } from "zod"; +import { + AlertEventSchema, + ClassifierHitEventSchema, + EquityCandleSchema, + EquityPrintJoinSchema, + EquityPrintSchema, + FlowPacketSchema, + InferredDarkEventSchema, + OptionNBBOSchema, + OptionPrintSchema +} from "./events"; + +export const CursorSchema = z.object({ + ts: z.number().int().nonnegative(), + seq: z.number().int().nonnegative() +}); + +export type Cursor = z.infer; + +export const LiveGenericChannelSchema = z.enum([ + "options", + "nbbo", + "equities", + "equity-joins", + "flow", + "classifier-hits", + "alerts", + "inferred-dark" +]); + +export const LiveChannelSchema = z.enum([ + "options", + "nbbo", + "equities", + "equity-joins", + "flow", + "classifier-hits", + "alerts", + "inferred-dark", + "equity-candles", + "equity-overlay" +]); + +export type LiveChannel = z.infer; +export type LiveGenericChannel = z.infer; + +export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [ + z.object({ + channel: LiveGenericChannelSchema + }), + z.object({ + channel: z.literal("equity-candles"), + underlying_id: z.string().min(1), + interval_ms: z.number().int().positive() + }), + z.object({ + channel: z.literal("equity-overlay"), + underlying_id: z.string().min(1) + }) +]); + +export type LiveSubscription = z.infer; + +const livePayloadSchemas = { + options: OptionPrintSchema, + nbbo: OptionNBBOSchema, + equities: EquityPrintSchema, + "equity-joins": EquityPrintJoinSchema, + flow: FlowPacketSchema, + "classifier-hits": ClassifierHitEventSchema, + alerts: AlertEventSchema, + "inferred-dark": InferredDarkEventSchema, + "equity-candles": EquityCandleSchema, + "equity-overlay": EquityPrintSchema +} as const; + +export const FeedSnapshotSchema = z.object({ + subscription: LiveSubscriptionSchema, + items: z.array(z.unknown()), + watermark: CursorSchema.nullable(), + next_before: CursorSchema.nullable() +}); + +export type FeedSnapshot = { + subscription: LiveSubscription; + items: T[]; + watermark: Cursor | null; + next_before: Cursor | null; +}; + +export const LiveSubscribeMessageSchema = z.object({ + op: z.literal("subscribe"), + subscriptions: z.array(LiveSubscriptionSchema).min(1) +}); + +export type LiveSubscribeMessage = z.infer; + +export const LiveUnsubscribeMessageSchema = z.object({ + op: z.literal("unsubscribe"), + subscriptions: z.array(LiveSubscriptionSchema).min(1) +}); + +export type LiveUnsubscribeMessage = z.infer; + +export const LivePingMessageSchema = z.object({ + op: z.literal("ping") +}); + +export type LivePingMessage = z.infer; + +export const LiveClientMessageSchema = z.discriminatedUnion("op", [ + LiveSubscribeMessageSchema, + LiveUnsubscribeMessageSchema, + LivePingMessageSchema +]); + +export type LiveClientMessage = z.infer; + +export const LiveReadyMessageSchema = z.object({ + op: z.literal("ready") +}); + +export type LiveReadyMessage = z.infer; + +export const LiveSnapshotMessageSchema = z.object({ + op: z.literal("snapshot"), + snapshot: FeedSnapshotSchema +}); + +export type LiveSnapshotMessage = z.infer; + +export const LiveEventMessageSchema = z.object({ + op: z.literal("event"), + subscription: LiveSubscriptionSchema, + item: z.unknown(), + watermark: CursorSchema.nullable() +}); + +export type LiveEventMessage = z.infer; + +export const LiveHeartbeatMessageSchema = z.object({ + op: z.literal("heartbeat"), + ts: z.number().int().nonnegative() +}); + +export type LiveHeartbeatMessage = z.infer; + +export const LiveErrorMessageSchema = z.object({ + op: z.literal("error"), + message: z.string().min(1) +}); + +export type LiveErrorMessage = z.infer; + +export const LiveServerMessageSchema = z.discriminatedUnion("op", [ + LiveReadyMessageSchema, + LiveSnapshotMessageSchema, + LiveEventMessageSchema, + LiveHeartbeatMessageSchema, + LiveErrorMessageSchema +]); + +export type LiveServerMessage = z.infer; + +export const getSubscriptionKey = (subscription: LiveSubscription): string => { + switch (subscription.channel) { + case "equity-candles": + return `${subscription.channel}|${subscription.underlying_id}|${subscription.interval_ms}`; + case "equity-overlay": + return `${subscription.channel}|${subscription.underlying_id}`; + default: + return subscription.channel; + } +}; + +export const parseLivePayload = ( + channel: LiveChannel, + item: unknown +): z.infer<(typeof livePayloadSchemas)[typeof channel]> => { + return livePayloadSchemas[channel].parse(item); +}; diff --git a/packages/types/tests/live.test.ts b/packages/types/tests/live.test.ts new file mode 100644 index 0000000..e53929b --- /dev/null +++ b/packages/types/tests/live.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, it } from "bun:test"; +import { + CursorSchema, + LiveClientMessageSchema, + LiveServerMessageSchema, + getSubscriptionKey +} from "../src/live"; + +describe("live protocol types", () => { + it("builds stable keys for generic and parameterized subscriptions", () => { + expect(getSubscriptionKey({ channel: "flow" })).toBe("flow"); + expect( + getSubscriptionKey({ + channel: "equity-candles", + underlying_id: "SPY", + interval_ms: 60000 + }) + ).toBe("equity-candles|SPY|60000"); + expect(getSubscriptionKey({ channel: "equity-overlay", underlying_id: "SPY" })).toBe( + "equity-overlay|SPY" + ); + }); + + it("validates subscribe messages", () => { + const parsed = LiveClientMessageSchema.parse({ + op: "subscribe", + subscriptions: [ + { channel: "flow" }, + { channel: "equity-candles", underlying_id: "SPY", interval_ms: 60000 } + ] + }); + + expect(parsed.op).toBe("subscribe"); + expect(parsed.subscriptions).toHaveLength(2); + }); + + it("validates snapshot and event server messages", () => { + const cursor = CursorSchema.parse({ ts: 100, seq: 2 }); + const snapshot = LiveServerMessageSchema.parse({ + op: "snapshot", + snapshot: { + subscription: { channel: "alerts" }, + items: [], + watermark: cursor, + next_before: null + } + }); + const event = LiveServerMessageSchema.parse({ + op: "event", + subscription: { channel: "equity-overlay", underlying_id: "SPY" }, + item: { + source_ts: 100, + ingest_ts: 101, + seq: 1, + trace_id: "eq-1", + ts: 100, + underlying_id: "SPY", + price: 500, + size: 10, + exchange: "X", + offExchangeFlag: true + }, + watermark: cursor + }); + + expect(snapshot.op).toBe("snapshot"); + expect(event.op).toBe("event"); + }); +}); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 02951ff..3d10874 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -39,8 +39,12 @@ import { ensureOptionNBBOTable, ensureOptionPrintsTable, fetchAlertsAfter, + fetchAlertsBefore, fetchClassifierHitsAfter, + fetchClassifierHitsBefore, fetchFlowPacketsAfter, + fetchFlowPacketById, + fetchFlowPacketsBefore, fetchRecentAlerts, fetchRecentClassifierHits, fetchRecentEquityPrintJoins, @@ -49,31 +53,46 @@ import { fetchRecentEquityQuotes, fetchEquityCandlesAfter, fetchEquityCandlesRange, + fetchEquityPrintJoinsByIds, + fetchEquityPrintJoinsBefore, fetchRecentOptionNBBO, fetchEquityPrintsAfter, + fetchEquityPrintsBefore, fetchEquityPrintsRange, fetchEquityPrintJoinsAfter, fetchEquityQuotesAfter, + fetchInferredDarkBefore, fetchInferredDarkAfter, fetchRecentEquityPrints, + fetchOptionNBBOBefore, fetchOptionNBBOAfter, + fetchOptionPrintsBefore, fetchOptionPrintsAfter, + fetchOptionPrintsByTraceIds, fetchRecentOptionPrints } from "@islandflow/storage"; import { AlertEventSchema, ClassifierHitEventSchema, + Cursor, EquityCandleSchema, EquityPrintSchema, EquityPrintJoinSchema, EquityQuoteSchema, + FeedSnapshot, InferredDarkEventSchema, + LiveClientMessageSchema, + LiveServerMessage, + LiveSubscription, + LiveSubscriptionSchema, FlowPacketSchema, OptionNBBOSchema, - OptionPrintSchema + OptionPrintSchema, + getSubscriptionKey } from "@islandflow/types"; import { createClient } from "redis"; import { z } from "zod"; +import { LiveStateManager } from "./live"; const service = "api"; const logger = createLogger({ service }); @@ -148,6 +167,11 @@ const replayParamsSchema = z.object({ after_seq: z.coerce.number().int().nonnegative().default(0), limit: z.coerce.number().int().positive().max(1000).default(200) }); +const beforeParamsSchema = z.object({ + before_ts: z.coerce.number().int().nonnegative(), + before_seq: z.coerce.number().int().nonnegative(), + limit: z.coerce.number().int().positive().max(1000).default(200) +}); const replaySourceSchema = z .string() @@ -192,16 +216,26 @@ type WsData = { channel: Channel; }; -const optionSockets = new Set>(); -const optionNbboSockets = new Set>(); -const equitySockets = new Set>(); -const equityCandleSockets = new Set>(); -const equityQuoteSockets = new Set>(); -const equityJoinSockets = new Set>(); -const inferredDarkSockets = new Set>(); -const flowSockets = new Set>(); -const classifierHitSockets = new Set>(); -const alertSockets = new Set>(); +type LiveWsData = { + channel: "live"; +}; + +type LegacySocket = any; +type LiveSocket = any; + +const optionSockets = new Set(); +const optionNbboSockets = new Set(); +const equitySockets = new Set(); +const equityCandleSockets = new Set(); +const equityQuoteSockets = new Set(); +const equityJoinSockets = new Set(); +const inferredDarkSockets = new Set(); +const flowSockets = new Set(); +const classifierHitSockets = new Set(); +const alertSockets = new Set(); +const liveSocketSubscriptions = new Map>(); +const subscriptionSockets = new Map>(); +const liveHeartbeats = new Map>(); const jsonResponse = (body: unknown, status = 200): Response => { return new Response(JSON.stringify(body), { @@ -234,6 +268,20 @@ const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit }; }; +const parseBeforeParams = (url: URL): { beforeTs: number; beforeSeq: number; limit: number } => { + const params = beforeParamsSchema.parse({ + before_ts: url.searchParams.get("before_ts") ?? undefined, + before_seq: url.searchParams.get("before_seq") ?? undefined, + limit: url.searchParams.get("limit") ?? undefined + }); + + return { + beforeTs: params.before_ts, + beforeSeq: params.before_seq, + limit: params.limit + }; +}; + const parseReplaySource = (url: URL): string | null => { const raw = url.searchParams.get("source"); if (!raw) { @@ -330,7 +378,7 @@ const parseCandleReplayParams = ( }; }; -const broadcast = (sockets: Set>, payload: unknown): void => { +const broadcast = (sockets: Set, payload: unknown): void => { const message = JSON.stringify(payload); for (const socket of sockets) { @@ -345,6 +393,71 @@ const broadcast = (sockets: Set>, payload: unknown): void => { } }; +const sendLiveMessage = (socket: LiveSocket, payload: LiveServerMessage): void => { + try { + socket.send(JSON.stringify(payload)); + } catch (error) { + logger.warn("failed to send live websocket message", { + error: error instanceof Error ? error.message : String(error) + }); + } +}; + +const subscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): void => { + const key = getSubscriptionKey(subscription); + const keys = liveSocketSubscriptions.get(socket) ?? new Set(); + keys.add(key); + liveSocketSubscriptions.set(socket, keys); + + const sockets = subscriptionSockets.get(key) ?? new Set(); + sockets.add(socket); + subscriptionSockets.set(key, sockets); +}; + +const unsubscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): void => { + const key = getSubscriptionKey(subscription); + liveSocketSubscriptions.get(socket)?.delete(key); + + const sockets = subscriptionSockets.get(key); + if (!sockets) { + return; + } + sockets.delete(socket); + if (sockets.size === 0) { + subscriptionSockets.delete(key); + } +}; + +const cleanupLiveSocket = (socket: LiveSocket): void => { + const keys = liveSocketSubscriptions.get(socket); + if (keys) { + for (const key of keys) { + const sockets = subscriptionSockets.get(key); + sockets?.delete(socket); + if (sockets && sockets.size === 0) { + subscriptionSockets.delete(key); + } + } + } + liveSocketSubscriptions.delete(socket); + const heartbeat = liveHeartbeats.get(socket); + if (heartbeat) { + clearInterval(heartbeat); + liveHeartbeats.delete(socket); + } +}; + +const buildHistoryResponse = ( + items: T[], + cursorOf: (item: T) => Cursor +): { data: T[]; next_before: Cursor | null } => { + const last = items.at(-1); + return { + data: items, + next_before: last ? cursorOf(last) : null + }; +}; + const buildCandleCacheKey = (underlyingId: string, intervalMs: number): string => { return `candles:equity:${intervalMs}:${underlyingId}`; }; @@ -563,6 +676,9 @@ const run = async () => { redis = null; } + const liveState = new LiveStateManager(clickhouse, redis); + await liveState.hydrate(); + const subscribeWithReset = async ( subject: string, stream: string, @@ -661,11 +777,34 @@ const run = async () => { "api-alerts" ); + const fanoutLive = async ( + subscription: LiveSubscription, + item: unknown, + ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" + ) => { + const key = getSubscriptionKey(subscription); + const sockets = subscriptionSockets.get(key); + const watermark = await liveState.ingest(ingestChannel, item); + if (!sockets || sockets.size === 0) { + return; + } + + for (const socket of sockets) { + sendLiveMessage(socket, { + op: "event", + subscription, + item, + watermark + }); + } + }; + const pumpOptions = async () => { for await (const msg of optionSubscription.messages) { try { const payload = OptionPrintSchema.parse(optionSubscription.decode(msg)); broadcast(optionSockets, { type: "option-print", payload }); + await fanoutLive({ channel: "options" }, payload, "options"); msg.ack(); } catch (error) { logger.error("failed to process option print", { @@ -681,6 +820,7 @@ const run = async () => { try { const payload = OptionNBBOSchema.parse(optionNbboSubscription.decode(msg)); broadcast(optionNbboSockets, { type: "option-nbbo", payload }); + await fanoutLive({ channel: "nbbo" }, payload, "nbbo"); msg.ack(); } catch (error) { logger.error("failed to process option nbbo", { @@ -696,6 +836,12 @@ const run = async () => { try { const payload = EquityPrintSchema.parse(equitySubscription.decode(msg)); broadcast(equitySockets, { type: "equity-print", payload }); + await fanoutLive({ channel: "equities" }, payload, "equities"); + await fanoutLive( + { channel: "equity-overlay", underlying_id: payload.underlying_id }, + payload, + "equity-overlay" + ); msg.ack(); } catch (error) { logger.error("failed to process equity print", { @@ -726,6 +872,15 @@ const run = async () => { try { const payload = EquityCandleSchema.parse(equityCandleSubscription.decode(msg)); broadcast(equityCandleSockets, { type: "equity-candle", payload }); + await fanoutLive( + { + channel: "equity-candles", + underlying_id: payload.underlying_id, + interval_ms: payload.interval_ms + }, + payload, + "equity-candles" + ); msg.ack(); } catch (error) { logger.error("failed to process equity candle", { @@ -741,6 +896,7 @@ const run = async () => { try { const payload = EquityPrintJoinSchema.parse(equityJoinSubscription.decode(msg)); broadcast(equityJoinSockets, { type: "equity-join", payload }); + await fanoutLive({ channel: "equity-joins" }, payload, "equity-joins"); msg.ack(); } catch (error) { logger.error("failed to process equity join", { @@ -756,6 +912,7 @@ const run = async () => { try { const payload = InferredDarkEventSchema.parse(inferredDarkSubscription.decode(msg)); broadcast(inferredDarkSockets, { type: "inferred-dark", payload }); + await fanoutLive({ channel: "inferred-dark" }, payload, "inferred-dark"); msg.ack(); } catch (error) { logger.error("failed to process inferred dark event", { @@ -771,6 +928,7 @@ const run = async () => { try { const payload = FlowPacketSchema.parse(flowSubscription.decode(msg)); broadcast(flowSockets, { type: "flow-packet", payload }); + await fanoutLive({ channel: "flow" }, payload, "flow"); msg.ack(); } catch (error) { logger.error("failed to process flow packet", { @@ -786,6 +944,7 @@ const run = async () => { try { const payload = ClassifierHitEventSchema.parse(classifierHitSubscription.decode(msg)); broadcast(classifierHitSockets, { type: "classifier-hit", payload }); + await fanoutLive({ channel: "classifier-hits" }, payload, "classifier-hits"); msg.ack(); } catch (error) { logger.error("failed to process classifier hit", { @@ -801,6 +960,7 @@ const run = async () => { try { const payload = AlertEventSchema.parse(alertSubscription.decode(msg)); broadcast(alertSockets, { type: "alert", payload }); + await fanoutLive({ channel: "alerts" }, payload, "alerts"); msg.ack(); } catch (error) { logger.error("failed to process alert", { @@ -822,9 +982,9 @@ const run = async () => { void pumpClassifierHits(); void pumpAlerts(); - const server = Bun.serve({ + const server = Bun.serve({ port: env.API_PORT, - fetch: async (req, serverRef) => { + fetch: async (req: Request, serverRef: any) => { const url = new URL(req.url); if (req.method === "GET" && url.pathname === "/health") { @@ -940,6 +1100,84 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/history/options") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const source = parseReplaySource(url) ?? undefined; + const data = await fetchOptionPrintsBefore(clickhouse, beforeTs, beforeSeq, limit, source); + return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq }))); + } + + if (req.method === "GET" && url.pathname === "/history/nbbo") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const source = parseReplaySource(url) ?? undefined; + const data = await fetchOptionNBBOBefore(clickhouse, beforeTs, beforeSeq, limit, source); + return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq }))); + } + + if (req.method === "GET" && url.pathname === "/history/equities") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchEquityPrintsBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq }))); + } + + if (req.method === "GET" && url.pathname === "/history/equity-joins") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchEquityPrintJoinsBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse( + buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq })) + ); + } + + if (req.method === "GET" && url.pathname === "/history/flow") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchFlowPacketsBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse( + buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq })) + ); + } + + if (req.method === "GET" && url.pathname === "/history/classifier-hits") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchClassifierHitsBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse( + buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq })) + ); + } + + if (req.method === "GET" && url.pathname === "/history/alerts") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchAlertsBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse( + buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq })) + ); + } + + if (req.method === "GET" && url.pathname === "/history/inferred-dark") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchInferredDarkBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse( + buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq })) + ); + } + + if (req.method === "GET" && /^\/flow\/packets\/[^/]+$/.test(url.pathname)) { + const id = decodeURIComponent(url.pathname.slice("/flow/packets/".length)); + const data = await fetchFlowPacketById(clickhouse, id); + return jsonResponse({ data }); + } + + if (req.method === "GET" && url.pathname === "/option-prints/by-trace") { + const traceIds = url.searchParams.getAll("trace_id"); + const data = await fetchOptionPrintsByTraceIds(clickhouse, traceIds); + return jsonResponse({ data }); + } + + if (req.method === "GET" && url.pathname === "/equity-joins/by-id") { + const ids = url.searchParams.getAll("id"); + const data = await fetchEquityPrintJoinsByIds(clickhouse, ids); + return jsonResponse({ data }); + } + if (req.method === "GET" && url.pathname === "/replay/options") { const { afterTs, afterSeq, limit } = parseReplayParams(url); const source = parseReplaySource(url) ?? undefined; @@ -1120,11 +1358,25 @@ const run = async () => { return jsonResponse({ error: "websocket upgrade failed" }, 400); } + if (req.method === "GET" && url.pathname === "/ws/live") { + if (serverRef.upgrade(req, { data: { channel: "live" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + return jsonResponse({ error: "not found" }, 404); }, websocket: { - open: (socket) => { - if (socket.data.channel === "options") { + open: (socket: any) => { + if (socket.data.channel === "live") { + sendLiveMessage(socket, { op: "ready" }); + const heartbeat = setInterval(() => { + sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() }); + }, 15000); + liveHeartbeats.set(socket, heartbeat); + } else if (socket.data.channel === "options") { optionSockets.add(socket); } else if (socket.data.channel === "options-nbbo") { optionNbboSockets.add(socket); @@ -1148,8 +1400,44 @@ const run = async () => { logger.info("websocket connected", { channel: socket.data.channel }); }, - close: (socket) => { - if (socket.data.channel === "options") { + message: async (socket: any, message: string | ArrayBuffer | Uint8Array) => { + if (socket.data.channel !== "live") { + return; + } + + try { + const payload = + typeof message === "string" + ? message + : new TextDecoder().decode(message instanceof Uint8Array ? message : new Uint8Array(message)); + const parsed = LiveClientMessageSchema.parse(JSON.parse(payload)); + if (parsed.op === "ping") { + sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() }); + return; + } + + for (const subscription of parsed.subscriptions) { + LiveSubscriptionSchema.parse(subscription); + if (parsed.op === "unsubscribe") { + unsubscribeSocket(socket, subscription); + continue; + } + + subscribeSocket(socket, subscription); + const snapshot = await liveState.getSnapshot(subscription); + sendLiveMessage(socket, { op: "snapshot", snapshot }); + } + } catch (error) { + sendLiveMessage(socket, { + op: "error", + message: error instanceof Error ? error.message : String(error) + }); + } + }, + close: (socket: any) => { + if (socket.data.channel === "live") { + cleanupLiveSocket(socket); + } else if (socket.data.channel === "options") { optionSockets.delete(socket); } else if (socket.data.channel === "options-nbbo") { optionNbboSockets.delete(socket); diff --git a/services/api/src/live.ts b/services/api/src/live.ts new file mode 100644 index 0000000..7aeebb0 --- /dev/null +++ b/services/api/src/live.ts @@ -0,0 +1,370 @@ +import { + fetchRecentAlerts, + fetchRecentClassifierHits, + fetchRecentEquityCandles, + fetchRecentEquityPrintJoins, + fetchRecentEquityPrints, + fetchRecentFlowPackets, + fetchRecentInferredDark, + fetchRecentOptionNBBO, + fetchRecentOptionPrints, + type ClickHouseClient +} from "@islandflow/storage"; +import { + AlertEventSchema, + ClassifierHitEventSchema, + CursorSchema, + EquityCandleSchema, + EquityPrintJoinSchema, + EquityPrintSchema, + FeedSnapshot, + FlowPacketSchema, + InferredDarkEventSchema, + LiveGenericChannel, + LiveSubscription, + OptionNBBOSchema, + OptionPrintSchema, + type Cursor, + type EquityCandle, + type EquityPrint, + type LiveChannel +} from "@islandflow/types"; +import type { RedisClientType } from "redis"; + +const CURSOR_HASH_KEY = "live:cursors"; + +const GENERIC_LIMITS = { + options: 500, + nbbo: 500, + equities: 500, + "equity-joins": 500, + flow: 500, + "classifier-hits": 500, + alerts: 500, + "inferred-dark": 500 +} as const; + +const CHART_LIMITS = { + candles: 500, + overlay: 1500 +} as const; + +type GenericFeedConfig = { + redisKey: string; + cursorField: string; + limit: number; + parse: (value: unknown) => any; + cursor: (item: any) => Cursor; + fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise; +}; + +type RedisLike = Pick< + RedisClientType, + "isOpen" | "lRange" | "lPush" | "lTrim" | "hGet" | "hSet" +>; + +const parseCursor = (value: string | null): Cursor | null => { + if (!value) { + return null; + } + + try { + return CursorSchema.parse(JSON.parse(value)); + } catch { + return null; + } +}; + +const getGenericConfig = (): { + [K in LiveGenericChannel]: GenericFeedConfig; +} => ({ + options: { + redisKey: "live:options", + cursorField: "options", + limit: GENERIC_LIMITS.options, + parse: (value) => OptionPrintSchema.parse(value), + cursor: (item) => ({ ts: item.ts, seq: item.seq }), + fetchRecent: fetchRecentOptionPrints + }, + nbbo: { + redisKey: "live:nbbo", + cursorField: "nbbo", + limit: GENERIC_LIMITS.nbbo, + parse: (value) => OptionNBBOSchema.parse(value), + cursor: (item) => ({ ts: item.ts, seq: item.seq }), + fetchRecent: fetchRecentOptionNBBO + }, + equities: { + redisKey: "live:equities", + cursorField: "equities", + limit: GENERIC_LIMITS.equities, + parse: (value) => EquityPrintSchema.parse(value), + cursor: (item) => ({ ts: item.ts, seq: item.seq }), + fetchRecent: fetchRecentEquityPrints + }, + "equity-joins": { + redisKey: "live:equity-joins", + cursorField: "equity-joins", + limit: GENERIC_LIMITS["equity-joins"], + parse: (value) => EquityPrintJoinSchema.parse(value), + cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), + fetchRecent: fetchRecentEquityPrintJoins + }, + flow: { + redisKey: "live:flow", + cursorField: "flow", + limit: GENERIC_LIMITS.flow, + parse: (value) => FlowPacketSchema.parse(value), + cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), + fetchRecent: fetchRecentFlowPackets + }, + "classifier-hits": { + redisKey: "live:classifier-hits", + cursorField: "classifier-hits", + limit: GENERIC_LIMITS["classifier-hits"], + parse: (value) => ClassifierHitEventSchema.parse(value), + cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), + fetchRecent: fetchRecentClassifierHits + }, + alerts: { + redisKey: "live:alerts", + cursorField: "alerts", + limit: GENERIC_LIMITS.alerts, + parse: (value) => AlertEventSchema.parse(value), + cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), + fetchRecent: fetchRecentAlerts + }, + "inferred-dark": { + redisKey: "live:inferred-dark", + cursorField: "inferred-dark", + limit: GENERIC_LIMITS["inferred-dark"], + parse: (value) => InferredDarkEventSchema.parse(value), + cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), + fetchRecent: fetchRecentInferredDark + } +}); + +const parseJsonList = (payloads: string[], parse: (value: unknown) => T): T[] => { + const items: T[] = []; + for (const payload of payloads) { + try { + items.push(parse(JSON.parse(payload))); + } catch { + // ignore bad cache entries + } + } + return items; +}; + +const nextBeforeForItems = (items: T[], cursorOf: (item: T) => Cursor): Cursor | null => { + const last = items.at(-1); + return last ? cursorOf(last) : null; +}; + +const candleRedisKey = (underlyingId: string, intervalMs: number): string => + `live:equity-candles:${underlyingId}:${intervalMs}`; + +const candleCursorField = (underlyingId: string, intervalMs: number): string => + `equity-candles:${underlyingId}:${intervalMs}`; + +const overlayRedisKey = (underlyingId: string): string => `live:equity-overlay:${underlyingId}`; +const overlayCursorField = (underlyingId: string): string => `equities:${underlyingId}`; + +export class LiveStateManager { + private readonly generic = getGenericConfig(); + private readonly genericItems = new Map(); + private readonly genericCursors = new Map(); + private readonly candleItems = new Map(); + private readonly candleCursors = new Map(); + private readonly overlayItems = new Map(); + private readonly overlayCursors = new Map(); + + constructor( + private readonly clickhouse: ClickHouseClient, + private readonly redis: RedisLike | null + ) {} + + async hydrate(): Promise { + const channels = Object.keys(this.generic) as LiveGenericChannel[]; + await Promise.all(channels.map((channel) => this.hydrateGeneric(channel))); + } + + private async hydrateGeneric(channel: LiveGenericChannel): Promise { + const config = this.generic[channel]; + if (this.redis?.isOpen) { + const payloads = await this.redis.lRange(config.redisKey, 0, config.limit - 1); + const cached = parseJsonList(payloads, config.parse); + if (cached.length > 0) { + this.genericItems.set(channel, cached); + this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField))); + return; + } + } + + const fresh = await config.fetchRecent(this.clickhouse, config.limit); + this.genericItems.set(channel, fresh); + const watermark = fresh[0] ? config.cursor(fresh[0]) : null; + this.genericCursors.set(config.cursorField, watermark); + await this.persistList(config.redisKey, config.cursorField, fresh, config.limit, watermark); + } + + async getSnapshot(subscription: LiveSubscription): Promise> { + switch (subscription.channel) { + case "equity-candles": { + const key = candleRedisKey(subscription.underlying_id, subscription.interval_ms); + const cursorField = candleCursorField(subscription.underlying_id, subscription.interval_ms); + if (!this.candleItems.has(key)) { + await this.hydrateCandles(subscription.underlying_id, subscription.interval_ms); + } + const items = this.candleItems.get(key) ?? []; + return { + subscription, + items, + watermark: this.candleCursors.get(cursorField) ?? null, + next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq })) + }; + } + case "equity-overlay": { + const key = overlayRedisKey(subscription.underlying_id); + const cursorField = overlayCursorField(subscription.underlying_id); + if (!this.overlayItems.has(key)) { + await this.hydrateOverlay(subscription.underlying_id); + } + const items = this.overlayItems.get(key) ?? []; + return { + subscription, + items, + watermark: this.overlayCursors.get(cursorField) ?? null, + next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq })) + }; + } + default: { + const config = this.generic[subscription.channel]; + const items = this.genericItems.get(subscription.channel) ?? []; + return { + subscription, + items, + watermark: this.genericCursors.get(config.cursorField) ?? null, + next_before: nextBeforeForItems(items, config.cursor) + }; + } + } + } + + async ingest(channel: LiveChannel, item: unknown): Promise { + switch (channel) { + case "equity-candles": { + const candle = EquityCandleSchema.parse(item); + const key = candleRedisKey(candle.underlying_id, candle.interval_ms); + const cursorField = candleCursorField(candle.underlying_id, candle.interval_ms); + const items = this.candleItems.get(key) ?? []; + const next = [candle, ...items] + .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq)) + .slice(0, CHART_LIMITS.candles); + this.candleItems.set(key, next); + const cursor = { ts: candle.ts, seq: candle.seq }; + this.candleCursors.set(cursorField, cursor); + await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor); + return cursor; + } + case "equity-overlay": { + const print = EquityPrintSchema.parse(item); + const key = overlayRedisKey(print.underlying_id); + const cursorField = overlayCursorField(print.underlying_id); + const items = this.overlayItems.get(key) ?? []; + const next = [print, ...items] + .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq)) + .slice(0, CHART_LIMITS.overlay); + this.overlayItems.set(key, next); + const cursor = { ts: print.ts, seq: print.seq }; + this.overlayCursors.set(cursorField, cursor); + await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor); + return cursor; + } + default: { + const config = this.generic[channel]; + const parsed = config.parse(item); + const items = this.genericItems.get(channel) ?? []; + const next = [parsed, ...items] + .sort((a, b) => { + const aCursor = config.cursor(a); + const bCursor = config.cursor(b); + return (bCursor.ts - aCursor.ts) || (bCursor.seq - aCursor.seq); + }) + .slice(0, config.limit); + this.genericItems.set(channel, next); + const cursor = config.cursor(parsed); + this.genericCursors.set(config.cursorField, cursor); + await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor); + return cursor; + } + } + } + + private async hydrateCandles(underlyingId: string, intervalMs: number): Promise { + const key = candleRedisKey(underlyingId, intervalMs); + const cursorField = candleCursorField(underlyingId, intervalMs); + if (this.redis?.isOpen) { + const payloads = await this.redis.lRange(key, 0, CHART_LIMITS.candles - 1); + const cached = parseJsonList(payloads, (value) => EquityCandleSchema.parse(value)); + if (cached.length > 0) { + this.candleItems.set(key, cached); + this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); + return; + } + } + + const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles); + this.candleItems.set(key, fresh); + const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null; + this.candleCursors.set(cursorField, watermark); + await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark); + } + + private async hydrateOverlay(underlyingId: string): Promise { + const key = overlayRedisKey(underlyingId); + const cursorField = overlayCursorField(underlyingId); + if (this.redis?.isOpen) { + const payloads = await this.redis.lRange(key, 0, CHART_LIMITS.overlay - 1); + const cached = parseJsonList(payloads, (value) => EquityPrintSchema.parse(value)); + if (cached.length > 0) { + this.overlayItems.set(key, cached); + this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); + return; + } + } + + const fresh = (await fetchRecentEquityPrints(this.clickhouse, CHART_LIMITS.overlay)).filter( + (item) => item.underlying_id === underlyingId + ); + this.overlayItems.set(key, fresh); + const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null; + this.overlayCursors.set(cursorField, watermark); + await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark); + } + + private async persistList( + listKey: string, + cursorField: string, + items: T[], + limit: number, + cursor: Cursor | null + ): Promise { + if (!this.redis?.isOpen) { + return; + } + + const payloads = items.map((item) => JSON.stringify(item)); + await this.redis.lTrim(listKey, 1, 0); + if (payloads.length > 0) { + for (let idx = payloads.length - 1; idx >= 0; idx -= 1) { + const payload = payloads[idx]; + if (payload) { + await this.redis.lPush(listKey, payload); + } + } + await this.redis.lTrim(listKey, 0, limit - 1); + } + await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor)); + } +} diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts new file mode 100644 index 0000000..bfda54d --- /dev/null +++ b/services/api/tests/live.test.ts @@ -0,0 +1,123 @@ +import { describe, expect, it } from "bun:test"; +import type { ClickHouseClient } from "@islandflow/storage"; +import { LiveStateManager } from "../src/live"; + +const makeClickHouse = (): ClickHouseClient => + ({ + exec: async () => {}, + insert: async () => {}, + ping: async () => ({ success: true }), + close: async () => {}, + query: async () => ({ + async json() { + return [] as T; + } + }) + }) as ClickHouseClient; + +const makeRedis = () => { + const lists = new Map(); + const hashes = new Map>(); + + return { + isOpen: true, + async lRange(key: string, start: number, stop: number) { + return (lists.get(key) ?? []).slice(start, stop + 1); + }, + async lPush(key: string, value: string) { + const next = lists.get(key) ?? []; + next.unshift(value); + lists.set(key, next); + return next.length; + }, + async lTrim(key: string, start: number, stop: number) { + const next = lists.get(key) ?? []; + lists.set(key, start > stop ? [] : next.slice(start, stop + 1)); + return "OK"; + }, + async hGet(key: string, field: string) { + return hashes.get(key)?.get(field) ?? null; + }, + async hSet(key: string, field: string, value: string) { + const hash = hashes.get(key) ?? new Map(); + hash.set(field, value); + hashes.set(key, hash); + return 1; + } + }; +}; + +describe("LiveStateManager", () => { + it("hydrates snapshots from redis generic windows", async () => { + const redis = makeRedis(); + await redis.lPush( + "live:flow", + JSON.stringify({ + source_ts: 100, + ingest_ts: 101, + seq: 1, + trace_id: "flow-1", + id: "flow-1", + members: ["a"], + features: {}, + join_quality: {} + }) + ); + await redis.hSet("live:cursors", "flow", JSON.stringify({ ts: 100, seq: 1 })); + + const manager = new LiveStateManager(makeClickHouse(), redis as never); + await manager.hydrate(); + const snapshot = await manager.getSnapshot({ channel: "flow" }); + + expect(snapshot.items).toHaveLength(1); + expect(snapshot.watermark).toEqual({ ts: 100, seq: 1 }); + expect(snapshot.next_before).toEqual({ ts: 100, seq: 1 }); + }); + + it("persists parameterized candle and overlay caches on ingest", async () => { + const redis = makeRedis(); + const manager = new LiveStateManager(makeClickHouse(), redis as never); + await manager.ingest("equity-candles", { + source_ts: 100, + ingest_ts: 101, + seq: 1, + trace_id: "candle:SPY:60000:100", + ts: 100, + interval_ms: 60000, + underlying_id: "SPY", + open: 1, + high: 2, + low: 1, + close: 2, + volume: 10, + trade_count: 1 + }); + await manager.ingest("equity-overlay", { + source_ts: 110, + ingest_ts: 111, + seq: 2, + trace_id: "eq-1", + ts: 110, + underlying_id: "SPY", + price: 10, + size: 5, + exchange: "X", + offExchangeFlag: true + }); + + const candleSnapshot = await manager.getSnapshot({ + channel: "equity-candles", + underlying_id: "SPY", + interval_ms: 60000 + }); + const overlaySnapshot = await manager.getSnapshot({ + channel: "equity-overlay", + underlying_id: "SPY" + }); + + expect(candleSnapshot.items).toHaveLength(1); + expect(overlaySnapshot.items).toHaveLength(1); + expect(candleSnapshot.watermark).toEqual({ ts: 100, seq: 1 }); + expect(overlaySnapshot.watermark).toEqual({ ts: 110, seq: 2 }); + }); +});