resolve main merge conflicts for alert context and beads state
This commit is contained in:
commit
dc932cf18e
11 changed files with 843 additions and 153 deletions
|
|
@ -746,6 +746,13 @@ export type EquityPrintQueryFilters = {
|
|||
sinceTs?: number;
|
||||
};
|
||||
|
||||
export type AlertContextBundle = {
|
||||
alert: AlertEvent | null;
|
||||
flow_packets: FlowPacket[];
|
||||
option_prints: OptionPrint[];
|
||||
missing_refs: string[];
|
||||
};
|
||||
|
||||
const buildOptionPrintFilterConditions = (
|
||||
filters: OptionPrintQueryFilters | undefined,
|
||||
tracePrefix: string | undefined
|
||||
|
|
@ -1200,6 +1207,101 @@ export const fetchRecentAlerts = async (
|
|||
return AlertEventSchema.array().parse(alerts);
|
||||
};
|
||||
|
||||
const normalizeAlertEvidenceRefs = (refs: string[]): string[] => {
|
||||
return Array.from(new Set(refs.map((ref) => ref.trim()).filter(Boolean)));
|
||||
};
|
||||
|
||||
const flowPacketCandidatesFromRef = (ref: string): string[] => {
|
||||
if (!ref) {
|
||||
return [];
|
||||
}
|
||||
if (ref.startsWith("flowpacket:")) {
|
||||
const raw = ref.slice("flowpacket:".length);
|
||||
return raw ? [ref, raw] : [ref];
|
||||
}
|
||||
return [ref, `flowpacket:${ref}`];
|
||||
};
|
||||
|
||||
const optionPrintCandidatesFromRef = (ref: string): string[] => {
|
||||
if (!ref || ref.startsWith("flowpacket:")) {
|
||||
return [];
|
||||
}
|
||||
return [ref];
|
||||
};
|
||||
|
||||
export const fetchAlertContextByTraceId = async (
|
||||
client: ClickHouseClient,
|
||||
traceId: string
|
||||
): Promise<AlertContextBundle> => {
|
||||
const normalizedTraceId = traceId.trim();
|
||||
if (!normalizedTraceId) {
|
||||
return {
|
||||
alert: null,
|
||||
flow_packets: [],
|
||||
option_prints: [],
|
||||
missing_refs: []
|
||||
};
|
||||
}
|
||||
|
||||
const alertResult = await client.query({
|
||||
query: `SELECT * FROM ${ALERTS_TABLE} WHERE trace_id = ${quoteString(normalizedTraceId)} ORDER BY source_ts DESC, seq DESC LIMIT 1`,
|
||||
format: "JSONEachRow"
|
||||
});
|
||||
const alertRows = await alertResult.json<unknown[]>();
|
||||
const alertRecord = alertRows
|
||||
.map(normalizeAlertRow)
|
||||
.find((record): record is AlertRecord => record !== null);
|
||||
const alert = alertRecord ? AlertEventSchema.parse(fromAlertRecord(alertRecord)) : null;
|
||||
|
||||
if (!alert) {
|
||||
return {
|
||||
alert: null,
|
||||
flow_packets: [],
|
||||
option_prints: [],
|
||||
missing_refs: []
|
||||
};
|
||||
}
|
||||
|
||||
const refs = normalizeAlertEvidenceRefs(alert.evidence_refs);
|
||||
const packetLookupIds = Array.from(new Set(refs.flatMap(flowPacketCandidatesFromRef)));
|
||||
const printLookupIds = Array.from(new Set(refs.flatMap(optionPrintCandidatesFromRef)));
|
||||
|
||||
const [flowPackets, optionPrints] = await Promise.all([
|
||||
packetLookupIds.length > 0
|
||||
? client
|
||||
.query({
|
||||
query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE id IN (${buildStringList(packetLookupIds)}) ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(packetLookupIds.length)}`,
|
||||
format: "JSONEachRow"
|
||||
})
|
||||
.then(async (result) => {
|
||||
const rows = await result.json<unknown[]>();
|
||||
const records = rows
|
||||
.map(normalizeFlowPacketRow)
|
||||
.filter((record): record is FlowPacketRecord => record !== null);
|
||||
return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord));
|
||||
})
|
||||
: Promise.resolve([]),
|
||||
printLookupIds.length > 0
|
||||
? fetchOptionPrintsByTraceIds(client, printLookupIds)
|
||||
: Promise.resolve([])
|
||||
]);
|
||||
|
||||
const packetIds = new Set(flowPackets.flatMap((packet) => [packet.id, packet.trace_id]));
|
||||
const printIds = new Set(optionPrints.map((print) => print.trace_id));
|
||||
const missingRefs = refs.filter((ref) => {
|
||||
const packetResolved = flowPacketCandidatesFromRef(ref).some((candidate) => packetIds.has(candidate));
|
||||
const printResolved = optionPrintCandidatesFromRef(ref).some((candidate) => printIds.has(candidate));
|
||||
return !packetResolved && !printResolved;
|
||||
});
|
||||
|
||||
return {
|
||||
alert,
|
||||
flow_packets: flowPackets,
|
||||
option_prints: optionPrints,
|
||||
missing_refs: missingRefs
|
||||
};
|
||||
};
|
||||
|
||||
export const fetchOptionPrintsAfter = async (
|
||||
client: ClickHouseClient,
|
||||
afterTs: number,
|
||||
|
|
@ -1846,55 +1948,6 @@ export const fetchOptionPrintsByTraceIds = async (
|
|||
return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow));
|
||||
};
|
||||
|
||||
export type AlertContextBundle = {
|
||||
alert: AlertEvent | null;
|
||||
flow_packets: FlowPacket[];
|
||||
option_prints: OptionPrint[];
|
||||
missing_refs: string[];
|
||||
};
|
||||
|
||||
export const fetchAlertContextByTraceId = async (
|
||||
client: ClickHouseClient,
|
||||
traceId: string
|
||||
): Promise<AlertContextBundle> => {
|
||||
const normalizedTraceId = traceId.trim();
|
||||
if (!normalizedTraceId) {
|
||||
return { alert: null, flow_packets: [], option_prints: [], missing_refs: [] };
|
||||
}
|
||||
|
||||
const alertResult = await client.query({
|
||||
query: `SELECT * FROM ${ALERTS_TABLE} WHERE trace_id = ${quoteString(normalizedTraceId)} ORDER BY source_ts DESC, seq DESC LIMIT 1`,
|
||||
format: "JSONEachRow"
|
||||
});
|
||||
const alertRows = await alertResult.json<unknown[]>();
|
||||
const alertRecord = alertRows
|
||||
.map(normalizeAlertRow)
|
||||
.find((row): row is AlertRecord => row !== null);
|
||||
const alert = alertRecord ? AlertEventSchema.parse(fromAlertRecord(alertRecord)) : null;
|
||||
if (!alert) {
|
||||
return { alert: null, flow_packets: [], option_prints: [], missing_refs: [] };
|
||||
}
|
||||
|
||||
const refs = Array.from(new Set(alert.evidence_refs.map((id) => id.trim()).filter(Boolean)));
|
||||
const packetIds = refs.filter((id) => id.startsWith("flowpacket:"));
|
||||
const printIds = refs.filter((id) => !id.startsWith("flowpacket:"));
|
||||
const [flow_packets, option_prints] = await Promise.all([
|
||||
packetIds.length > 0
|
||||
? fetchFlowPacketsByIds(client, packetIds)
|
||||
: Promise.resolve([] as FlowPacket[]),
|
||||
printIds.length > 0
|
||||
? fetchOptionPrintsByTraceIds(client, printIds)
|
||||
: Promise.resolve([] as OptionPrint[])
|
||||
]);
|
||||
|
||||
const resolvedRefs = new Set<string>([
|
||||
...flow_packets.map((packet) => packet.id),
|
||||
...option_prints.map((print) => print.trace_id)
|
||||
]);
|
||||
const missing_refs = refs.filter((id) => !resolvedRefs.has(id));
|
||||
return { alert, flow_packets, option_prints, missing_refs };
|
||||
};
|
||||
|
||||
export const fetchEquityPrintJoinsByIds = async (
|
||||
client: ClickHouseClient,
|
||||
ids: string[]
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
import { describe, expect, it } from "bun:test";
|
||||
import type { ClickHouseClient } from "../src/clickhouse";
|
||||
import { alertsTableDDL, ALERTS_TABLE, fromAlertRecord, toAlertRecord } from "../src/alerts";
|
||||
import { fetchAlertContextByTraceId } from "../src/clickhouse";
|
||||
import { toFlowPacketRecord } from "../src/flow-packets";
|
||||
|
||||
const alert = {
|
||||
source_ts: 10,
|
||||
|
|
@ -19,6 +22,62 @@ const alert = {
|
|||
evidence_refs: ["flowpacket:1", "print:1"]
|
||||
};
|
||||
|
||||
const packet = {
|
||||
source_ts: 11,
|
||||
ingest_ts: 21,
|
||||
seq: 2,
|
||||
trace_id: "flowpacket:1",
|
||||
id: "flowpacket:1",
|
||||
members: ["print:1"],
|
||||
features: {
|
||||
option_contract_id: "SPY-2026-06-19-500-C",
|
||||
count: 1,
|
||||
total_size: 50
|
||||
},
|
||||
join_quality: {}
|
||||
};
|
||||
|
||||
const print = {
|
||||
source_ts: 12,
|
||||
ingest_ts: 22,
|
||||
seq: 3,
|
||||
trace_id: "print:1",
|
||||
ts: 12,
|
||||
option_contract_id: "SPY-2026-06-19-500-C",
|
||||
price: 1.45,
|
||||
size: 50,
|
||||
exchange: "XTEST",
|
||||
conditions: [],
|
||||
nbbo_side: "A",
|
||||
execution_nbbo_bid: 1.4,
|
||||
execution_nbbo_ask: 1.5,
|
||||
execution_nbbo_mid: 1.45,
|
||||
execution_nbbo_spread: 0.1,
|
||||
execution_nbbo_age_ms: 14,
|
||||
execution_nbbo_side: "A",
|
||||
execution_underlying_spot: 500.25,
|
||||
execution_underlying_bid: 500.2,
|
||||
execution_underlying_ask: 500.3,
|
||||
execution_underlying_mid: 500.25,
|
||||
execution_underlying_age_ms: 9,
|
||||
execution_iv: 0.31,
|
||||
signal_reasons: [],
|
||||
signal_pass: true
|
||||
};
|
||||
|
||||
const makeClient = (resolver: (query: string) => unknown[]): ClickHouseClient =>
|
||||
({
|
||||
exec: async () => {},
|
||||
insert: async () => {},
|
||||
ping: async () => ({ success: true }),
|
||||
close: async () => {},
|
||||
query: async ({ query }: { query: string }) => ({
|
||||
async json<T>() {
|
||||
return resolver(query) as T;
|
||||
}
|
||||
})
|
||||
}) as ClickHouseClient;
|
||||
|
||||
describe("alerts storage helpers", () => {
|
||||
it("includes the correct table name in the DDL", () => {
|
||||
const ddl = alertsTableDDL();
|
||||
|
|
@ -33,4 +92,51 @@ describe("alerts storage helpers", () => {
|
|||
expect(restored.evidence_refs).toEqual(alert.evidence_refs);
|
||||
expect(restored.severity).toBe(alert.severity);
|
||||
});
|
||||
|
||||
it("fetches persisted alert context and reports unresolved refs", async () => {
|
||||
const contextAlert = {
|
||||
...alert,
|
||||
trace_id: "alert:ctx",
|
||||
evidence_refs: ["flowpacket:1", "print:1", "print:missing"]
|
||||
};
|
||||
const queries: string[] = [];
|
||||
const client = makeClient((query) => {
|
||||
queries.push(query);
|
||||
if (query.includes(ALERTS_TABLE)) {
|
||||
return [toAlertRecord(contextAlert)];
|
||||
}
|
||||
if (query.includes("flow_packets")) {
|
||||
return [toFlowPacketRecord(packet)];
|
||||
}
|
||||
if (query.includes("option_prints")) {
|
||||
return [print];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
const bundle = await fetchAlertContextByTraceId(client, "alert:ctx");
|
||||
|
||||
expect(bundle.alert?.trace_id).toBe("alert:ctx");
|
||||
expect(bundle.flow_packets.map((item) => item.id)).toEqual(["flowpacket:1"]);
|
||||
expect(bundle.option_prints.map((item) => item.trace_id)).toEqual(["print:1"]);
|
||||
expect(bundle.option_prints[0]?.execution_nbbo_side).toBe("A");
|
||||
expect(bundle.option_prints[0]?.execution_nbbo_bid).toBe(1.4);
|
||||
expect(bundle.option_prints[0]?.execution_underlying_spot).toBe(500.25);
|
||||
expect(bundle.option_prints[0]?.execution_iv).toBe(0.31);
|
||||
expect(bundle.missing_refs).toEqual(["print:missing"]);
|
||||
expect(queries[0]).toContain("trace_id = 'alert:ctx'");
|
||||
expect(queries[1]).toContain("id IN");
|
||||
expect(queries[2]).toContain("trace_id IN ('print:1', 'print:missing')");
|
||||
});
|
||||
|
||||
it("returns an empty context when the alert is missing", async () => {
|
||||
const bundle = await fetchAlertContextByTraceId(makeClient(() => []), "alert:missing");
|
||||
|
||||
expect(bundle).toEqual({
|
||||
alert: null,
|
||||
flow_packets: [],
|
||||
option_prints: [],
|
||||
missing_refs: []
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue