hydrate alert evidence from clickhouse

This commit is contained in:
dirtydishes 2026-05-17 11:02:30 -04:00
parent cd0a1dd9e5
commit c0b5b6dbeb
10 changed files with 701 additions and 62 deletions

View file

@ -746,6 +746,13 @@ export type EquityPrintQueryFilters = {
sinceTs?: number;
};
export type AlertContextBundle = {
alert: AlertEvent | null;
flow_packets: FlowPacket[];
option_prints: OptionPrint[];
missing_refs: string[];
};
const buildOptionPrintFilterConditions = (
filters: OptionPrintQueryFilters | undefined,
tracePrefix: string | undefined
@ -1200,6 +1207,101 @@ export const fetchRecentAlerts = async (
return AlertEventSchema.array().parse(alerts);
};
const normalizeAlertEvidenceRefs = (refs: string[]): string[] => {
return Array.from(new Set(refs.map((ref) => ref.trim()).filter(Boolean)));
};
const flowPacketCandidatesFromRef = (ref: string): string[] => {
if (!ref) {
return [];
}
if (ref.startsWith("flowpacket:")) {
const raw = ref.slice("flowpacket:".length);
return raw ? [ref, raw] : [ref];
}
return [ref, `flowpacket:${ref}`];
};
const optionPrintCandidatesFromRef = (ref: string): string[] => {
if (!ref || ref.startsWith("flowpacket:")) {
return [];
}
return [ref];
};
export const fetchAlertContextByTraceId = async (
client: ClickHouseClient,
traceId: string
): Promise<AlertContextBundle> => {
const normalizedTraceId = traceId.trim();
if (!normalizedTraceId) {
return {
alert: null,
flow_packets: [],
option_prints: [],
missing_refs: []
};
}
const alertResult = await client.query({
query: `SELECT * FROM ${ALERTS_TABLE} WHERE trace_id = ${quoteString(normalizedTraceId)} ORDER BY source_ts DESC, seq DESC LIMIT 1`,
format: "JSONEachRow"
});
const alertRows = await alertResult.json<unknown[]>();
const alertRecord = alertRows
.map(normalizeAlertRow)
.find((record): record is AlertRecord => record !== null);
const alert = alertRecord ? AlertEventSchema.parse(fromAlertRecord(alertRecord)) : null;
if (!alert) {
return {
alert: null,
flow_packets: [],
option_prints: [],
missing_refs: []
};
}
const refs = normalizeAlertEvidenceRefs(alert.evidence_refs);
const packetLookupIds = Array.from(new Set(refs.flatMap(flowPacketCandidatesFromRef)));
const printLookupIds = Array.from(new Set(refs.flatMap(optionPrintCandidatesFromRef)));
const [flowPackets, optionPrints] = await Promise.all([
packetLookupIds.length > 0
? client
.query({
query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE id IN (${buildStringList(packetLookupIds)}) ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(packetLookupIds.length)}`,
format: "JSONEachRow"
})
.then(async (result) => {
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeFlowPacketRow)
.filter((record): record is FlowPacketRecord => record !== null);
return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord));
})
: Promise.resolve([]),
printLookupIds.length > 0
? fetchOptionPrintsByTraceIds(client, printLookupIds)
: Promise.resolve([])
]);
const packetIds = new Set(flowPackets.flatMap((packet) => [packet.id, packet.trace_id]));
const printIds = new Set(optionPrints.map((print) => print.trace_id));
const missingRefs = refs.filter((ref) => {
const packetResolved = flowPacketCandidatesFromRef(ref).some((candidate) => packetIds.has(candidate));
const printResolved = optionPrintCandidatesFromRef(ref).some((candidate) => printIds.has(candidate));
return !packetResolved && !printResolved;
});
return {
alert,
flow_packets: flowPackets,
option_prints: optionPrints,
missing_refs: missingRefs
};
};
export const fetchOptionPrintsAfter = async (
client: ClickHouseClient,
afterTs: number,