Add replay source filter for option replay

This commit is contained in:
dirtydishes 2026-01-11 16:44:34 -05:00
parent 04188851b3
commit debbc1046b
3 changed files with 69 additions and 13 deletions

View file

@ -743,6 +743,9 @@ const useTape = <T extends SortableItem & { seq: number }>(
try { try {
const url = new URL(buildApiUrl(latestPath)); const url = new URL(buildApiUrl(latestPath));
url.searchParams.set("limit", "1"); url.searchParams.set("limit", "1");
if (replaySourceKey) {
url.searchParams.set("source", replaySourceKey);
}
const response = await fetch(url.toString()); const response = await fetch(url.toString());
if (!response.ok) { if (!response.ok) {
throw new Error(`Replay baseline failed with ${response.status}`); throw new Error(`Replay baseline failed with ${response.status}`);
@ -763,7 +766,7 @@ const useTape = <T extends SortableItem & { seq: number }>(
return () => { return () => {
active = false; active = false;
}; };
}, [mode, latestPath, getItemTs]); }, [mode, latestPath, getItemTs, replaySourceKey]);
useEffect(() => { useEffect(() => {
if (mode !== "live") { if (mode !== "live") {
@ -883,6 +886,10 @@ const useTape = <T extends SortableItem & { seq: number }>(
url.searchParams.set("after_ts", cursor.ts.toString()); url.searchParams.set("after_ts", cursor.ts.toString());
url.searchParams.set("after_seq", cursor.seq.toString()); url.searchParams.set("after_seq", cursor.seq.toString());
url.searchParams.set("limit", batchSize.toString()); url.searchParams.set("limit", batchSize.toString());
const desiredSource = replaySourceKey ?? replaySourceRef.current;
if (desiredSource) {
url.searchParams.set("source", desiredSource);
}
const response = await fetch(url.toString()); const response = await fetch(url.toString());
if (!response.ok) { if (!response.ok) {

View file

@ -331,6 +331,17 @@ const quoteString = (value: string): string => {
return `'${escaped}'`; return `'${escaped}'`;
}; };
const buildTracePrefixCondition = (tracePrefix: string | undefined): string | null => {
if (!tracePrefix) {
return null;
}
const normalized = tracePrefix.trim();
if (!normalized) {
return null;
}
return `startsWith(trace_id, ${quoteString(normalized)})`;
};
const normalizeNumericFields = ( const normalizeNumericFields = (
row: Record<string, unknown>, row: Record<string, unknown>,
fields: string[] fields: string[]
@ -531,11 +542,14 @@ const normalizeAlertRow = (row: unknown): AlertRecord | null => {
export const fetchRecentOptionPrints = async ( export const fetchRecentOptionPrints = async (
client: ClickHouseClient, client: ClickHouseClient,
limit: number limit: number,
tracePrefix?: string
): Promise<OptionPrint[]> => { ): Promise<OptionPrint[]> => {
const safeLimit = clampLimit(limit); const safeLimit = clampLimit(limit);
const condition = buildTracePrefixCondition(tracePrefix);
const whereClause = condition ? ` WHERE ${condition}` : "";
const result = await client.query({ const result = await client.query({
query: `SELECT * FROM ${OPTION_PRINTS_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, query: `SELECT * FROM ${OPTION_PRINTS_TABLE}${whereClause} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow" format: "JSONEachRow"
}); });
@ -545,11 +559,14 @@ export const fetchRecentOptionPrints = async (
export const fetchRecentOptionNBBO = async ( export const fetchRecentOptionNBBO = async (
client: ClickHouseClient, client: ClickHouseClient,
limit: number limit: number,
tracePrefix?: string
): Promise<OptionNBBO[]> => { ): Promise<OptionNBBO[]> => {
const safeLimit = clampLimit(limit); const safeLimit = clampLimit(limit);
const condition = buildTracePrefixCondition(tracePrefix);
const whereClause = condition ? ` WHERE ${condition}` : "";
const result = await client.query({ const result = await client.query({
query: `SELECT * FROM ${OPTION_NBBO_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, query: `SELECT * FROM ${OPTION_NBBO_TABLE}${whereClause} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow" format: "JSONEachRow"
}); });
@ -697,14 +714,17 @@ export const fetchOptionPrintsAfter = async (
client: ClickHouseClient, client: ClickHouseClient,
afterTs: number, afterTs: number,
afterSeq: number, afterSeq: number,
limit: number limit: number,
tracePrefix?: string
): Promise<OptionPrint[]> => { ): Promise<OptionPrint[]> => {
const safeLimit = clampLimit(limit); const safeLimit = clampLimit(limit);
const safeAfterTs = clampCursor(afterTs); const safeAfterTs = clampCursor(afterTs);
const safeAfterSeq = clampCursor(afterSeq); const safeAfterSeq = clampCursor(afterSeq);
const traceCondition = buildTracePrefixCondition(tracePrefix);
const traceClause = traceCondition ? ` AND ${traceCondition}` : "";
const result = await client.query({ const result = await client.query({
query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq})${traceClause} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`,
format: "JSONEachRow" format: "JSONEachRow"
}); });
@ -716,14 +736,17 @@ export const fetchOptionNBBOAfter = async (
client: ClickHouseClient, client: ClickHouseClient,
afterTs: number, afterTs: number,
afterSeq: number, afterSeq: number,
limit: number limit: number,
tracePrefix?: string
): Promise<OptionNBBO[]> => { ): Promise<OptionNBBO[]> => {
const safeLimit = clampLimit(limit); const safeLimit = clampLimit(limit);
const safeAfterTs = clampCursor(afterTs); const safeAfterTs = clampCursor(afterTs);
const safeAfterSeq = clampCursor(afterSeq); const safeAfterSeq = clampCursor(afterSeq);
const traceCondition = buildTracePrefixCondition(tracePrefix);
const traceClause = traceCondition ? ` AND ${traceCondition}` : "";
const result = await client.query({ const result = await client.query({
query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq})${traceClause} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`,
format: "JSONEachRow" format: "JSONEachRow"
}); });

View file

@ -122,6 +122,14 @@ const replayParamsSchema = z.object({
after_seq: z.coerce.number().int().nonnegative().default(0), after_seq: z.coerce.number().int().nonnegative().default(0),
limit: z.coerce.number().int().positive().max(1000).default(200) limit: z.coerce.number().int().positive().max(1000).default(200)
}); });
const replaySourceSchema = z
.string()
.trim()
.min(1)
.max(64)
.regex(/^[A-Za-z0-9][A-Za-z0-9_-]*$/)
.transform((value) => value.toLowerCase());
const candleQuerySchema = z.object({ const candleQuerySchema = z.object({
underlying_id: z.string().min(1), underlying_id: z.string().min(1),
interval_ms: z.coerce.number().int().positive(), interval_ms: z.coerce.number().int().positive(),
@ -193,6 +201,20 @@ const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit
}; };
}; };
const parseReplaySource = (url: URL): string | null => {
const raw = url.searchParams.get("source");
if (!raw) {
return null;
}
const trimmed = raw.trim();
if (!trimmed) {
return null;
}
return replaySourceSchema.parse(trimmed);
};
const parseBooleanParam = (value: string | null | undefined): boolean => { const parseBooleanParam = (value: string | null | undefined): boolean => {
if (!value) { if (!value) {
return false; return false;
@ -756,13 +778,15 @@ const run = async () => {
if (req.method === "GET" && url.pathname === "/prints/options") { if (req.method === "GET" && url.pathname === "/prints/options") {
const limit = parseLimit(url.searchParams.get("limit")); const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentOptionPrints(clickhouse, limit); const source = parseReplaySource(url) ?? undefined;
const data = await fetchRecentOptionPrints(clickhouse, limit, source);
return jsonResponse({ data }); return jsonResponse({ data });
} }
if (req.method === "GET" && url.pathname === "/nbbo/options") { if (req.method === "GET" && url.pathname === "/nbbo/options") {
const limit = parseLimit(url.searchParams.get("limit")); const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentOptionNBBO(clickhouse, limit); const source = parseReplaySource(url) ?? undefined;
const data = await fetchRecentOptionNBBO(clickhouse, limit, source);
return jsonResponse({ data }); return jsonResponse({ data });
} }
@ -847,7 +871,8 @@ const run = async () => {
if (req.method === "GET" && url.pathname === "/replay/options") { if (req.method === "GET" && url.pathname === "/replay/options") {
const { afterTs, afterSeq, limit } = parseReplayParams(url); const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit); const source = parseReplaySource(url) ?? undefined;
const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit, source);
const last = data.at(-1); const last = data.at(-1);
const next = last ? { ts: last.ts, seq: last.seq } : null; const next = last ? { ts: last.ts, seq: last.seq } : null;
return jsonResponse({ data, next }); return jsonResponse({ data, next });
@ -855,7 +880,8 @@ const run = async () => {
if (req.method === "GET" && url.pathname === "/replay/nbbo") { if (req.method === "GET" && url.pathname === "/replay/nbbo") {
const { afterTs, afterSeq, limit } = parseReplayParams(url); const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchOptionNBBOAfter(clickhouse, afterTs, afterSeq, limit); const source = parseReplaySource(url) ?? undefined;
const data = await fetchOptionNBBOAfter(clickhouse, afterTs, afterSeq, limit, source);
const last = data.at(-1); const last = data.at(-1);
const next = last ? { ts: last.ts, seq: last.seq } : null; const next = last ? { ts: last.ts, seq: last.seq } : null;
return jsonResponse({ data, next }); return jsonResponse({ data, next });