import { AlertEventSchema, ClassifierHitEventSchema, EquityCandleSchema, EquityPrintSchema, EquityQuoteSchema, EquityPrintJoinSchema, InferredDarkEventSchema, FlowPacketSchema, NewsStorySchema, OptionNBBOSchema, OptionPrintSchema, SmartMoneyEventSchema } from "@islandflow/types"; import type { AlertEvent, ClassifierHitEvent, EquityCandle, EquityPrint, EquityQuote, EquityPrintJoin, InferredDarkEvent, FlowPacket, NewsStory, SmartMoneyEvent, OptionNBBO, OptionPrint, OptionFlowFilters, OptionFlowView } from "@islandflow/types"; import { normalizeOptionPrint, optionPrintsTableDDL, optionPrintsTableMigrations, OPTION_PRINTS_TABLE } from "./option-prints"; import { normalizeOptionNBBO, optionNBBOTableDDL, OPTION_NBBO_TABLE } from "./option-nbbo"; import { equityPrintsTableDDL, EQUITY_PRINTS_TABLE, normalizeEquityPrint } from "./equity-prints"; import { equityQuotesTableDDL, EQUITY_QUOTES_TABLE, normalizeEquityQuote } from "./equity-quotes"; import { equityCandlesTableDDL, EQUITY_CANDLES_TABLE, normalizeEquityCandle } from "./equity-candles"; import { equityPrintJoinsTableDDL, EQUITY_PRINT_JOINS_TABLE, fromEquityPrintJoinRecord, toEquityPrintJoinRecord, type EquityPrintJoinRecord } from "./equity-print-joins"; import { inferredDarkTableDDL, INFERRED_DARK_TABLE, fromInferredDarkRecord, toInferredDarkRecord, type InferredDarkRecord } from "./inferred-dark"; import { FLOW_PACKETS_TABLE, flowPacketsTableDDL, fromFlowPacketRecord, toFlowPacketRecord, type FlowPacketRecord } from "./flow-packets"; import { CLASSIFIER_HITS_TABLE, classifierHitsTableDDL, fromClassifierHitRecord, toClassifierHitRecord, type ClassifierHitRecord } from "./classifier-hits"; import { ALERTS_TABLE, alertsTableMigrations, alertsTableDDL, fromAlertRecord, toAlertRecord, type AlertRecord } from "./alerts"; import { SMART_MONEY_EVENTS_TABLE, smartMoneyEventsTableDDL, fromSmartMoneyEventRecord, toSmartMoneyEventRecord, type SmartMoneyEventRecord } from "./smart-money-events"; import { NEWS_TABLE, 
newsTableDDL, fromNewsRecord, toNewsRecord, type NewsRecord } from "./news";

// NOTE(review): several annotations in this file read as bare `Promise` / `Record` / `T`
// with no type arguments; confirm whether type parameters were lost upstream.

/** Connection settings: the server URL plus optional database name and basic-auth credentials. */
export type ClickHouseOptions = {
  url: string;
  database?: string;
  username?: string;
  password?: string;
};

/** The only wire format this module requests or sends. */
type ClickHouseQueryFormat = "JSONEachRow";

/** Handle whose `json()` reads and parses a response body. */
type ClickHouseQueryResult = {
  json(): Promise;
};

/** Minimal client surface used by the helpers in this module. */
export type ClickHouseClient = {
  exec(params: { query: string }): Promise;
  insert(params: { table: string; values: unknown[]; format: ClickHouseQueryFormat }): Promise;
  query(params: { query: string; format: ClickHouseQueryFormat }): Promise;
  ping(): Promise<{ success: boolean; error?: Error }>;
  close(): Promise;
};

/** Parses `options.url` and, when a database is configured, sets it as the `database` query parameter. */
const buildBaseUrl = (options: ClickHouseOptions): URL => {
  const url = new URL(options.url);
  if (options.database) {
    url.searchParams.set("database", options.database);
  }
  return url;
};

/**
 * Builds request headers: a text/plain content type when a body is sent, and an HTTP Basic
 * `authorization` header when a username or a password is configured.
 */
const buildHeaders = (options: ClickHouseOptions, hasBody: boolean): Headers => {
  const headers = new Headers();
  if (hasBody) {
    headers.set("content-type", "text/plain; charset=utf-8");
  }
  if (options.username || options.password) {
    // A missing username falls back to "default"; a missing password to the empty string.
    const auth = Buffer.from(`${options.username ?? "default"}:${options.password ?? ""}`).toString("base64");
    headers.set("authorization", `Basic ${auth}`);
  }
  return headers;
};

/**
 * POSTs a statement to the server. The statement is sent in the `query` URL parameter;
 * `body`, when provided, becomes the request payload.
 *
 * @throws Error with the response text (or "<status> <statusText>" when that text is blank)
 *   if the response is not OK.
 */
const executeClickHouse = async (
  options: ClickHouseOptions,
  query: string,
  body?: string
): Promise => {
  const url = buildBaseUrl(options);
  url.searchParams.set("query", query);
  const response = await fetch(url, {
    method: "POST",
    headers: buildHeaders(options, body !== undefined),
    body
  });
  if (!response.ok) {
    const message = (await response.text()).trim() || `${response.status} ${response.statusText}`;
    throw new Error(message);
  }
  return response;
};

/**
 * Parses newline-delimited JSON. Blank input yields an empty array; blank lines are skipped
 * and every remaining line goes through `JSON.parse`.
 */
const parseJsonEachRow = (text: string): T => {
  const trimmed = text.trim();
  if (!trimmed) {
    return [] as T;
  }
  const rows = trimmed
    .split("\n")
    .filter((line) => line.trim().length > 0)
    .map((line) => JSON.parse(line));
  return rows as T;
};

/** Creates a fetch-based client bound to `options`. */
export const createClickHouseClient = (options: ClickHouseOptions): ClickHouseClient => {
  return {
    // Runs a statement and discards the response.
    async exec({ query }) {
      await executeClickHouse(options, query);
    },
    // Serializes each value as one JSON line; an empty `values` array sends an empty body.
    async insert({ table, values, format }) {
      const rows = values.map((value) => JSON.stringify(value)).join("\n");
      const body = rows.length > 0 ? `${rows}\n` : "";
      await executeClickHouse(options, `INSERT INTO ${table} FORMAT ${format}`, body);
    },
    // Appends the FORMAT clause to the statement; the body is only read when `json()` is called.
    async query({ query, format }) {
      const response = await executeClickHouse(options, `${query} FORMAT ${format}`);
      return {
        async json() {
          const text = await response.text();
          return parseJsonEachRow(text);
        }
      };
    },
    // GETs `/ping`. Failures that are `Error` instances are reported in the result; anything
    // else that was thrown is rethrown.
    async ping() {
      try {
        const url = buildBaseUrl(options);
        url.pathname = "/ping";
        const response = await fetch(url, {
          method: "GET",
          headers: buildHeaders(options, false)
        });
        if (!response.ok) {
          const message = (await response.text()).trim() || `${response.status} ${response.statusText}`;
          return { success: false, error: new Error(message) };
        }
        return { success: true };
      } catch (error) {
        if (error instanceof Error) {
          return { success: false, error };
        }
        throw error;
      }
    },
    // Nothing to release: this client holds no persistent resources of its own.
    async close() {
      return;
    }
  };
};

/** Runs the option-prints table DDL, then each of its migration statements in order. */
export const ensureOptionPrintsTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: optionPrintsTableDDL() });
  for (const query of optionPrintsTableMigrations()) {
    await client.exec({ query });
  }
};

/** Runs the option-NBBO table DDL. */
export const ensureOptionNBBOTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: optionNBBOTableDDL() });
};

/** Runs the equity-prints table DDL. */
export const ensureEquityPrintsTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: equityPrintsTableDDL() });
};

/** Runs the equity-quotes table DDL. */
export const ensureEquityQuotesTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: equityQuotesTableDDL() });
};

/** Runs the equity-candles table DDL. */
export const ensureEquityCandlesTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: equityCandlesTableDDL() });
};

/** Runs the equity-print-joins table DDL. */
export const ensureEquityPrintJoinsTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: equityPrintJoinsTableDDL() });
};

/** Runs the inferred-dark table DDL. */
export const ensureInferredDarkTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: inferredDarkTableDDL() });
};

/** Runs the flow-packets table DDL. */
export const ensureFlowPacketsTable = async (
  client: ClickHouseClient
):
Promise => {
  await client.exec({ query: flowPacketsTableDDL() });
};

/** Runs the smart-money-events table DDL. */
export const ensureSmartMoneyEventsTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: smartMoneyEventsTableDDL() });
};

/** Runs the classifier-hits table DDL. */
export const ensureClassifierHitsTable = async (
  client: ClickHouseClient
): Promise => {
  await client.exec({ query: classifierHitsTableDDL() });
};

/** Runs the alerts table DDL, then each of its migration statements in order. */
export const ensureAlertsTable = async (client: ClickHouseClient): Promise => {
  await client.exec({ query: alertsTableDDL() });
  for (const query of alertsTableMigrations()) {
    await client.exec({ query });
  }
};

/** Runs the news table DDL. */
export const ensureNewsTable = async (client: ClickHouseClient): Promise => {
  await client.exec({ query: newsTableDDL() });
};

/** Normalizes one option print and inserts it as a single-row batch. */
export const insertOptionPrint = async (
  client: ClickHouseClient,
  print: OptionPrint
): Promise => {
  const record = normalizeOptionPrint(print);
  await client.insert({
    table: OPTION_PRINTS_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Normalizes one option NBBO quote and inserts it as a single-row batch. */
export const insertOptionNBBO = async (
  client: ClickHouseClient,
  nbbo: OptionNBBO
): Promise => {
  const record = normalizeOptionNBBO(nbbo);
  await client.insert({
    table: OPTION_NBBO_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Normalizes one equity print and inserts it as a single-row batch. */
export const insertEquityPrint = async (
  client: ClickHouseClient,
  print: EquityPrint
): Promise => {
  const record = normalizeEquityPrint(print);
  await client.insert({
    table: EQUITY_PRINTS_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Normalizes one equity quote and inserts it as a single-row batch. */
export const insertEquityQuote = async (
  client: ClickHouseClient,
  quote: EquityQuote
): Promise => {
  const record = normalizeEquityQuote(quote);
  await client.insert({
    table: EQUITY_QUOTES_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Normalizes one equity candle and inserts it as a single-row batch. */
export const insertEquityCandle = async (
  client: ClickHouseClient,
  candle: EquityCandle
): Promise => {
  const record = normalizeEquityCandle(candle);
  await client.insert({
    table: EQUITY_CANDLES_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Converts one equity print join to its record form and inserts it as a single-row batch. */
export const insertEquityPrintJoin = async (
  client: ClickHouseClient,
  join:
EquityPrintJoin ): Promise => {
  const record = toEquityPrintJoinRecord(join);
  await client.insert({
    table: EQUITY_PRINT_JOINS_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Converts one inferred-dark event to its record form and inserts it as a single-row batch. */
export const insertInferredDark = async (
  client: ClickHouseClient,
  event: InferredDarkEvent
): Promise => {
  const record = toInferredDarkRecord(event);
  await client.insert({
    table: INFERRED_DARK_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Converts one flow packet to its record form and inserts it as a single-row batch. */
export const insertFlowPacket = async (
  client: ClickHouseClient,
  packet: FlowPacket
): Promise => {
  const record = toFlowPacketRecord(packet);
  await client.insert({
    table: FLOW_PACKETS_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Converts one smart-money event to its record form and inserts it as a single-row batch. */
export const insertSmartMoneyEvent = async (
  client: ClickHouseClient,
  event: SmartMoneyEvent
): Promise => {
  const record = toSmartMoneyEventRecord(event);
  await client.insert({
    table: SMART_MONEY_EVENTS_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Converts one classifier hit to its record form and inserts it as a single-row batch. */
export const insertClassifierHit = async (
  client: ClickHouseClient,
  hit: ClassifierHitEvent
): Promise => {
  const record = toClassifierHitRecord(hit);
  await client.insert({
    table: CLASSIFIER_HITS_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Converts one alert to its record form and inserts it as a single-row batch. */
export const insertAlert = async (client: ClickHouseClient, alert: AlertEvent): Promise => {
  const record = toAlertRecord(alert);
  await client.insert({
    table: ALERTS_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/** Converts one news story to its record form and inserts it as a single-row batch. */
export const insertNewsStory = async (client: ClickHouseClient, story: NewsStory): Promise => {
  const record = toNewsRecord(story);
  await client.insert({
    table: NEWS_TABLE,
    values: [record],
    format: "JSONEachRow"
  });
};

/**
 * Tuning knobs for {@link ClickHouseBatchWriter}.
 *
 * - `flushIntervalMs`: delay before a partially filled batch is flushed (default 100, minimum 1).
 * - `maxRows`: buffered row count that triggers an immediate flush (default 250, minimum 1).
 * - `onError`: called with the table, the error and the number of rows in the failed insert.
 */
export type ClickHouseBatchWriterOptions = {
  flushIntervalMs?: number;
  maxRows?: number;
  onError?: (table: string, error: unknown, rowCount: number) => void;
};

/** Per-table buffer: pending rows, the armed flush timer (if any) and the in-flight insert (if any). */
type BatchState = {
  rows: unknown[];
  timer: ReturnType<typeof setTimeout> | null;
  flushing: Promise<void> | null;
};

const createBatchState = (): BatchState => ({
  rows: [],
  timer: null,
  flushing: null
});

/**
 * Buffers rows per table and writes them with one `insert` per batch. A batch is flushed when
 * it reaches `maxRows` or when `flushIntervalMs` elapses after the first buffered row.
 *
 * Insert failures are reported through `onError` (when provided) and the failed rows are
 * dropped; they are not retried.
 */
export class ClickHouseBatchWriter {
  private readonly flushIntervalMs: number;
  private readonly maxRows: number;
  private readonly states = new Map<string, BatchState>();
  private readonly onError?: (table: string, error: unknown, rowCount: number) => void;

  constructor(
    private readonly client: ClickHouseClient,
    options: ClickHouseBatchWriterOptions = {}
  ) {
    this.flushIntervalMs = Math.max(1, Math.floor(options.flushIntervalMs ?? 100));
    this.maxRows = Math.max(1, Math.floor(options.maxRows ?? 250));
    this.onError = options.onError;
  }

  /**
   * Adds a row to the table's buffer. Triggers a flush immediately once the buffer holds
   * `maxRows` rows; otherwise arms the interval timer if it is not already armed.
   */
  enqueue(table: string, row: unknown): void {
    const existing = this.states.get(table);
    const state = existing ?? createBatchState();
    if (!existing) {
      this.states.set(table, state);
    }

    state.rows.push(row);

    if (state.rows.length >= this.maxRows) {
      void this.flush(table);
      return;
    }

    if (!state.timer) {
      state.timer = setTimeout(() => {
        state.timer = null;
        void this.flush(table);
      }, this.flushIntervalMs);
    }
  }

  /**
   * Writes every row buffered for `table` at the time the insert is issued.
   *
   * If another flush is already in flight this waits for it and then flushes whatever was
   * buffered in the meantime. Previously the call returned right after the in-flight insert,
   * so rows enqueued during that insert stayed in memory: `close()` could resolve without
   * writing them, and a size- or timer-triggered flush that collided with an in-flight one
   * left rows waiting for a later `enqueue` to re-arm the timer.
   */
  async flush(table: string): Promise<void> {
    const state = this.states.get(table);
    if (!state) {
      return;
    }

    // Re-check after every wake-up: another waiter may have started a new insert first.
    while (state.flushing) {
      await state.flushing;
    }

    if (state.timer) {
      clearTimeout(state.timer);
      state.timer = null;
    }

    if (state.rows.length === 0) {
      return;
    }

    // Take ownership of the buffered rows so concurrent enqueues start a fresh batch.
    const rows = state.rows.splice(0, state.rows.length);
    state.flushing = this.client
      .insert({
        table,
        values: rows,
        format: "JSONEachRow"
      })
      .catch((error) => {
        this.onError?.(table, error, rows.length);
      })
      .finally(() => {
        state.flushing = null;
      });

    await state.flushing;
  }

  /** Flushes every table that has been enqueued to, one after another. */
  async flushAll(): Promise<void> {
    for (const table of this.states.keys()) {
      await this.flush(table);
    }
  }

  /** Cancels all pending timers and flushes every buffer. */
  async close(): Promise<void> {
    for (const state of this.states.values()) {
      if (state.timer) {
        clearTimeout(state.timer);
        state.timer = null;
      }
    }
    await this.flushAll();
  }
}

/** Queues an equity print join for batched insertion. */
export const enqueueEquityPrintJoinInsert = (
  writer: ClickHouseBatchWriter,
  join: EquityPrintJoin
): void => {
  writer.enqueue(EQUITY_PRINT_JOINS_TABLE, toEquityPrintJoinRecord(join));
};

/** Queues an inferred-dark event for batched insertion. */
export const enqueueInferredDarkInsert = (
  writer: ClickHouseBatchWriter,
  event: InferredDarkEvent
): void => {
writer.enqueue(INFERRED_DARK_TABLE, toInferredDarkRecord(event));
};

/** Queues a flow packet for batched insertion. */
export const enqueueFlowPacketInsert = (
  writer: ClickHouseBatchWriter,
  packet: FlowPacket
): void => {
  writer.enqueue(FLOW_PACKETS_TABLE, toFlowPacketRecord(packet));
};

/** Queues a smart-money event for batched insertion. */
export const enqueueSmartMoneyEventInsert = (
  writer: ClickHouseBatchWriter,
  event: SmartMoneyEvent
): void => {
  writer.enqueue(SMART_MONEY_EVENTS_TABLE, toSmartMoneyEventRecord(event));
};

/** Queues a classifier hit for batched insertion. */
export const enqueueClassifierHitInsert = (
  writer: ClickHouseBatchWriter,
  hit: ClassifierHitEvent
): void => {
  writer.enqueue(CLASSIFIER_HITS_TABLE, toClassifierHitRecord(hit));
};

/** Queues an alert for batched insertion. */
export const enqueueAlertInsert = (
  writer: ClickHouseBatchWriter,
  alert: AlertEvent
): void => {
  writer.enqueue(ALERTS_TABLE, toAlertRecord(alert));
};

/** Queues a news story for batched insertion. */
export const enqueueNewsStoryInsert = (
  writer: ClickHouseBatchWriter,
  story: NewsStory
): void => {
  writer.enqueue(NEWS_TABLE, toNewsRecord(story));
};

/** Clamps a row limit to an integer in 1..1000; non-finite input yields 100. */
const clampLimit = (limit: number): number => {
  if (!Number.isFinite(limit)) {
    return 100;
  }
  return Math.max(1, Math.min(1000, Math.floor(limit)));
};

/** Clamps a lookup limit to an integer in 1..5000; non-finite input yields 100. */
const clampLookupLimit = (limit: number): number => {
  if (!Number.isFinite(limit)) {
    return 100;
  }
  return Math.max(1, Math.min(5000, Math.floor(limit)));
};

/** Floors `value` to an integer of at least 1; non-finite input yields `fallback`. */
const clampPositiveInt = (value: number, fallback = 1): number => {
  if (!Number.isFinite(value)) {
    return fallback;
  }
  return Math.max(1, Math.floor(value));
};

/** Floors `value` to a non-negative integer; non-finite input yields 0. */
const clampCursor = (value: number): number => {
  if (!Number.isFinite(value)) {
    return 0;
  }
  return Math.max(0, Math.floor(value));
};

/** Converts a string that parses to a finite number into that number; returns anything else unchanged. */
const coerceNumber = (value: unknown): unknown => {
  if (typeof value === "string") {
    const parsed = Number(value);
    if (Number.isFinite(parsed)) {
      return parsed;
    }
  }
  return value;
};

/**
 * Renders `value` as a single-quoted SQL string literal.
 *
 * Backslashes are doubled before quotes are doubled. ClickHouse treats `\` as an escape
 * character inside string literals, so escaping only `'` let a value such as `\'` or a
 * trailing `\` terminate the literal early and inject SQL.
 */
const quoteString = (value: string): string => {
  const escaped = value.replace(/\\/g, "\\\\").replace(/'/g, "''");
  return `'${escaped}'`;
};

/** Renders `values` as a comma-separated list of quoted SQL string literals. */
const buildStringList = (values: string[]): string => {
  return values.map((value) => quoteString(value)).join(", ");
};

/** Builds a `startsWith(trace_id, ...)` condition, or null when the prefix is missing or blank. */
const buildTracePrefixCondition = (tracePrefix: string |
undefined): string | null => {
  if (!tracePrefix) {
    return null;
  }
  const normalized = tracePrefix.trim();
  if (!normalized) {
    return null;
  }
  return `startsWith(trace_id, ${quoteString(normalized)})`;
};

/**
 * Builds a tuple comparison selecting rows strictly before the `(beforeTs, beforeSeq)` cursor.
 * Both cursor values pass through `clampCursor`; the column names are interpolated as given.
 */
const buildBeforeTupleCondition = (
  tsColumn: string,
  seqColumn: string,
  beforeTs: number,
  beforeSeq: number
): string => {
  return `(${tsColumn}, ${seqColumn}) < (${clampCursor(beforeTs)}, ${clampCursor(beforeSeq)})`;
};

/** Returns a shallow copy of `row` in which each listed field that is present has gone through `coerceNumber`. */
const normalizeNumericFields = (
  row: Record,
  fields: string[]
): Record => {
  const record: Record = { ...row };
  for (const field of fields) {
    if (field in record) {
      record[field] = coerceNumber(record[field]);
    }
  }
  return record;
};

/**
 * Prepares a raw option-print row for schema parsing: numeric fields are coerced, `is_etf`
 * and `signal_pass` become booleans when present, and a null/undefined `signal_reasons`
 * becomes an empty array. Non-object input is returned unchanged.
 */
const normalizeOptionRow = (row: unknown): unknown => {
  if (row && typeof row === "object") {
    const record = normalizeNumericFields(row as Record, [
      "source_ts",
      "ingest_ts",
      "seq",
      "ts",
      "price",
      "size",
      "notional",
      "execution_nbbo_bid",
      "execution_nbbo_ask",
      "execution_nbbo_mid",
      "execution_nbbo_spread",
      "execution_nbbo_bid_size",
      "execution_nbbo_ask_size",
      "execution_nbbo_ts",
      "execution_nbbo_age_ms",
      "execution_underlying_spot",
      "execution_underlying_bid",
      "execution_underlying_ask",
      "execution_underlying_mid",
      "execution_underlying_spread",
      "execution_underlying_ts",
      "execution_underlying_age_ms",
      "execution_iv"
    ]);
    if ("is_etf" in record) {
      record.is_etf = Boolean(record.is_etf);
    }
    if ("signal_pass" in record) {
      record.signal_pass = Boolean(record.signal_pass);
    }
    if (record.signal_reasons == null) {
      record.signal_reasons = [];
    }
    return record;
  }
  return row;
};

/** Optional filters applied when querying option prints. */
export type OptionPrintQueryFilters = {
  view?: OptionFlowView;
  minNotional?: number;
  security?: "stock" | "etf" | "all";
  optionTypes?: string[];
  nbboSides?: string[];
  underlyingIds?: string[];
  optionContractId?: string;
  sinceTs?: number;
};

/** Optional filters applied when querying equity prints. */
export type EquityPrintQueryFilters = {
  underlyingIds?: string[];
  sinceTs?: number;
};

/** An alert together with the flow packets and option prints its evidence refs resolved to. */
export type AlertContextBundle = {
  alert: AlertEvent | null;
  flow_packets: FlowPacket[];
  option_prints: OptionPrint[];
  missing_refs: string[];
};

/**
 * Translates option-print filters and an optional trace prefix into SQL conditions.
 *
 * When `filters` is undefined only the trace-prefix condition (if any) is returned. When
 * `filters` is provided, an unset `view` is treated as "signal" and adds `signal_pass = 1`.
 */
const buildOptionPrintFilterConditions = (
  filters: OptionPrintQueryFilters | undefined,
  tracePrefix: string | undefined
): string[] => {
  const conditions: string[] = [];
  const traceCondition = buildTracePrefixCondition(tracePrefix);
  if (traceCondition) {
    conditions.push(traceCondition);
  }
  if (!filters) {
    return conditions;
  }
  if ((filters.view ?? "signal") === "signal") {
    conditions.push("signal_pass = 1");
  }
  if (typeof filters.minNotional === "number" && Number.isFinite(filters.minNotional)) {
    conditions.push(`notional >= ${filters.minNotional}`);
  }
  // "stock" also matches rows whose is_etf is NULL; "all" (or unset) adds no condition.
  if (filters.security === "stock") {
    conditions.push("(is_etf = 0 OR is_etf IS NULL)");
  } else if (filters.security === "etf") {
    conditions.push("is_etf = 1");
  }
  if (filters.optionTypes && filters.optionTypes.length > 0) {
    conditions.push(`option_type IN (${buildStringList(filters.optionTypes)})`);
  }
  if (filters.nbboSides && filters.nbboSides.length > 0) {
    conditions.push(`nbbo_side IN (${buildStringList(filters.nbboSides)})`);
  }
  if (filters.underlyingIds && filters.underlyingIds.length > 0) {
    conditions.push(`underlying_id IN (${buildStringList(filters.underlyingIds)})`);
  }
  if (filters.optionContractId) {
    conditions.push(`option_contract_id = ${quoteString(filters.optionContractId)}`);
  }
  if (typeof filters.sinceTs === "number" && Number.isFinite(filters.sinceTs)) {
    conditions.push(`ts >= ${clampCursor(filters.sinceTs)}`);
  }
  return conditions;
};

/** Translates equity-print filters into SQL conditions; undefined filters yield no conditions. */
const buildEquityPrintFilterConditions = (filters?: EquityPrintQueryFilters): string[] => {
  const conditions: string[] = [];
  if (!filters) {
    return conditions;
  }
  if (filters.underlyingIds && filters.underlyingIds.length > 0) {
    conditions.push(`underlying_id IN (${buildStringList(filters.underlyingIds)})`);
  }
  if (typeof filters.sinceTs === "number" && Number.isFinite(filters.sinceTs)) {
    conditions.push(`ts >= ${clampCursor(filters.sinceTs)}`);
  }
  return conditions;
};

/** Coerces the numeric fields of a raw option-NBBO row; non-object input is returned unchanged. */
const normalizeOptionNbboRow = (row: unknown): unknown => {
  if (row && typeof row === "object") {
return normalizeNumericFields(row as Record, [
      "source_ts",
      "ingest_ts",
      "seq",
      "ts",
      "bid",
      "ask",
      "bidSize",
      "askSize"
    ]);
  }
  return row;
};

/** Coerces the numeric fields of a raw equity-quote row; non-object input is returned unchanged. */
const normalizeEquityQuoteRow = (row: unknown): unknown => {
  if (row && typeof row === "object") {
    return normalizeNumericFields(row as Record, [
      "source_ts",
      "ingest_ts",
      "seq",
      "ts",
      "bid",
      "ask"
    ]);
  }
  return row;
};

/** Coerces the numeric fields of a raw equity-candle row; non-object input is returned unchanged. */
const normalizeEquityCandleRow = (row: unknown): unknown => {
  if (row && typeof row === "object") {
    return normalizeNumericFields(row as Record, [
      "source_ts",
      "ingest_ts",
      "seq",
      "ts",
      "interval_ms",
      "open",
      "high",
      "low",
      "close",
      "volume",
      "trade_count"
    ]);
  }
  return row;
};

/**
 * Coerces the numeric fields of a raw equity-print row and, when `offExchangeFlag` is
 * present, converts it to a boolean. Non-object input is returned unchanged.
 */
const normalizeEquityRow = (row: unknown): unknown => {
  if (row && typeof row === "object") {
    const record = normalizeNumericFields(row as Record, [
      "source_ts",
      "ingest_ts",
      "seq",
      "ts",
      "price",
      "size"
    ]);
    if ("offExchangeFlag" in record) {
      return { ...record, offExchangeFlag: Boolean(record.offExchangeFlag) };
    }
    return record;
  }
  return row;
};

/**
 * Shapes a raw row into an equity-print-join record, or returns null for non-object input.
 * Missing string fields default to "" and missing JSON fields to "{}".
 */
const normalizeEquityPrintJoinRow = (row: unknown): EquityPrintJoinRecord | null => {
  if (!row || typeof row !== "object") {
    return null;
  }
  const record = row as Record;
  return {
    source_ts: coerceNumber(record.source_ts) as number,
    ingest_ts: coerceNumber(record.ingest_ts) as number,
    seq: coerceNumber(record.seq) as number,
    trace_id: String(record.trace_id ?? ""),
    id: String(record.id ?? ""),
    print_trace_id: String(record.print_trace_id ?? ""),
    quote_trace_id: String(record.quote_trace_id ?? ""),
    features_json: String(record.features_json ?? "{}"),
    join_quality_json: String(record.join_quality_json ?? "{}")
  };
};

/**
 * Shapes a raw row into an inferred-dark record, or returns null for non-object input.
 * A missing confidence defaults to 0 and missing evidence refs to "[]".
 */
const normalizeInferredDarkRow = (row: unknown): InferredDarkRecord | null => {
  if (!row || typeof row !== "object") {
    return null;
  }
  const record = row as Record;
  return {
    source_ts: coerceNumber(record.source_ts) as number,
    ingest_ts: coerceNumber(record.ingest_ts) as number,
    seq: coerceNumber(record.seq) as number,
    trace_id: String(record.trace_id ?? ""),
    type: String(record.type ?? ""),
    confidence: Number(coerceNumber(record.confidence) ?? 0),
    evidence_refs_json: String(record.evidence_refs_json ?? "[]")
  };
};

/**
 * Shapes a raw row into a flow-packet record, or returns null for non-object input.
 * `members` becomes an array of strings, or an empty array when the raw value is not an array.
 */
const normalizeFlowPacketRow = (row: unknown): FlowPacketRecord | null => {
  if (!row || typeof row !== "object") {
    return null;
  }
  const record = row as Record;
  return {
    source_ts: coerceNumber(record.source_ts) as number,
    ingest_ts: coerceNumber(record.ingest_ts) as number,
    seq: coerceNumber(record.seq) as number,
    trace_id: String(record.trace_id ?? ""),
    id: String(record.id ?? ""),
    members: Array.isArray(record.members) ? record.members.map((value) => String(value)) : [],
    features_json: String(record.features_json ?? "{}"),
    join_quality_json: String(record.join_quality_json ?? "{}")
  };
};

/**
 * Shapes a raw row into a classifier-hit record, or returns null for non-object input.
 * A missing confidence defaults to 0 and missing explanations to "[]".
 */
const normalizeClassifierHitRow = (row: unknown): ClassifierHitRecord | null => {
  if (!row || typeof row !== "object") {
    return null;
  }
  const record = row as Record;
  return {
    source_ts: coerceNumber(record.source_ts) as number,
    ingest_ts: coerceNumber(record.ingest_ts) as number,
    seq: coerceNumber(record.seq) as number,
    trace_id: String(record.trace_id ?? ""),
    classifier_id: String(record.classifier_id ?? ""),
    confidence: Number(coerceNumber(record.confidence) ?? 0),
    direction: String(record.direction ?? ""),
    explanations_json: String(record.explanations_json ?? "[]")
  };
};

/**
 * Shapes a raw row into a smart-money-event record, or returns null for non-object input.
 * Id lists become string arrays (empty when the raw value is not an array), a missing
 * `primary_direction` defaults to "unknown" and `abstained` is converted with `Boolean`.
 */
const normalizeSmartMoneyEventRow = (row: unknown): SmartMoneyEventRecord | null => {
  if (!row || typeof row !== "object") {
    return null;
  }
  const record = row as Record;
  return {
    source_ts: coerceNumber(record.source_ts) as number,
    ingest_ts: coerceNumber(record.ingest_ts) as number,
    seq: coerceNumber(record.seq) as number,
    trace_id: String(record.trace_id ?? ""),
    event_id: String(record.event_id ?? ""),
    packet_ids: Array.isArray(record.packet_ids)
      ? record.packet_ids.map((value) => String(value))
      : [],
    member_print_ids: Array.isArray(record.member_print_ids)
      ? record.member_print_ids.map((value) => String(value))
      : [],
    underlying_id: String(record.underlying_id ?? ""),
    event_kind: String(record.event_kind ?? ""),
    event_window_ms: coerceNumber(record.event_window_ms) as number,
    features_json: String(record.features_json ?? "{}"),
    profile_scores_json: String(record.profile_scores_json ?? "[]"),
    primary_profile_id: String(record.primary_profile_id ?? ""),
    primary_direction: String(record.primary_direction ?? "unknown"),
    abstained: Boolean(record.abstained),
    suppressed_reasons_json: String(record.suppressed_reasons_json ?? "[]")
  };
};

/**
 * Shapes a raw row into an alert record, or returns null for non-object input.
 * A missing score defaults to 0 and missing JSON list fields to "[]".
 */
const normalizeAlertRow = (row: unknown): AlertRecord | null => {
  if (!row || typeof row !== "object") {
    return null;
  }
  const record = row as Record;
  return {
    source_ts: coerceNumber(record.source_ts) as number,
    ingest_ts: coerceNumber(record.ingest_ts) as number,
    seq: coerceNumber(record.seq) as number,
    trace_id: String(record.trace_id ?? ""),
    score: Number(coerceNumber(record.score) ?? 0),
    severity: String(record.severity ?? ""),
    hits_json: String(record.hits_json ?? "[]"),
    evidence_refs_json: String(record.evidence_refs_json ?? "[]"),
    primary_profile_id: String(record.primary_profile_id ?? ""),
    profile_scores_json: String(record.profile_scores_json ?? "[]")
  };
};

/**
 * Shapes a raw row into a news record, or returns null for non-object input.
 * Missing text fields default to "", symbol lists to "[]" and `symbol_resolution` to "none".
 */
const normalizeNewsRow = (row: unknown): NewsRecord | null => {
  if (!row || typeof row !== "object") {
    return null;
  }
  const record = row as Record;
  return {
    source_ts: coerceNumber(record.source_ts) as number,
    ingest_ts: coerceNumber(record.ingest_ts) as number,
    seq: coerceNumber(record.seq) as number,
    trace_id: String(record.trace_id ?? ""),
    story_id: coerceNumber(record.story_id) as number,
    provider: String(record.provider ?? ""),
    source: String(record.source ?? ""),
    headline: String(record.headline ?? ""),
    summary: String(record.summary ?? ""),
    content_html: String(record.content_html ?? ""),
    url: String(record.url ?? ""),
    published_ts: coerceNumber(record.published_ts) as number,
    updated_ts: coerceNumber(record.updated_ts) as number,
    provider_symbols_json: String(record.provider_symbols_json ?? "[]"),
    resolved_symbols_json: String(record.resolved_symbols_json ?? "[]"),
    symbol_resolution: String(record.symbol_resolution ?? "none") as NewsRecord["symbol_resolution"]
  };
};

/**
 * Fetches the newest option prints (ordered by ts, then seq, descending), optionally
 * restricted by trace prefix and filters. `limit` is clamped by `clampLimit`; rows are
 * validated with `OptionPrintSchema`.
 */
export const fetchRecentOptionPrints = async (
  client: ClickHouseClient,
  limit: number,
  tracePrefix?: string,
  filters?: OptionPrintQueryFilters
): Promise => {
  const safeLimit = clampLimit(limit);
  const conditions = buildOptionPrintFilterConditions(filters, tracePrefix);
  const whereClause = conditions.length > 0 ? ` WHERE ${conditions.join(" AND ")}` : "";
  const result = await client.query({
    query: `SELECT * FROM ${OPTION_PRINTS_TABLE}${whereClause} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow));
};

/**
 * Fetches the newest option NBBO rows (ordered by ts, then seq, descending), optionally
 * restricted by trace prefix. Rows are validated with `OptionNBBOSchema`.
 */
export const fetchRecentOptionNBBO = async (
  client: ClickHouseClient,
  limit: number,
  tracePrefix?: string
): Promise => {
  const safeLimit = clampLimit(limit);
  const condition = buildTracePrefixCondition(tracePrefix);
  const whereClause = condition ? ` WHERE ${condition}` : "";
  const result = await client.query({
    query: `SELECT * FROM ${OPTION_NBBO_TABLE}${whereClause} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  return OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow));
};

/**
 * Fetches the newest equity prints (ordered by ts, then seq, descending), optionally
 * filtered. Rows are validated with `EquityPrintSchema`.
 */
export const fetchRecentEquityPrints = async (
  client: ClickHouseClient,
  limit: number,
  filters?: EquityPrintQueryFilters
): Promise => {
  const safeLimit = clampLimit(limit);
  const conditions = buildEquityPrintFilterConditions(filters);
  const whereClause = conditions.length > 0 ?
` WHERE ${conditions.join(" AND ")}` : "";
  const result = await client.query({
    query: `SELECT * FROM ${EQUITY_PRINTS_TABLE}${whereClause} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow));
};

/** Fetches the newest equity quotes (ts, then seq, descending), validated with `EquityQuoteSchema`. */
export const fetchRecentEquityQuotes = async (
  client: ClickHouseClient,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    query: `SELECT * FROM ${EQUITY_QUOTES_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  return EquityQuoteSchema.array().parse(rows.map(normalizeEquityQuoteRow));
};

/**
 * Fetches the newest candles for one underlying at one interval (ts, then seq, descending).
 * A non-finite `intervalMs` falls back to 1000; rows are validated with `EquityCandleSchema`.
 */
export const fetchRecentEquityCandles = async (
  client: ClickHouseClient,
  underlyingId: string,
  intervalMs: number,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const safeInterval = clampPositiveInt(intervalMs, 1000);
  const safeUnderlying = quoteString(underlyingId);
  const result = await client.query({
    query: `SELECT * FROM ${EQUITY_CANDLES_TABLE} WHERE underlying_id = ${safeUnderlying} AND interval_ms = ${safeInterval} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  return EquityCandleSchema.array().parse(rows.map(normalizeEquityCandleRow));
};

/**
 * Fetches the newest equity print joins (source_ts, then seq, descending). Rows that are not
 * objects are dropped; the rest are converted from record form and validated with
 * `EquityPrintJoinSchema`.
 */
export const fetchRecentEquityPrintJoins = async (
  client: ClickHouseClient,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  const records = rows
    .map(normalizeEquityPrintJoinRow)
    .filter((record): record is EquityPrintJoinRecord => record !== null);
  const joins = records.map(fromEquityPrintJoinRecord);
  return EquityPrintJoinSchema.array().parse(joins);
};

/**
 * Fetches the newest inferred-dark events (source_ts, then seq, descending), converted from
 * record form and validated with `InferredDarkEventSchema`.
 */
export const fetchRecentInferredDark = async (
  client: ClickHouseClient,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    query: `SELECT * FROM ${INFERRED_DARK_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  const records = rows
    .map(normalizeInferredDarkRow)
    .filter((record): record is InferredDarkRecord => record !== null);
  const events = records.map(fromInferredDarkRecord);
  return InferredDarkEventSchema.array().parse(events);
};

/**
 * Fetches the newest flow packets (source_ts, then seq, descending), converted from record
 * form and validated with `FlowPacketSchema`.
 */
export const fetchRecentFlowPackets = async (
  client: ClickHouseClient,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    query: `SELECT * FROM ${FLOW_PACKETS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  const records = rows
    .map(normalizeFlowPacketRow)
    .filter((record): record is FlowPacketRecord => record !== null);
  const packets = records.map(fromFlowPacketRecord);
  return FlowPacketSchema.array().parse(packets);
};

/**
 * Fetches the newest classifier hits (source_ts, then seq, descending), converted from record
 * form and validated with `ClassifierHitEventSchema`.
 */
export const fetchRecentClassifierHits = async (
  client: ClickHouseClient,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  const records = rows
    .map(normalizeClassifierHitRow)
    .filter((record): record is ClassifierHitRecord => record !== null);
  const hits = records.map(fromClassifierHitRecord);
  return ClassifierHitEventSchema.array().parse(hits);
};

/**
 * Fetches the newest smart-money events (source_ts, then seq, descending), converted from
 * record form and validated with `SmartMoneyEventSchema`.
 */
export const fetchRecentSmartMoneyEvents = async (
  client: ClickHouseClient,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    query: `SELECT * FROM ${SMART_MONEY_EVENTS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  const records = rows
    .map(normalizeSmartMoneyEventRow)
    .filter((record): record is SmartMoneyEventRecord => record !== null);
  return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord));
};

/**
 * Fetches the newest alerts (source_ts, then seq, descending), converted from record form and
 * validated with `AlertEventSchema`.
 */
export const fetchRecentAlerts = async (
  client: ClickHouseClient,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    query: `SELECT * FROM ${ALERTS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  const records = rows
    .map(normalizeAlertRow)
    .filter((record): record is AlertRecord => record !== null);
  const alerts = records.map(fromAlertRecord);
  return AlertEventSchema.array().parse(alerts);
};

// Selects one row per (provider, story_id): the revision ranked first by updated_ts, then
// ingest_ts, then seq, all descending.
const latestNewsSelect = ` SELECT source_ts, ingest_ts, seq, trace_id, story_id, provider, source, headline, summary, content_html, url, published_ts, updated_ts, provider_symbols_json, resolved_symbols_json, symbol_resolution FROM ( SELECT *, row_number() OVER (PARTITION BY provider, story_id ORDER BY updated_ts DESC, ingest_ts DESC, seq DESC) AS revision_rank FROM ${NEWS_TABLE} ) WHERE revision_rank = 1 `;

/**
 * Fetches the latest revision of the most recently published stories (published_ts, then
 * story_id, descending), converted from record form and validated with `NewsStorySchema`.
 */
export const fetchRecentNews = async (
  client: ClickHouseClient,
  limit: number
): Promise => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    query: `${latestNewsSelect} ORDER BY published_ts DESC, story_id DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  const records = rows
    .map(normalizeNewsRow)
    .filter((record): record is NewsRecord => record !== null);
  return NewsStorySchema.array().parse(records.map(fromNewsRecord));
};

/** Trims each ref, drops empty ones and removes duplicates while keeping first-seen order. */
const normalizeAlertEvidenceRefs = (refs: string[]): string[] => {
  return Array.from(new Set(refs.map((ref) => ref.trim()).filter(Boolean)));
};

/**
 * Lists the ids a ref could match as a flow packet: the ref itself plus its form with or
 * without the "flowpacket:" prefix.
 */
const flowPacketCandidatesFromRef = (ref: string): string[] => {
  if (!ref) {
    return [];
  }
  if (ref.startsWith("flowpacket:")) {
    const raw = ref.slice("flowpacket:".length);
    return raw ?
[ref, raw] : [ref];
  }
  return [ref, `flowpacket:${ref}`];
};

/** Lists the trace ids a ref could match as an option print; "flowpacket:"-prefixed refs never match. */
const optionPrintCandidatesFromRef = (ref: string): string[] => {
  if (!ref || ref.startsWith("flowpacket:")) {
    return [];
  }
  return [ref];
};

/**
 * Loads the newest alert with the given trace id along with the flow packets and option
 * prints referenced by its evidence refs.
 *
 * Returns an empty bundle (`alert: null`) when the trace id is blank or no alert matches.
 * `missing_refs` lists the evidence refs that resolved to neither a packet nor a print.
 */
export const fetchAlertContextByTraceId = async (
  client: ClickHouseClient,
  traceId: string
): Promise => {
  const normalizedTraceId = traceId.trim();
  if (!normalizedTraceId) {
    return {
      alert: null,
      flow_packets: [],
      option_prints: [],
      missing_refs: []
    };
  }

  const alertResult = await client.query({
    query: `SELECT * FROM ${ALERTS_TABLE} WHERE trace_id = ${quoteString(normalizedTraceId)} ORDER BY source_ts DESC, seq DESC LIMIT 1`,
    format: "JSONEachRow"
  });
  const alertRows = await alertResult.json();
  const alertRecord = alertRows
    .map(normalizeAlertRow)
    .find((record): record is AlertRecord => record !== null);
  const alert = alertRecord ? AlertEventSchema.parse(fromAlertRecord(alertRecord)) : null;
  if (!alert) {
    return {
      alert: null,
      flow_packets: [],
      option_prints: [],
      missing_refs: []
    };
  }

  const refs = normalizeAlertEvidenceRefs(alert.evidence_refs);
  const packetLookupIds = Array.from(new Set(refs.flatMap(flowPacketCandidatesFromRef)));
  const printLookupIds = Array.from(new Set(refs.flatMap(optionPrintCandidatesFromRef)));

  // The two lookups are independent, so they run concurrently; each is skipped when it has
  // no candidate ids.
  const [flowPackets, optionPrints] = await Promise.all([
    packetLookupIds.length > 0
      ? client
          .query({
            query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE id IN (${buildStringList(packetLookupIds)}) ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(packetLookupIds.length)}`,
            format: "JSONEachRow"
          })
          .then(async (result) => {
            const rows = await result.json();
            const records = rows
              .map(normalizeFlowPacketRow)
              .filter((record): record is FlowPacketRecord => record !== null);
            return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord));
          })
      : Promise.resolve([]),
    printLookupIds.length > 0
      ? fetchOptionPrintsByTraceIds(client, printLookupIds)
      : Promise.resolve([])
  ]);

  // A packet counts as found under either its id or its trace id.
  const packetIds = new Set(flowPackets.flatMap((packet) => [packet.id, packet.trace_id]));
  const printIds = new Set(optionPrints.map((print) => print.trace_id));
  const missingRefs = refs.filter((ref) => {
    const packetResolved = flowPacketCandidatesFromRef(ref).some((candidate) => packetIds.has(candidate));
    const printResolved = optionPrintCandidatesFromRef(ref).some((candidate) => printIds.has(candidate));
    return !packetResolved && !printResolved;
  });

  return {
    alert,
    flow_packets: flowPackets,
    option_prints: optionPrints,
    missing_refs: missingRefs
  };
};

/**
 * Fetches option prints strictly after the `(afterTs, afterSeq)` cursor in ascending
 * (ts, seq) order, optionally restricted by trace prefix and filters. Rows are validated
 * with `OptionPrintSchema`.
 */
export const fetchOptionPrintsAfter = async (
  client: ClickHouseClient,
  afterTs: number,
  afterSeq: number,
  limit: number,
  tracePrefix?: string,
  filters?: OptionPrintQueryFilters
): Promise => {
  const safeLimit = clampLimit(limit);
  const safeAfterTs = clampCursor(afterTs);
  const safeAfterSeq = clampCursor(afterSeq);
  const conditions = [
    `((ts, seq) > (${safeAfterTs}, ${safeAfterSeq}))`,
    ...buildOptionPrintFilterConditions(filters, tracePrefix)
  ];
  const result = await client.query({
    query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });
  const rows = await result.json();
  return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow));
};

/**
 * Fetches option NBBO rows strictly after the `(afterTs, afterSeq)` cursor in ascending
 * (ts, seq) order, optionally restricted by trace prefix. Rows are validated with
 * `OptionNBBOSchema`.
 */
export const fetchOptionNBBOAfter = async (
  client: ClickHouseClient,
  afterTs: number,
  afterSeq: number,
  limit: number,
  tracePrefix?: string
): Promise => {
  const safeLimit = clampLimit(limit);
  const safeAfterTs = clampCursor(afterTs);
  const safeAfterSeq = clampCursor(afterSeq);
  const traceCondition = buildTracePrefixCondition(tracePrefix);
  const traceClause = traceCondition ?
` AND ${traceCondition}` : ""; const result = await client.query({ query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq})${traceClause} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow)); }; export const fetchEquityPrintsAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number, filters?: EquityPrintQueryFilters ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const conditions = [ `((ts, seq) > (${safeAfterTs}, ${safeAfterSeq}))`, ...buildEquityPrintFilterConditions(filters) ]; const result = await client.query({ query: `SELECT * FROM ${EQUITY_PRINTS_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); }; export const fetchEquityPrintsRange = async ( client: ClickHouseClient, underlyingId: string, startTs: number, endTs: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeStart = clampCursor(startTs); const safeEnd = clampCursor(endTs); const rangeStart = Math.min(safeStart, safeEnd); const rangeEnd = Math.max(safeStart, safeEnd); const safeUnderlying = quoteString(underlyingId); const result = await client.query({ query: `SELECT * FROM ${EQUITY_PRINTS_TABLE} WHERE underlying_id = ${safeUnderlying} AND ts >= ${rangeStart} AND ts <= ${rangeEnd} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const parsed = EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); return parsed.reverse(); }; export const fetchEquityQuotesAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number ): Promise => { 
const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const result = await client.query({ query: `SELECT * FROM ${EQUITY_QUOTES_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return EquityQuoteSchema.array().parse(rows.map(normalizeEquityQuoteRow)); }; export const fetchEquityCandlesAfter = async ( client: ClickHouseClient, underlyingId: string, intervalMs: number, afterTs: number, afterSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const safeInterval = clampPositiveInt(intervalMs, 1000); const safeUnderlying = quoteString(underlyingId); const result = await client.query({ query: `SELECT * FROM ${EQUITY_CANDLES_TABLE} WHERE underlying_id = ${safeUnderlying} AND interval_ms = ${safeInterval} AND (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return EquityCandleSchema.array().parse(rows.map(normalizeEquityCandleRow)); }; export const fetchEquityCandlesRange = async ( client: ClickHouseClient, underlyingId: string, intervalMs: number, startTs: number, endTs: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeStart = clampCursor(startTs); const safeEnd = clampCursor(endTs); const safeInterval = clampPositiveInt(intervalMs, 1000); const safeUnderlying = quoteString(underlyingId); const result = await client.query({ query: `SELECT * FROM ${EQUITY_CANDLES_TABLE} WHERE underlying_id = ${safeUnderlying} AND interval_ms = ${safeInterval} AND ts >= ${safeStart} AND ts <= ${safeEnd} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return 
EquityCandleSchema.array().parse(rows.map(normalizeEquityCandleRow)); }; export const fetchEquityPrintJoinsAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const result = await client.query({ query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeEquityPrintJoinRow) .filter((record): record is EquityPrintJoinRecord => record !== null); const joins = records.map(fromEquityPrintJoinRecord); return EquityPrintJoinSchema.array().parse(joins); }; export const fetchInferredDarkAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const result = await client.query({ query: `SELECT * FROM ${INFERRED_DARK_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeInferredDarkRow) .filter((record): record is InferredDarkRecord => record !== null); const events = records.map(fromInferredDarkRecord); return InferredDarkEventSchema.array().parse(events); }; export const fetchFlowPacketsAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const result = await client.query({ query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq 
ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeFlowPacketRow) .filter((record): record is FlowPacketRecord => record !== null); const packets = records.map(fromFlowPacketRecord); return FlowPacketSchema.array().parse(packets); }; export const fetchClassifierHitsAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const result = await client.query({ query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeClassifierHitRow) .filter((record): record is ClassifierHitRecord => record !== null); const hits = records.map(fromClassifierHitRecord); return ClassifierHitEventSchema.array().parse(hits); }; export const fetchSmartMoneyEventsAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const result = await client.query({ query: `SELECT * FROM ${SMART_MONEY_EVENTS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeSmartMoneyEventRow) .filter((record): record is SmartMoneyEventRecord => record !== null); return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord)); }; export const fetchAlertsAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const 
safeAfterSeq = clampCursor(afterSeq); const result = await client.query({ query: `SELECT * FROM ${ALERTS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeAlertRow) .filter((record): record is AlertRecord => record !== null); const alerts = records.map(fromAlertRecord); return AlertEventSchema.array().parse(alerts); }; export const fetchNewsAfter = async ( client: ClickHouseClient, afterTs: number, afterSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); const result = await client.query({ query: `${latestNewsSelect} AND (published_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY published_ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeNewsRow) .filter((record): record is NewsRecord => record !== null); return NewsStorySchema.array().parse(records.map(fromNewsRecord)); }; export const fetchOptionPrintsBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number, tracePrefix?: string, filters?: OptionPrintQueryFilters ): Promise => { const safeLimit = clampLimit(limit); const conditions = [ buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq), ...buildOptionPrintFilterConditions(filters, tracePrefix) ]; const result = await client.query({ query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow)); }; export const fetchOptionNBBOBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number, tracePrefix?: string ): Promise => { const safeLimit 
= clampLimit(limit); const conditions = [buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)]; const traceCondition = buildTracePrefixCondition(tracePrefix); if (traceCondition) { conditions.push(traceCondition); } const result = await client.query({ query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow)); }; export const fetchEquityPrintsBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number, filters?: EquityPrintQueryFilters ): Promise => { const safeLimit = clampLimit(limit); const conditions = [ buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq), ...buildEquityPrintFilterConditions(filters) ]; const result = await client.query({ query: `SELECT * FROM ${EQUITY_PRINTS_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); }; export const fetchEquityQuotesBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const result = await client.query({ query: `SELECT * FROM ${EQUITY_QUOTES_TABLE} WHERE ${buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); return EquityQuoteSchema.array().parse(rows.map(normalizeEquityQuoteRow)); }; export const fetchEquityPrintJoinsBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const result = await client.query({ query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", 
beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeEquityPrintJoinRow) .filter((record): record is EquityPrintJoinRecord => record !== null); return EquityPrintJoinSchema.array().parse(records.map(fromEquityPrintJoinRecord)); }; export const fetchFlowPacketsBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const result = await client.query({ query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeFlowPacketRow) .filter((record): record is FlowPacketRecord => record !== null); return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord)); }; export const fetchClassifierHitsBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const result = await client.query({ query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeClassifierHitRow) .filter((record): record is ClassifierHitRecord => record !== null); return ClassifierHitEventSchema.array().parse(records.map(fromClassifierHitRecord)); }; export const fetchSmartMoneyEventsBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const result = await client.query({ query: `SELECT * FROM ${SMART_MONEY_EVENTS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} 
ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeSmartMoneyEventRow) .filter((record): record is SmartMoneyEventRecord => record !== null); return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord)); }; export const fetchAlertsBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const result = await client.query({ query: `SELECT * FROM ${ALERTS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeAlertRow) .filter((record): record is AlertRecord => record !== null); return AlertEventSchema.array().parse(records.map(fromAlertRecord)); }; export const fetchNewsBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const result = await client.query({ query: `${latestNewsSelect} AND ${buildBeforeTupleCondition("published_ts", "seq", beforeTs, beforeSeq)} ORDER BY published_ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeNewsRow) .filter((record): record is NewsRecord => record !== null); return NewsStorySchema.array().parse(records.map(fromNewsRecord)); }; export const fetchInferredDarkBefore = async ( client: ClickHouseClient, beforeTs: number, beforeSeq: number, limit: number ): Promise => { const safeLimit = clampLimit(limit); const result = await client.query({ query: `SELECT * FROM ${INFERRED_DARK_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const 
records = rows .map(normalizeInferredDarkRow) .filter((record): record is InferredDarkRecord => record !== null); return InferredDarkEventSchema.array().parse(records.map(fromInferredDarkRecord)); }; export const fetchFlowPacketById = async ( client: ClickHouseClient, id: string ): Promise => { const result = await client.query({ query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE id = ${quoteString(id)} ORDER BY source_ts DESC, seq DESC LIMIT 1`, format: "JSONEachRow" }); const rows = await result.json(); const record = rows .map(normalizeFlowPacketRow) .find((row): row is FlowPacketRecord => row !== null); return record ? FlowPacketSchema.parse(fromFlowPacketRecord(record)) : null; }; export const fetchFlowPacketsByIds = async ( client: ClickHouseClient, ids: string[] ): Promise => { const uniqueIds = Array.from(new Set(ids.map((id) => id.trim()).filter(Boolean))); if (uniqueIds.length === 0) { return []; } const result = await client.query({ query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE id IN (${buildStringList(uniqueIds)}) ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(uniqueIds.length)}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeFlowPacketRow) .filter((record): record is FlowPacketRecord => record !== null); return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord)); }; export const fetchFlowPacketsByMemberTraceIds = async ( client: ClickHouseClient, traceIds: string[] ): Promise => { const ids = Array.from(new Set(traceIds.map((id) => id.trim()).filter(Boolean))); if (ids.length === 0) { return []; } const memberPredicates = ids.map((id) => `has(members, ${quoteString(id)})`); const result = await client.query({ query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE ${memberPredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeFlowPacketRow) 
.filter((record): record is FlowPacketRecord => record !== null); return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord)); }; export const fetchSmartMoneyEventsByPacketIds = async ( client: ClickHouseClient, packetIds: string[] ): Promise => { const ids = Array.from(new Set(packetIds.map((id) => id.trim()).filter(Boolean))); if (ids.length === 0) { return []; } const packetPredicates = ids.map((id) => `has(packet_ids, ${quoteString(id)})`); const result = await client.query({ query: `SELECT * FROM ${SMART_MONEY_EVENTS_TABLE} WHERE ${packetPredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeSmartMoneyEventRow) .filter((record): record is SmartMoneyEventRecord => record !== null); return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord)); }; export const fetchClassifierHitsByPacketIds = async ( client: ClickHouseClient, packetIds: string[] ): Promise => { const ids = Array.from(new Set(packetIds.map((id) => id.trim()).filter(Boolean))); if (ids.length === 0) { return []; } const tracePredicates = ids.map((id) => `position(trace_id, ${quoteString(id)}) > 0`); const result = await client.query({ query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE ${tracePredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeClassifierHitRow) .filter((record): record is ClassifierHitRecord => record !== null); return ClassifierHitEventSchema.array().parse(records.map(fromClassifierHitRecord)); }; export const fetchNearestOptionNBBOForPrints = async ( client: ClickHouseClient, inputs: Array<{ trace_id: string; option_contract_id: string; ts: number }> ): Promise> => { const normalized = inputs .map((item) => ({ trace_id: item.trace_id.trim(), option_contract_id: 
item.option_contract_id.trim(), ts: clampCursor(item.ts) })) .filter((item) => item.trace_id && item.option_contract_id); if (normalized.length === 0) { return {}; } const byTraceId: Record = Object.fromEntries( normalized.map((item) => [item.trace_id, null]) ); await Promise.all( normalized.map(async (item) => { const result = await client.query({ query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE option_contract_id = ${quoteString(item.option_contract_id)} AND ts <= ${item.ts} ORDER BY ts DESC, seq DESC LIMIT 1`, format: "JSONEachRow" }); const rows = await result.json(); const quote = OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow))[0] ?? null; byTraceId[item.trace_id] = quote; }) ); return byTraceId; }; export const fetchOptionPrintsByTraceIds = async ( client: ClickHouseClient, traceIds: string[] ): Promise => { const ids = Array.from(new Set(traceIds.map((id) => id.trim()).filter(Boolean))); if (ids.length === 0) { return []; } const result = await client.query({ query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE trace_id IN (${buildStringList(ids)}) ORDER BY ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length)}`, format: "JSONEachRow" }); const rows = await result.json(); return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow)); }; export const fetchEquityPrintJoinsByIds = async ( client: ClickHouseClient, ids: string[] ): Promise => { const uniqueIds = Array.from(new Set(ids.map((id) => id.trim()).filter(Boolean))); if (uniqueIds.length === 0) { return []; } const joinIds = new Set(); const printTraceIds = new Set(); for (const id of uniqueIds) { joinIds.add(id); if (id.startsWith("equityjoin:")) { const trace = id.slice("equityjoin:".length); if (trace) { printTraceIds.add(trace); } } else { joinIds.add(`equityjoin:${id}`); printTraceIds.add(id); } } const joinIdList = Array.from(joinIds); const printTraceList = Array.from(printTraceIds); const whereParts = [ `id IN (${buildStringList(joinIdList)})`, `trace_id IN 
(${buildStringList(joinIdList)})` ]; if (printTraceList.length > 0) { whereParts.push(`print_trace_id IN (${buildStringList(printTraceList)})`); } const lookupLimit = clampLookupLimit(joinIdList.length + printTraceList.length); const result = await client.query({ query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} WHERE ${whereParts.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${lookupLimit}`, format: "JSONEachRow" }); const rows = await result.json(); const records = rows .map(normalizeEquityPrintJoinRow) .filter((record): record is EquityPrintJoinRecord => record !== null); return EquityPrintJoinSchema.array().parse(records.map(fromEquityPrintJoinRecord)); };