From 6951dddfdff3c73d68d9c3a07208d626e2663648 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Fri, 9 Jan 2026 17:09:05 -0500 Subject: [PATCH] Add replay for flow alerts and hits --- apps/web/app/page.tsx | 78 ++++++++++++++++++------------ packages/storage/src/clickhouse.ts | 69 ++++++++++++++++++++++++++ services/api/src/index.ts | 27 +++++++++++ 3 files changed, 142 insertions(+), 32 deletions(-) diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 01dd36c..8e182bb 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -1957,27 +1957,41 @@ export default function HomePage() { getReplayKey: disableReplayGrouping }); - const flowHold = useCallback(() => !flowScroll.isAtTopRef.current, [flowScroll.isAtTopRef]); - const flow = useFlowStream( - mode === "live", - flowScroll.onNewItems, - flowAnchor.capture, - flowHold, - flowScroll.resumeTick - ); - const alerts = useLiveStream({ - enabled: mode === "live", - wsPath: "/ws/alerts", - expectedType: "alert", - onNewItems: alertsScroll.onNewItems, - captureScroll: alertsAnchor.capture + const flow = useTape({ + mode, + wsPath: "/ws/flow", + replayPath: "/replay/flow", + latestPath: "/flow/packets", + expectedType: "flow-packet", + batchSize: mode === "replay" ? 120 : undefined, + pollMs: mode === "replay" ? 200 : undefined, + captureScroll: flowAnchor.capture, + onNewItems: flowScroll.onNewItems, + getReplayKey: disableReplayGrouping }); - const classifierHits = useLiveStream({ - enabled: mode === "live", + const alerts = useTape({ + mode, + wsPath: "/ws/alerts", + replayPath: "/replay/alerts", + latestPath: "/flow/alerts", + expectedType: "alert", + batchSize: mode === "replay" ? 120 : undefined, + pollMs: mode === "replay" ? 200 : undefined, + captureScroll: alertsAnchor.capture, + onNewItems: alertsScroll.onNewItems, + getReplayKey: disableReplayGrouping + }); + const classifierHits = useTape({ + mode, wsPath: "/ws/classifier-hits", + replayPath: "/replay/classifier-hits", + latestPath: "/flow/classifier-hits", expectedType: "classifier-hit", + batchSize: mode === "replay" ? 120 : undefined, + pollMs: mode === "replay" ? 200 : undefined, + captureScroll: classifierAnchor.capture, onNewItems: classifierScroll.onNewItems, - captureScroll: classifierAnchor.capture + getReplayKey: disableReplayGrouping }); useLayoutEffect(() => { @@ -2498,7 +2512,7 @@ export default function HomePage() {

Flow Packets

-

Deterministic clusters (live only).

+

Deterministic clusters.

@@ -2521,13 +2535,13 @@ export default function HomePage() {
- {mode !== "live" ? ( -
Flow packets are live-only in this build.
- ) : filteredFlow.length === 0 ? ( + {filteredFlow.length === 0 ? (
{tickerSet.size > 0 ? "No flow packets match the current filter." - : "No flow packets yet. Start compute."} + : mode === "live" + ? "No flow packets yet. Start compute." + : "Replay queue empty. Ensure ClickHouse has data."}
) : ( filteredFlow.map((packet) => { @@ -2640,7 +2654,7 @@ export default function HomePage() { replayComplete={alerts.replayComplete} paused={alerts.paused} dropped={alerts.dropped} - mode="live" + mode={mode} onTogglePause={alerts.togglePause} />
- {mode !== "live" ? ( -
Alerts are live-only in this build.
- ) : filteredAlerts.length === 0 ? ( + {filteredAlerts.length === 0 ? (
{tickerSet.size > 0 ? "No alerts match the current filter." - : "No alerts yet. Start compute."} + : mode === "live" + ? "No alerts yet. Start compute." + : "Replay queue empty. Ensure ClickHouse has data."}
) : ( filteredAlerts.map((alert) => { @@ -2716,7 +2730,7 @@ export default function HomePage() { replayComplete={classifierHits.replayComplete} paused={classifierHits.paused} dropped={classifierHits.dropped} - mode="live" + mode={mode} onTogglePause={classifierHits.togglePause} />
- {mode !== "live" ? ( -
Classifier hits are live-only in this build.
- ) : filteredClassifierHits.length === 0 ? ( + {filteredClassifierHits.length === 0 ? (
{tickerSet.size > 0 ? "No classifier hits match the current filter." - : "No classifier hits yet. Start compute."} + : mode === "live" + ? "No classifier hits yet. Start compute." + : "Replay queue empty. Ensure ClickHouse has data."}
) : ( filteredClassifierHits.map((hit) => { diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 594dec9..8ed0aff 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -860,3 +860,72 @@ export const fetchInferredDarkAfter = async ( const events = records.map(fromInferredDarkRecord); return InferredDarkEventSchema.array().parse(events); }; + +export const fetchFlowPacketsAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeFlowPacketRow) + .filter((record): record is FlowPacketRecord => record !== null); + const packets = records.map(fromFlowPacketRecord); + return FlowPacketSchema.array().parse(packets); +}; + +export const fetchClassifierHitsAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeClassifierHitRow) + .filter((record): record is ClassifierHitRecord => record !== null); + const hits = records.map(fromClassifierHitRecord); + return ClassifierHitEventSchema.array().parse(hits); +}; + +export const fetchAlertsAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT * FROM ${ALERTS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeAlertRow) + .filter((record): record is AlertRecord => record !== null); + const alerts = records.map(fromAlertRecord); + return AlertEventSchema.array().parse(alerts); +}; diff --git a/services/api/src/index.ts b/services/api/src/index.ts index df57d59..a345aba 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -38,6 +38,9 @@ import { ensureFlowPacketsTable, ensureOptionNBBOTable, ensureOptionPrintsTable, + fetchAlertsAfter, + fetchClassifierHitsAfter, + fetchFlowPacketsAfter, fetchRecentAlerts, fetchRecentClassifierHits, fetchRecentEquityPrintJoins, @@ -916,6 +919,30 @@ const run = async () => { return jsonResponse({ data, next }); } + if (req.method === "GET" && url.pathname === "/replay/flow") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchFlowPacketsAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.source_ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + + if (req.method === "GET" && url.pathname === "/replay/classifier-hits") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchClassifierHitsAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.source_ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + + if (req.method === "GET" && url.pathname === "/replay/alerts") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchAlertsAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.source_ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + if (req.method === "GET" && url.pathname === "/ws/options") { if (serverRef.upgrade(req, { data: { channel: "options" } })) { return new Response(null, { status: 101 });