From debbc1046ba4e5d67686d537741eea2ba4033f7f Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sun, 11 Jan 2026 16:44:34 -0500 Subject: [PATCH] Add replay source filter for option replay --- apps/web/app/page.tsx | 9 ++++++- packages/storage/src/clickhouse.ts | 39 ++++++++++++++++++++++++------ services/api/src/index.ts | 34 +++++++++++++++++++++++--- 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index d496531..b6beebc 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -743,6 +743,9 @@ const useTape = ( try { const url = new URL(buildApiUrl(latestPath)); url.searchParams.set("limit", "1"); + if (replaySourceKey) { + url.searchParams.set("source", replaySourceKey); + } const response = await fetch(url.toString()); if (!response.ok) { throw new Error(`Replay baseline failed with ${response.status}`); @@ -763,7 +766,7 @@ const useTape = ( return () => { active = false; }; - }, [mode, latestPath, getItemTs]); + }, [mode, latestPath, getItemTs, replaySourceKey]); useEffect(() => { if (mode !== "live") { @@ -883,6 +886,10 @@ const useTape = ( url.searchParams.set("after_ts", cursor.ts.toString()); url.searchParams.set("after_seq", cursor.seq.toString()); url.searchParams.set("limit", batchSize.toString()); + const desiredSource = replaySourceKey ?? replaySourceRef.current; + if (desiredSource) { + url.searchParams.set("source", desiredSource); + } const response = await fetch(url.toString()); if (!response.ok) { diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 8ed0aff..9a42b9f 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -331,6 +331,17 @@ const quoteString = (value: string): string => { return `'${escaped}'`; }; +const buildTracePrefixCondition = (tracePrefix: string | undefined): string | null => { + if (!tracePrefix) { + return null; + } + const normalized = tracePrefix.trim(); + if (!normalized) { + return null; + } + return `startsWith(trace_id, ${quoteString(normalized)})`; +}; + const normalizeNumericFields = ( row: Record, fields: string[] @@ -531,11 +542,14 @@ const normalizeAlertRow = (row: unknown): AlertRecord | null => { export const fetchRecentOptionPrints = async ( client: ClickHouseClient, - limit: number + limit: number, + tracePrefix?: string ): Promise => { const safeLimit = clampLimit(limit); + const condition = buildTracePrefixCondition(tracePrefix); + const whereClause = condition ? ` WHERE ${condition}` : ""; const result = await client.query({ - query: `SELECT * FROM ${OPTION_PRINTS_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + query: `SELECT * FROM ${OPTION_PRINTS_TABLE}${whereClause} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); @@ -545,11 +559,14 @@ export const fetchRecentOptionPrints = async ( export const fetchRecentOptionNBBO = async ( client: ClickHouseClient, - limit: number + limit: number, + tracePrefix?: string ): Promise => { const safeLimit = clampLimit(limit); + const condition = buildTracePrefixCondition(tracePrefix); + const whereClause = condition ? ` WHERE ${condition}` : ""; const result = await client.query({ - query: `SELECT * FROM ${OPTION_NBBO_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + query: `SELECT * FROM ${OPTION_NBBO_TABLE}${whereClause} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); @@ -697,14 +714,17 @@ export const fetchOptionPrintsAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, - limit: number + limit: number, + tracePrefix?: string ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); + const traceCondition = buildTracePrefixCondition(tracePrefix); + const traceClause = traceCondition ? ` AND ${traceCondition}` : ""; const result = await client.query({ - query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq})${traceClause} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); @@ -716,14 +736,17 @@ export const fetchOptionNBBOAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, - limit: number + limit: number, + tracePrefix?: string ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); + const traceCondition = buildTracePrefixCondition(tracePrefix); + const traceClause = traceCondition ? ` AND ${traceCondition}` : ""; const result = await client.query({ - query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq})${traceClause} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index a345aba..d282b86 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -122,6 +122,14 @@ const replayParamsSchema = z.object({ after_seq: z.coerce.number().int().nonnegative().default(0), limit: z.coerce.number().int().positive().max(1000).default(200) }); + +const replaySourceSchema = z + .string() + .trim() + .min(1) + .max(64) + .regex(/^[A-Za-z0-9][A-Za-z0-9_-]*$/) + .transform((value) => value.toLowerCase()); const candleQuerySchema = z.object({ underlying_id: z.string().min(1), interval_ms: z.coerce.number().int().positive(), @@ -193,6 +201,20 @@ const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit }; }; +const parseReplaySource = (url: URL): string | null => { + const raw = url.searchParams.get("source"); + if (!raw) { + return null; + } + + const trimmed = raw.trim(); + if (!trimmed) { + return null; + } + + return replaySourceSchema.parse(trimmed); +}; + const parseBooleanParam = (value: string | null | undefined): boolean => { if (!value) { return false; @@ -756,13 +778,15 @@ const run = async () => { if (req.method === "GET" && url.pathname === "/prints/options") { const limit = parseLimit(url.searchParams.get("limit")); - const data = await fetchRecentOptionPrints(clickhouse, limit); + const source = parseReplaySource(url) ?? undefined; + const data = await fetchRecentOptionPrints(clickhouse, limit, source); return jsonResponse({ data }); } if (req.method === "GET" && url.pathname === "/nbbo/options") { const limit = parseLimit(url.searchParams.get("limit")); - const data = await fetchRecentOptionNBBO(clickhouse, limit); + const source = parseReplaySource(url) ?? undefined; + const data = await fetchRecentOptionNBBO(clickhouse, limit, source); return jsonResponse({ data }); } @@ -847,7 +871,8 @@ const run = async () => { if (req.method === "GET" && url.pathname === "/replay/options") { const { afterTs, afterSeq, limit } = parseReplayParams(url); - const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit); + const source = parseReplaySource(url) ?? undefined; + const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit, source); const last = data.at(-1); const next = last ? { ts: last.ts, seq: last.seq } : null; return jsonResponse({ data, next }); @@ -855,7 +880,8 @@ const run = async () => { if (req.method === "GET" && url.pathname === "/replay/nbbo") { const { afterTs, afterSeq, limit } = parseReplayParams(url); - const data = await fetchOptionNBBOAfter(clickhouse, afterTs, afterSeq, limit); + const source = parseReplaySource(url) ?? undefined; + const data = await fetchOptionNBBOAfter(clickhouse, afterTs, afterSeq, limit, source); const last = data.at(-1); const next = last ? { ts: last.ts, seq: last.seq } : null; return jsonResponse({ data, next });