Add replay for flow alerts and hits

This commit is contained in:
dirtydishes 2026-01-09 17:09:05 -05:00
parent 2752025fbc
commit 6951dddfdf
3 changed files with 142 additions and 32 deletions

View file

@ -1957,27 +1957,41 @@ export default function HomePage() {
getReplayKey: disableReplayGrouping getReplayKey: disableReplayGrouping
}); });
const flowHold = useCallback(() => !flowScroll.isAtTopRef.current, [flowScroll.isAtTopRef]); const flow = useTape<FlowPacket>({
const flow = useFlowStream( mode,
mode === "live", wsPath: "/ws/flow",
flowScroll.onNewItems, replayPath: "/replay/flow",
flowAnchor.capture, latestPath: "/flow/packets",
flowHold, expectedType: "flow-packet",
flowScroll.resumeTick batchSize: mode === "replay" ? 120 : undefined,
); pollMs: mode === "replay" ? 200 : undefined,
const alerts = useLiveStream<AlertEvent>({ captureScroll: flowAnchor.capture,
enabled: mode === "live", onNewItems: flowScroll.onNewItems,
wsPath: "/ws/alerts", getReplayKey: disableReplayGrouping
expectedType: "alert",
onNewItems: alertsScroll.onNewItems,
captureScroll: alertsAnchor.capture
}); });
const classifierHits = useLiveStream<ClassifierHitEvent>({ const alerts = useTape<AlertEvent>({
enabled: mode === "live", mode,
wsPath: "/ws/alerts",
replayPath: "/replay/alerts",
latestPath: "/flow/alerts",
expectedType: "alert",
batchSize: mode === "replay" ? 120 : undefined,
pollMs: mode === "replay" ? 200 : undefined,
captureScroll: alertsAnchor.capture,
onNewItems: alertsScroll.onNewItems,
getReplayKey: disableReplayGrouping
});
const classifierHits = useTape<ClassifierHitEvent>({
mode,
wsPath: "/ws/classifier-hits", wsPath: "/ws/classifier-hits",
replayPath: "/replay/classifier-hits",
latestPath: "/flow/classifier-hits",
expectedType: "classifier-hit", expectedType: "classifier-hit",
batchSize: mode === "replay" ? 120 : undefined,
pollMs: mode === "replay" ? 200 : undefined,
captureScroll: classifierAnchor.capture,
onNewItems: classifierScroll.onNewItems, onNewItems: classifierScroll.onNewItems,
captureScroll: classifierAnchor.capture getReplayKey: disableReplayGrouping
}); });
useLayoutEffect(() => { useLayoutEffect(() => {
@ -2498,7 +2512,7 @@ export default function HomePage() {
<div className="card-header"> <div className="card-header">
<div> <div>
<h2>Flow Packets</h2> <h2>Flow Packets</h2>
<p className="card-subtitle">Deterministic clusters (live only).</p> <p className="card-subtitle">Deterministic clusters.</p>
</div> </div>
</div> </div>
<div className="card-controls"> <div className="card-controls">
@ -2521,13 +2535,13 @@ export default function HomePage() {
<div className="card-body"> <div className="card-body">
<div className="list" ref={flowScroll.listRef}> <div className="list" ref={flowScroll.listRef}>
{mode !== "live" ? ( {filteredFlow.length === 0 ? (
<div className="empty">Flow packets are live-only in this build.</div>
) : filteredFlow.length === 0 ? (
<div className="empty"> <div className="empty">
{tickerSet.size > 0 {tickerSet.size > 0
? "No flow packets match the current filter." ? "No flow packets match the current filter."
: "No flow packets yet. Start compute."} : mode === "live"
? "No flow packets yet. Start compute."
: "Replay queue empty. Ensure ClickHouse has data."}
</div> </div>
) : ( ) : (
filteredFlow.map((packet) => { filteredFlow.map((packet) => {
@ -2640,7 +2654,7 @@ export default function HomePage() {
replayComplete={alerts.replayComplete} replayComplete={alerts.replayComplete}
paused={alerts.paused} paused={alerts.paused}
dropped={alerts.dropped} dropped={alerts.dropped}
mode="live" mode={mode}
onTogglePause={alerts.togglePause} onTogglePause={alerts.togglePause}
/> />
<TapeControls <TapeControls
@ -2653,13 +2667,13 @@ export default function HomePage() {
<div className="card-body"> <div className="card-body">
<AlertSeverityStrip alerts={filteredAlerts} /> <AlertSeverityStrip alerts={filteredAlerts} />
<div className="list" ref={alertsScroll.listRef}> <div className="list" ref={alertsScroll.listRef}>
{mode !== "live" ? ( {filteredAlerts.length === 0 ? (
<div className="empty">Alerts are live-only in this build.</div>
) : filteredAlerts.length === 0 ? (
<div className="empty"> <div className="empty">
{tickerSet.size > 0 {tickerSet.size > 0
? "No alerts match the current filter." ? "No alerts match the current filter."
: "No alerts yet. Start compute."} : mode === "live"
? "No alerts yet. Start compute."
: "Replay queue empty. Ensure ClickHouse has data."}
</div> </div>
) : ( ) : (
filteredAlerts.map((alert) => { filteredAlerts.map((alert) => {
@ -2716,7 +2730,7 @@ export default function HomePage() {
replayComplete={classifierHits.replayComplete} replayComplete={classifierHits.replayComplete}
paused={classifierHits.paused} paused={classifierHits.paused}
dropped={classifierHits.dropped} dropped={classifierHits.dropped}
mode="live" mode={mode}
onTogglePause={classifierHits.togglePause} onTogglePause={classifierHits.togglePause}
/> />
<TapeControls <TapeControls
@ -2728,13 +2742,13 @@ export default function HomePage() {
<div className="card-body"> <div className="card-body">
<div className="list" ref={classifierScroll.listRef}> <div className="list" ref={classifierScroll.listRef}>
{mode !== "live" ? ( {filteredClassifierHits.length === 0 ? (
<div className="empty">Classifier hits are live-only in this build.</div>
) : filteredClassifierHits.length === 0 ? (
<div className="empty"> <div className="empty">
{tickerSet.size > 0 {tickerSet.size > 0
? "No classifier hits match the current filter." ? "No classifier hits match the current filter."
: "No classifier hits yet. Start compute."} : mode === "live"
? "No classifier hits yet. Start compute."
: "Replay queue empty. Ensure ClickHouse has data."}
</div> </div>
) : ( ) : (
filteredClassifierHits.map((hit) => { filteredClassifierHits.map((hit) => {

View file

@ -860,3 +860,72 @@ export const fetchInferredDarkAfter = async (
const events = records.map(fromInferredDarkRecord); const events = records.map(fromInferredDarkRecord);
return InferredDarkEventSchema.array().parse(events); return InferredDarkEventSchema.array().parse(events);
}; };
export const fetchFlowPacketsAfter = async (
client: ClickHouseClient,
afterTs: number,
afterSeq: number,
limit: number
): Promise<FlowPacket[]> => {
const safeLimit = clampLimit(limit);
const safeAfterTs = clampCursor(afterTs);
const safeAfterSeq = clampCursor(afterSeq);
const result = await client.query({
query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeFlowPacketRow)
.filter((record): record is FlowPacketRecord => record !== null);
const packets = records.map(fromFlowPacketRecord);
return FlowPacketSchema.array().parse(packets);
};
export const fetchClassifierHitsAfter = async (
client: ClickHouseClient,
afterTs: number,
afterSeq: number,
limit: number
): Promise<ClassifierHitEvent[]> => {
const safeLimit = clampLimit(limit);
const safeAfterTs = clampCursor(afterTs);
const safeAfterSeq = clampCursor(afterSeq);
const result = await client.query({
query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeClassifierHitRow)
.filter((record): record is ClassifierHitRecord => record !== null);
const hits = records.map(fromClassifierHitRecord);
return ClassifierHitEventSchema.array().parse(hits);
};
export const fetchAlertsAfter = async (
client: ClickHouseClient,
afterTs: number,
afterSeq: number,
limit: number
): Promise<AlertEvent[]> => {
const safeLimit = clampLimit(limit);
const safeAfterTs = clampCursor(afterTs);
const safeAfterSeq = clampCursor(afterSeq);
const result = await client.query({
query: `SELECT * FROM ${ALERTS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeAlertRow)
.filter((record): record is AlertRecord => record !== null);
const alerts = records.map(fromAlertRecord);
return AlertEventSchema.array().parse(alerts);
};

View file

@ -38,6 +38,9 @@ import {
ensureFlowPacketsTable, ensureFlowPacketsTable,
ensureOptionNBBOTable, ensureOptionNBBOTable,
ensureOptionPrintsTable, ensureOptionPrintsTable,
fetchAlertsAfter,
fetchClassifierHitsAfter,
fetchFlowPacketsAfter,
fetchRecentAlerts, fetchRecentAlerts,
fetchRecentClassifierHits, fetchRecentClassifierHits,
fetchRecentEquityPrintJoins, fetchRecentEquityPrintJoins,
@ -916,6 +919,30 @@ const run = async () => {
return jsonResponse({ data, next }); return jsonResponse({ data, next });
} }
if (req.method === "GET" && url.pathname === "/replay/flow") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchFlowPacketsAfter(clickhouse, afterTs, afterSeq, limit);
const last = data.at(-1);
const next = last ? { ts: last.source_ts, seq: last.seq } : null;
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/replay/classifier-hits") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchClassifierHitsAfter(clickhouse, afterTs, afterSeq, limit);
const last = data.at(-1);
const next = last ? { ts: last.source_ts, seq: last.seq } : null;
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/replay/alerts") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchAlertsAfter(clickhouse, afterTs, afterSeq, limit);
const last = data.at(-1);
const next = last ? { ts: last.source_ts, seq: last.seq } : null;
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/ws/options") { if (req.method === "GET" && url.pathname === "/ws/options") {
if (serverRef.upgrade(req, { data: { channel: "options" } })) { if (serverRef.upgrade(req, { data: { channel: "options" } })) {
return new Response(null, { status: 101 }); return new Response(null, { status: 101 });