From 3164167beed4f9268f5570d0a08d1fa1015b7ef8 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sun, 4 Jan 2026 17:07:43 -0500 Subject: [PATCH] Add equity print quote joins --- packages/bus/src/subjects.ts | 4 + packages/storage/src/clickhouse.ts | 163 +++++++++++ packages/storage/src/equity-print-joins.ts | 74 +++++ packages/storage/src/equity-quotes.ts | 24 ++ packages/storage/src/index.ts | 2 + .../storage/tests/equity-print-joins.test.ts | 42 +++ packages/storage/tests/equity-quotes.test.ts | 30 ++ packages/types/src/events.ts | 12 + services/api/src/index.ts | 148 +++++++++- services/compute/src/equity-joins.ts | 104 +++++++ services/compute/src/index.ts | 277 ++++++++++++++++++ services/compute/tests/equity-joins.test.ts | 72 +++++ .../ingest-equities/src/adapters/synthetic.ts | 34 ++- services/ingest-equities/src/index.ts | 50 +++- tsconfig.base.json | 4 +- 15 files changed, 1033 insertions(+), 7 deletions(-) create mode 100644 packages/storage/src/equity-print-joins.ts create mode 100644 packages/storage/src/equity-quotes.ts create mode 100644 packages/storage/tests/equity-print-joins.test.ts create mode 100644 packages/storage/tests/equity-quotes.test.ts create mode 100644 services/compute/src/equity-joins.ts create mode 100644 services/compute/tests/equity-joins.test.ts diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 4bd675a..1bacd52 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -4,6 +4,10 @@ export const STREAM_OPTION_NBBO = "OPTIONS_NBBO"; export const SUBJECT_OPTION_NBBO = "options.nbbo"; export const STREAM_EQUITY_PRINTS = "EQUITY_PRINTS"; export const SUBJECT_EQUITY_PRINTS = "equities.prints"; +export const STREAM_EQUITY_QUOTES = "EQUITY_QUOTES"; +export const SUBJECT_EQUITY_QUOTES = "equities.quotes"; +export const STREAM_EQUITY_JOINS = "EQUITY_JOINS"; +export const SUBJECT_EQUITY_JOINS = "equities.joins"; export const STREAM_FLOW_PACKETS = "FLOW_PACKETS"; export const SUBJECT_FLOW_PACKETS = "flow.packets"; export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS"; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index f99dc16..3b4957f 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -3,6 +3,8 @@ import { AlertEventSchema, ClassifierHitEventSchema, EquityPrintSchema, + EquityQuoteSchema, + EquityPrintJoinSchema, FlowPacketSchema, OptionNBBOSchema, OptionPrintSchema @@ -11,6 +13,8 @@ import type { AlertEvent, ClassifierHitEvent, EquityPrint, + EquityQuote, + EquityPrintJoin, FlowPacket, OptionNBBO, OptionPrint @@ -26,6 +30,18 @@ import { EQUITY_PRINTS_TABLE, normalizeEquityPrint } from "./equity-prints"; +import { + equityQuotesTableDDL, + EQUITY_QUOTES_TABLE, + normalizeEquityQuote +} from "./equity-quotes"; +import { + equityPrintJoinsTableDDL, + EQUITY_PRINT_JOINS_TABLE, + fromEquityPrintJoinRecord, + toEquityPrintJoinRecord, + type EquityPrintJoinRecord +} from "./equity-print-joins"; import { FLOW_PACKETS_TABLE, flowPacketsTableDDL, @@ -88,6 +104,22 @@ export const ensureEquityPrintsTable = async ( }); }; +export const ensureEquityQuotesTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: equityQuotesTableDDL() + }); +}; + +export const ensureEquityPrintJoinsTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: equityPrintJoinsTableDDL() + }); +}; + export const ensureFlowPacketsTable = async ( client: ClickHouseClient ): Promise => { @@ -146,6 +178,30 @@ export const insertEquityPrint = async ( }); }; +export const insertEquityQuote = async ( + client: ClickHouseClient, + quote: EquityQuote +): Promise => { + const record = normalizeEquityQuote(quote); + await client.insert({ + table: EQUITY_QUOTES_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + +export const insertEquityPrintJoin = async ( + client: ClickHouseClient, + join: EquityPrintJoin +): Promise => { + const record = toEquityPrintJoinRecord(join); + await client.insert({ + table: EQUITY_PRINT_JOINS_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + export const insertFlowPacket = async ( client: ClickHouseClient, packet: FlowPacket @@ -253,6 +309,20 @@ const normalizeOptionNbboRow = (row: unknown): unknown => { return row; }; +const normalizeEquityQuoteRow = (row: unknown): unknown => { + if (row && typeof row === "object") { + return normalizeNumericFields(row as Record, [ + "source_ts", + "ingest_ts", + "seq", + "ts", + "bid", + "ask" + ]); + } + + return row; +}; const normalizeEquityRow = (row: unknown): unknown => { if (row && typeof row === "object") { @@ -278,6 +348,25 @@ const normalizeEquityRow = (row: unknown): unknown => { return row; }; +const normalizeEquityPrintJoinRow = (row: unknown): EquityPrintJoinRecord | null => { + if (!row || typeof row !== "object") { + return null; + } + + const record = row as Record; + return { + source_ts: coerceNumber(record.source_ts) as number, + ingest_ts: coerceNumber(record.ingest_ts) as number, + seq: coerceNumber(record.seq) as number, + trace_id: String(record.trace_id ?? ""), + id: String(record.id ?? ""), + print_trace_id: String(record.print_trace_id ?? ""), + quote_trace_id: String(record.quote_trace_id ?? ""), + features_json: String(record.features_json ?? "{}"), + join_quality_json: String(record.join_quality_json ?? "{}") + }; +}; + const normalizeFlowPacketRow = (row: unknown): FlowPacketRecord | null => { if (!row || typeof row !== "object") { return null; @@ -376,6 +465,38 @@ export const fetchRecentEquityPrints = async ( return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); }; +export const fetchRecentEquityQuotes = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_QUOTES_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return EquityQuoteSchema.array().parse(rows.map(normalizeEquityQuoteRow)); +}; + +export const fetchRecentEquityPrintJoins = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeEquityPrintJoinRow) + .filter((record): record is EquityPrintJoinRecord => record !== null); + const joins = records.map(fromEquityPrintJoinRecord); + return EquityPrintJoinSchema.array().parse(joins); +}; + export const fetchRecentFlowPackets = async ( client: ClickHouseClient, limit: number @@ -486,3 +607,45 @@ export const fetchEquityPrintsAfter = async ( const rows = await result.json(); return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); }; + +export const fetchEquityQuotesAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_QUOTES_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return EquityQuoteSchema.array().parse(rows.map(normalizeEquityQuoteRow)); +}; + +export const fetchEquityPrintJoinsAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_PRINT_JOINS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeEquityPrintJoinRow) + .filter((record): record is EquityPrintJoinRecord => record !== null); + const joins = records.map(fromEquityPrintJoinRecord); + return EquityPrintJoinSchema.array().parse(joins); +}; diff --git a/packages/storage/src/equity-print-joins.ts b/packages/storage/src/equity-print-joins.ts new file mode 100644 index 0000000..8d20eec --- /dev/null +++ b/packages/storage/src/equity-print-joins.ts @@ -0,0 +1,74 @@ +import type { EquityPrintJoin } from "@islandflow/types"; + +export const EQUITY_PRINT_JOINS_TABLE = "equity_print_joins"; + +export type EquityPrintJoinRecord = { + source_ts: number; + ingest_ts: number; + seq: number; + trace_id: string; + id: string; + print_trace_id: string; + quote_trace_id: string; + features_json: string; + join_quality_json: string; +}; + +export const equityPrintJoinsTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${EQUITY_PRINT_JOINS_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + id String, + print_trace_id String, + quote_trace_id String, + features_json String, + join_quality_json String +) +ENGINE = MergeTree +ORDER BY (source_ts, seq) +`; +}; + +export const toEquityPrintJoinRecord = (join: EquityPrintJoin): EquityPrintJoinRecord => { + return { + source_ts: join.source_ts, + ingest_ts: join.ingest_ts, + seq: join.seq, + trace_id: join.trace_id, + id: join.id, + print_trace_id: join.print_trace_id, + quote_trace_id: join.quote_trace_id, + features_json: JSON.stringify(join.features), + join_quality_json: JSON.stringify(join.join_quality) + }; +}; + +const safeJson = (value: string, fallback: Record): Record => { + try { + const parsed = JSON.parse(value); + if (parsed && typeof parsed === "object") { + return parsed as Record; + } + } catch { + // ignore + } + + return fallback; +}; + +export const fromEquityPrintJoinRecord = (record: EquityPrintJoinRecord): EquityPrintJoin => { + return { + source_ts: record.source_ts, + ingest_ts: record.ingest_ts, + seq: record.seq, + trace_id: record.trace_id, + id: record.id, + print_trace_id: record.print_trace_id, + quote_trace_id: record.quote_trace_id, + features: safeJson(record.features_json, {}), + join_quality: safeJson(record.join_quality_json, {}) as Record + }; +}; diff --git a/packages/storage/src/equity-quotes.ts b/packages/storage/src/equity-quotes.ts new file mode 100644 index 0000000..cc8c792 --- /dev/null +++ b/packages/storage/src/equity-quotes.ts @@ -0,0 +1,24 @@ +import type { EquityQuote } from "@islandflow/types"; + +export const EQUITY_QUOTES_TABLE = "equity_quotes"; + +export const equityQuotesTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${EQUITY_QUOTES_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + ts UInt64, + underlying_id String, + bid Float64, + ask Float64 +) +ENGINE = MergeTree +ORDER BY (ts, underlying_id) +`; +}; + +export const normalizeEquityQuote = (quote: EquityQuote): EquityQuote => { + return quote; +}; diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index c28d352..dadea11 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -3,5 +3,7 @@ export * from "./classifier-hits"; export * from "./alerts"; export * from "./flow-packets"; export * from "./equity-prints"; +export * from "./equity-quotes"; +export * from "./equity-print-joins"; export * from "./option-prints"; export * from "./option-nbbo"; diff --git a/packages/storage/tests/equity-print-joins.test.ts b/packages/storage/tests/equity-print-joins.test.ts new file mode 100644 index 0000000..f27f2b1 --- /dev/null +++ b/packages/storage/tests/equity-print-joins.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it } from "bun:test"; +import { + equityPrintJoinsTableDDL, + EQUITY_PRINT_JOINS_TABLE, + fromEquityPrintJoinRecord, + toEquityPrintJoinRecord +} from "../src/equity-print-joins"; + +const join = { + source_ts: 100, + ingest_ts: 120, + seq: 1, + trace_id: "equityjoin:trace-1", + id: "equityjoin:trace-1", + print_trace_id: "trace-1", + quote_trace_id: "quote-1", + features: { + underlying_id: "SPY", + price: 450.12, + size: 200, + off_exchange_flag: true, + quote_placement: "A" + }, + join_quality: { + quote_age_ms: 15 + } +}; + +describe("equity-print-joins storage helpers", () => { + it("includes the correct table name in the DDL", () => { + const ddl = equityPrintJoinsTableDDL(); + expect(ddl).toContain(EQUITY_PRINT_JOINS_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); + + it("round-trips equity print join records", () => { + const record = toEquityPrintJoinRecord(join); + const restored = fromEquityPrintJoinRecord(record); + expect(restored.features).toEqual(join.features); + expect(restored.join_quality).toEqual(join.join_quality); + }); +}); diff --git a/packages/storage/tests/equity-quotes.test.ts b/packages/storage/tests/equity-quotes.test.ts new file mode 100644 index 0000000..bc3917e --- /dev/null +++ b/packages/storage/tests/equity-quotes.test.ts @@ -0,0 +1,30 @@ +import { describe, expect, it } from "bun:test"; +import { + equityQuotesTableDDL, + EQUITY_QUOTES_TABLE, + normalizeEquityQuote +} from "../src/equity-quotes"; + +const baseQuote = { + source_ts: 100, + ingest_ts: 200, + seq: 1, + trace_id: "trace-1", + ts: 100, + underlying_id: "SPY", + bid: 450.1, + ask: 450.2 +}; + +describe("equity-quotes storage helpers", () => { + it("keeps required fields intact", () => { + const normalized = normalizeEquityQuote(baseQuote); + expect(normalized).toEqual(baseQuote); + }); + + it("includes the correct table name in the DDL", () => { + const ddl = equityQuotesTableDDL(); + expect(ddl).toContain(EQUITY_QUOTES_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); +}); diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index 1c3a4a6..4200d27 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -59,6 +59,18 @@ export const EquityQuoteSchema = EventMetaSchema.merge( export type EquityQuote = z.infer; +export const EquityPrintJoinSchema = EventMetaSchema.merge( + z.object({ + id: z.string().min(1), + print_trace_id: z.string().min(1), + quote_trace_id: z.string(), + features: z.record(z.union([z.string(), z.number(), z.boolean()])), + join_quality: z.record(z.number()) + }) +); + +export type EquityPrintJoin = z.infer; + export const FlowPacketSchema = EventMetaSchema.merge( z.object({ id: z.string().min(1), diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 59495cb..aa0d2db 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -3,13 +3,17 @@ import { createLogger } from "@islandflow/observability"; import { SUBJECT_ALERTS, SUBJECT_CLASSIFIER_HITS, + SUBJECT_EQUITY_JOINS, SUBJECT_EQUITY_PRINTS, + SUBJECT_EQUITY_QUOTES, SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, STREAM_ALERTS, STREAM_CLASSIFIER_HITS, + STREAM_EQUITY_JOINS, STREAM_EQUITY_PRINTS, + STREAM_EQUITY_QUOTES, STREAM_FLOW_PACKETS, STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, @@ -22,15 +26,21 @@ import { createClickHouseClient, ensureAlertsTable, ensureClassifierHitsTable, + ensureEquityPrintJoinsTable, ensureEquityPrintsTable, + ensureEquityQuotesTable, ensureFlowPacketsTable, ensureOptionNBBOTable, ensureOptionPrintsTable, fetchRecentAlerts, fetchRecentClassifierHits, + fetchRecentEquityPrintJoins, fetchRecentFlowPackets, + fetchRecentEquityQuotes, fetchRecentOptionNBBO, fetchEquityPrintsAfter, + fetchEquityPrintJoinsAfter, + fetchEquityQuotesAfter, fetchRecentEquityPrints, fetchOptionNBBOAfter, fetchOptionPrintsAfter, @@ -40,6 +50,8 @@ import { AlertEventSchema, ClassifierHitEventSchema, EquityPrintSchema, + EquityPrintJoinSchema, + EquityQuoteSchema, FlowPacketSchema, OptionNBBOSchema, OptionPrintSchema @@ -93,7 +105,15 @@ const replayParamsSchema = z.object({ limit: z.coerce.number().int().positive().max(1000).default(200) }); -type Channel = "options" | "options-nbbo" | "equities" | "flow" | "classifier-hits" | "alerts"; +type Channel = + | "options" + | "options-nbbo" + | "equities" + | "equity-quotes" + | "equity-joins" + | "flow" + | "classifier-hits" + | "alerts"; type WsData = { channel: Channel; @@ -102,6 +122,8 @@ type WsData = { const optionSockets = new Set>(); const optionNbboSockets = new Set>(); const equitySockets = new Set>(); +const equityQuoteSockets = new Set>(); +const equityJoinSockets = new Set>(); const flowSockets = new Set>(); const classifierHitSockets = new Set>(); const alertSockets = new Set>(); @@ -202,6 +224,32 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_EQUITY_QUOTES, + subjects: [SUBJECT_EQUITY_QUOTES], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + await ensureStream(jsm, { + name: STREAM_EQUITY_JOINS, + subjects: [SUBJECT_EQUITY_JOINS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_FLOW_PACKETS, subjects: [SUBJECT_FLOW_PACKETS], @@ -250,6 +298,8 @@ const run = async () => { await ensureOptionPrintsTable(clickhouse); await ensureOptionNBBOTable(clickhouse); await ensureEquityPrintsTable(clickhouse); + await ensureEquityQuotesTable(clickhouse); + await ensureEquityPrintJoinsTable(clickhouse); await ensureFlowPacketsTable(clickhouse); await ensureClassifierHitsTable(clickhouse); await ensureAlertsTable(clickhouse); @@ -311,6 +361,18 @@ const run = async () => { "api-equity-prints" ); + const equityQuoteSubscription = await subscribeWithReset( + SUBJECT_EQUITY_QUOTES, + STREAM_EQUITY_QUOTES, + "api-equity-quotes" + ); + + const equityJoinSubscription = await subscribeWithReset( + SUBJECT_EQUITY_JOINS, + STREAM_EQUITY_JOINS, + "api-equity-joins" + ); + const flowSubscription = await subscribeWithReset( SUBJECT_FLOW_PACKETS, STREAM_FLOW_PACKETS, @@ -374,6 +436,36 @@ const run = async () => { } }; + const pumpEquityQuotes = async () => { + for await (const msg of equityQuoteSubscription.messages) { + try { + const payload = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg)); + broadcast(equityQuoteSockets, { type: "equity-quote", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process equity quote", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + + const pumpEquityJoins = async () => { + for await (const msg of equityJoinSubscription.messages) { + try { + const payload = EquityPrintJoinSchema.parse(equityJoinSubscription.decode(msg)); + broadcast(equityJoinSockets, { type: "equity-join", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process equity join", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + const pumpFlow = async () => { for await (const msg of flowSubscription.messages) { try { @@ -422,6 +514,8 @@ const run = async () => { void pumpOptions(); void pumpOptionNbbo(); void pumpEquities(); + void pumpEquityQuotes(); + void pumpEquityJoins(); void pumpFlow(); void pumpClassifierHits(); void pumpAlerts(); @@ -453,6 +547,18 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/quotes/equities") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentEquityQuotes(clickhouse, limit); + return jsonResponse({ data }); + } + + if (req.method === "GET" && url.pathname === "/joins/equities") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentEquityPrintJoins(clickhouse, limit); + return jsonResponse({ data }); + } + if (req.method === "GET" && url.pathname === "/flow/packets") { const limit = parseLimit(url.searchParams.get("limit")); const data = await fetchRecentFlowPackets(clickhouse, limit); @@ -495,6 +601,22 @@ const run = async () => { return jsonResponse({ data, next }); } + if (req.method === "GET" && url.pathname === "/replay/equity-quotes") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchEquityQuotesAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + + if (req.method === "GET" && url.pathname === "/replay/equity-joins") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchEquityPrintJoinsAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.source_ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + if (req.method === "GET" && url.pathname === "/ws/options") { if (serverRef.upgrade(req, { data: { channel: "options" } })) { return new Response(null, { status: 101 }); @@ -519,6 +641,22 @@ const run = async () => { return jsonResponse({ error: "websocket upgrade failed" }, 400); } + if (req.method === "GET" && url.pathname === "/ws/equity-quotes") { + if (serverRef.upgrade(req, { data: { channel: "equity-quotes" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + + if (req.method === "GET" && url.pathname === "/ws/equity-joins") { + if (serverRef.upgrade(req, { data: { channel: "equity-joins" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + if (req.method === "GET" && url.pathname === "/ws/flow") { if (serverRef.upgrade(req, { data: { channel: "flow" } })) { return new Response(null, { status: 101 }); @@ -553,6 +691,10 @@ const run = async () => { optionNbboSockets.add(socket); } else if (socket.data.channel === "equities") { equitySockets.add(socket); + } else if (socket.data.channel === "equity-quotes") { + equityQuoteSockets.add(socket); + } else if (socket.data.channel === "equity-joins") { + equityJoinSockets.add(socket); } else if (socket.data.channel === "flow") { flowSockets.add(socket); } else if (socket.data.channel === "classifier-hits") { @@ -570,6 +712,10 @@ const run = async () => { optionNbboSockets.delete(socket); } else if (socket.data.channel === "equities") { equitySockets.delete(socket); + } else if (socket.data.channel === "equity-quotes") { + equityQuoteSockets.delete(socket); + } else if (socket.data.channel === "equity-joins") { + equityJoinSockets.delete(socket); } else if (socket.data.channel === "flow") { flowSockets.delete(socket); } else if (socket.data.channel === "classifier-hits") { diff --git a/services/compute/src/equity-joins.ts b/services/compute/src/equity-joins.ts new file mode 100644 index 0000000..0f25bce --- /dev/null +++ b/services/compute/src/equity-joins.ts @@ -0,0 +1,104 @@ +import type { EquityPrint, EquityPrintJoin, EquityQuote } from "@islandflow/types"; + +export type EquityQuoteJoin = { + quote: EquityQuote | null; + ageMs: number; + stale: boolean; +}; + +export type QuotePlacement = "AA" | "A" | "B" | "BB" | "MID" | "MISSING" | "STALE"; + +const roundTo = (value: number, digits = 4): number => { + if (!Number.isFinite(value)) { + return 0; + } + return Number(value.toFixed(digits)); +}; + +export const classifyQuotePlacement = ( + price: number, + join: EquityQuoteJoin +): QuotePlacement => { + if (!Number.isFinite(price)) { + return "MISSING"; + } + if (!join.quote) { + return "MISSING"; + } + if (join.stale) { + return "STALE"; + } + + const bid = join.quote.bid; + const ask = join.quote.ask; + if (!Number.isFinite(bid) || !Number.isFinite(ask) || ask <= 0) { + return "MISSING"; + } + + const spread = Math.max(0, ask - bid); + const epsilon = Math.max(0.01, spread * 0.05); + + if (price > ask + epsilon) { + return "AA"; + } + if (price >= ask - epsilon) { + return "A"; + } + if (price < bid - epsilon) { + return "BB"; + } + if (price <= bid + epsilon) { + return "B"; + } + + return "MID"; +}; + +export const buildEquityPrintJoin = ( + print: EquityPrint, + join: EquityQuoteJoin +): EquityPrintJoin => { + const joinQuality: Record = {}; + const placement = classifyQuotePlacement(print.price, join); + const features: Record = { + underlying_id: print.underlying_id, + price: print.price, + size: print.size, + off_exchange_flag: print.offExchangeFlag, + print_ts: print.ts, + quote_placement: placement + }; + + if (!join.quote) { + joinQuality.quote_missing = 1; + } else { + joinQuality.quote_age_ms = join.ageMs; + if (join.stale) { + joinQuality.quote_stale = 1; + } else { + const bid = join.quote.bid; + const ask = join.quote.ask; + const mid = (bid + ask) / 2; + const spread = ask - bid; + features.quote_ts = join.quote.ts; + features.quote_bid = bid; + features.quote_ask = ask; + features.quote_mid = roundTo(mid); + features.quote_spread = roundTo(spread); + } + } + + const joinId = `equityjoin:${print.trace_id}`; + + return { + source_ts: print.source_ts, + ingest_ts: print.ingest_ts, + seq: print.seq, + trace_id: joinId, + id: joinId, + print_trace_id: print.trace_id, + quote_trace_id: join.quote?.trace_id ?? "", + features, + join_quality: joinQuality + }; +}; diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index c8caf0e..c9d638b 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -3,11 +3,17 @@ import { createLogger } from "@islandflow/observability"; import { SUBJECT_ALERTS, SUBJECT_CLASSIFIER_HITS, + SUBJECT_EQUITY_JOINS, + SUBJECT_EQUITY_PRINTS, + SUBJECT_EQUITY_QUOTES, SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, STREAM_ALERTS, STREAM_CLASSIFIER_HITS, + STREAM_EQUITY_JOINS, + STREAM_EQUITY_PRINTS, + STREAM_EQUITY_QUOTES, STREAM_FLOW_PACKETS, STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, @@ -21,19 +27,27 @@ import { createClickHouseClient, ensureAlertsTable, ensureClassifierHitsTable, + ensureEquityPrintJoinsTable, ensureFlowPacketsTable, insertAlert, insertClassifierHit, + insertEquityPrintJoin, insertFlowPacket } from "@islandflow/storage"; import { AlertEventSchema, ClassifierHitEventSchema, + EquityPrintJoinSchema, + EquityPrintSchema, + EquityQuoteSchema, FlowPacketSchema, OptionNBBOSchema, OptionPrintSchema, type AlertEvent, type ClassifierHitEvent, + type EquityPrint, + type EquityQuote, + type EquityPrintJoin, type FlowPacket, type OptionNBBO, type OptionPrint @@ -41,6 +55,7 @@ import { import { z } from "zod"; import { evaluateClassifiers, type ClassifierConfig } from "./classifiers"; import { parseContractId } from "./contracts"; +import { buildEquityPrintJoin, type EquityQuoteJoin } from "./equity-joins"; import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats"; import { summarizeStructure, type ContractLeg } from "./structures"; @@ -71,6 +86,7 @@ const envSchema = z.object({ }, z.boolean()) .default(false), NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000), + EQUITY_QUOTE_MAX_AGE_MS: z.coerce.number().int().positive().default(1000), CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000), CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3), CLASSIFIER_SWEEP_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2), @@ -161,6 +177,7 @@ type ClusterState = { const clusters = new Map(); const nbboCache = new Map(); +const equityQuoteCache = new Map(); const recentLegsByKey = new Map(); const MAX_RECENT_LEGS = 20; @@ -341,6 +358,17 @@ const updateNbboCache = (nbbo: OptionNBBO): void => { } }; +const updateEquityQuoteCache = (quote: EquityQuote): void => { + const existing = equityQuoteCache.get(quote.underlying_id); + if ( + !existing || + quote.ts > existing.ts || + (quote.ts === existing.ts && quote.seq >= existing.seq) + ) { + equityQuoteCache.set(quote.underlying_id, quote); + } +}; + const selectNbbo = (contractId: string, ts: number): NbboJoin => { const nbbo = nbboCache.get(contractId) ?? null; if (!nbbo) { @@ -352,6 +380,17 @@ const selectNbbo = (contractId: string, ts: number): NbboJoin => { return { nbbo, ageMs, stale }; }; +const selectEquityQuote = (underlyingId: string, ts: number): EquityQuoteJoin => { + const quote = equityQuoteCache.get(underlyingId) ?? null; + if (!quote) { + return { quote: null, ageMs: env.EQUITY_QUOTE_MAX_AGE_MS + 1, stale: true }; + } + + const ageMs = Math.abs(ts - quote.ts); + const stale = ageMs > env.EQUITY_QUOTE_MAX_AGE_MS; + return { quote, ageMs, stale }; +}; + const classifyPlacement = (price: number, join: NbboJoin): NbboPlacement => { if (!Number.isFinite(price)) { return "MISSING"; @@ -609,6 +648,25 @@ const emitClassifiers = async ( } }; +const emitEquityJoin = async ( + clickhouse: ReturnType, + js: Awaited>["js"], + print: EquityPrint +): Promise => { + const join = selectEquityQuote(print.underlying_id, print.ts); + const payload: EquityPrintJoin = EquityPrintJoinSchema.parse(buildEquityPrintJoin(print, join)); + + try { + await insertEquityPrintJoin(clickhouse, payload); + await publishJson(js, SUBJECT_EQUITY_JOINS, payload); + } catch (error) { + logger.error("failed to emit equity print join", { + error: error instanceof Error ? error.message : String(error), + trace_id: payload.trace_id + }); + } +}; + const flushEligibleClusters = async ( clickhouse: ReturnType, js: Awaited>["js"], @@ -666,6 +724,32 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_EQUITY_PRINTS, + subjects: [SUBJECT_EQUITY_PRINTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + await ensureStream(jsm, { + name: STREAM_EQUITY_QUOTES, + subjects: [SUBJECT_EQUITY_QUOTES], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_FLOW_PACKETS, subjects: [SUBJECT_FLOW_PACKETS], @@ -679,6 +763,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_EQUITY_JOINS, + subjects: [SUBJECT_EQUITY_JOINS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_CLASSIFIER_HITS, subjects: [SUBJECT_CLASSIFIER_HITS], @@ -726,12 +823,15 @@ const run = async () => { await retry("clickhouse table init", 20, 500, async () => { await ensureFlowPacketsTable(clickhouse); + await ensureEquityPrintJoinsTable(clickhouse); await ensureClassifierHitsTable(clickhouse); await ensureAlertsTable(clickhouse); }); const durableName = "compute-option-prints"; const nbboDurableName = "compute-option-nbbo"; + const equityPrintDurableName = "compute-equity-prints"; + const equityQuoteDurableName = "compute-equity-quotes"; if (env.COMPUTE_CONSUMER_RESET) { try { @@ -791,6 +891,76 @@ const run = async () => { } } + if (env.COMPUTE_CONSUMER_RESET) { + try { + await jsm.consumers.delete(STREAM_EQUITY_PRINTS, equityPrintDurableName); + logger.warn("reset jetstream consumer", { durable: equityPrintDurableName }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to reset jetstream consumer", { + durable: equityPrintDurableName, + error: message + }); + } + } + } else { + try { + const info = await jsm.consumers.info(STREAM_EQUITY_PRINTS, equityPrintDurableName); + if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) { + logger.warn("resetting consumer due to deliver policy change", { + durable: equityPrintDurableName, + current: info.config.deliver_policy, + desired: env.COMPUTE_DELIVER_POLICY + }); + await jsm.consumers.delete(STREAM_EQUITY_PRINTS, equityPrintDurableName); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to inspect jetstream consumer", { + durable: equityPrintDurableName, + error: message + }); + } + } + } + + if (env.COMPUTE_CONSUMER_RESET) { + try { + await jsm.consumers.delete(STREAM_EQUITY_QUOTES, equityQuoteDurableName); + logger.warn("reset jetstream consumer", { durable: equityQuoteDurableName }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to reset jetstream consumer", { + durable: equityQuoteDurableName, + error: message + }); + } + } + } else { + try { + const info = await jsm.consumers.info(STREAM_EQUITY_QUOTES, equityQuoteDurableName); + if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) { + logger.warn("resetting consumer due to deliver policy change", { + durable: equityQuoteDurableName, + current: info.config.deliver_policy, + desired: env.COMPUTE_DELIVER_POLICY + }); + await jsm.consumers.delete(STREAM_EQUITY_QUOTES, equityQuoteDurableName); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to inspect jetstream consumer", { + durable: equityQuoteDurableName, + error: message + }); + } + } + } + const subscription = await (async () => { const opts = buildDurableConsumer(durableName); applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY); @@ -863,6 +1033,81 @@ const run = async () => { } })(); + const equitySubscription = await (async () => { + const opts = buildDurableConsumer(equityPrintDurableName); + applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY); + try { + return await subscribeJson(js, SUBJECT_EQUITY_PRINTS, opts); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const shouldReset = + message.includes("duplicate subscription") || + message.includes("durable requires") || + message.includes("subject does not match consumer"); + + if (!shouldReset) { + throw error; + } + + logger.warn("resetting jetstream consumer", { durable: equityPrintDurableName, error: message }); + + try { + await jsm.consumers.delete(STREAM_EQUITY_PRINTS, equityPrintDurableName); + } catch (deleteError) { + const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError); + if (!deleteMessage.includes("not found")) { + logger.warn("failed to delete jetstream consumer", { + durable: equityPrintDurableName, + error: deleteMessage + }); + } + } + + const resetOpts = buildDurableConsumer(equityPrintDurableName); + applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY); + return await subscribeJson(js, SUBJECT_EQUITY_PRINTS, resetOpts); + } + })(); + + const equityQuoteSubscription = await (async () => { + const opts = buildDurableConsumer(equityQuoteDurableName); + applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY); + try { + return await subscribeJson(js, SUBJECT_EQUITY_QUOTES, opts); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const shouldReset = + message.includes("duplicate subscription") || + message.includes("durable requires") || + message.includes("subject does not match consumer"); + + if (!shouldReset) { + throw error; + } + + logger.warn("resetting jetstream consumer", { + durable: equityQuoteDurableName, + error: message + }); + + try { + await jsm.consumers.delete(STREAM_EQUITY_QUOTES, equityQuoteDurableName); + } catch (deleteError) { + const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError); + if (!deleteMessage.includes("not found")) { + logger.warn("failed to delete jetstream consumer", { + durable: equityQuoteDurableName, + error: deleteMessage + }); + } + } + + const resetOpts = buildDurableConsumer(equityQuoteDurableName); + applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY); + return await subscribeJson(js, SUBJECT_EQUITY_QUOTES, resetOpts); + } + })(); + const nbboLoop = async () => { for await (const msg of nbboSubscription.messages) { try { @@ -878,7 +1123,39 @@ const run = async () => { } }; + const equityQuoteLoop = async () => { + for await (const msg of equityQuoteSubscription.messages) { + try { + const quote = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg)); + updateEquityQuoteCache(quote); + msg.ack(); + } catch (error) { + logger.error("failed to process equity quote", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + + const equityPrintLoop = async () => { + for await (const msg of equitySubscription.messages) { + try { + const print = EquityPrintSchema.parse(equitySubscription.decode(msg)); + await emitEquityJoin(clickhouse, js, print); + msg.ack(); + } catch (error) { + logger.error("failed to process equity print", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + void nbboLoop(); + void equityQuoteLoop(); + void equityPrintLoop(); const shutdown = async (signal: string) => { logger.info("service stopping", { signal }); diff --git a/services/compute/tests/equity-joins.test.ts b/services/compute/tests/equity-joins.test.ts new file mode 100644 index 0000000..3888a4f --- /dev/null +++ b/services/compute/tests/equity-joins.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it } from "bun:test"; +import { buildEquityPrintJoin, classifyQuotePlacement } from "../src/equity-joins"; + +const basePrint = { + source_ts: 100, + ingest_ts: 110, + seq: 1, + trace_id: "print-1", + ts: 100, + underlying_id: "SPY", + price: 100, + size: 50, + exchange: "TEST", + offExchangeFlag: false +}; + +const baseQuote = { + source_ts: 95, + ingest_ts: 105, + seq: 2, + trace_id: "quote-1", + ts: 98, + underlying_id: "SPY", + bid: 99.9, + ask: 100.1 +}; + +describe("equity join helpers", () => { + it("classifies placements with stale and missing quotes", () => { + const missing = classifyQuotePlacement(basePrint.price, { + quote: null, + ageMs: 1500, + stale: true + }); + const stale = classifyQuotePlacement(basePrint.price, { + quote: baseQuote, + ageMs: 1500, + stale: true + }); + + expect(missing).toBe("MISSING"); + expect(stale).toBe("STALE"); + }); + + it("builds join payloads with quote features when fresh", () => { + const join = buildEquityPrintJoin(basePrint, { + quote: baseQuote, + ageMs: 5, + stale: false + }); + + expect(join.id).toBe("equityjoin:print-1"); + expect(join.quote_trace_id).toBe("quote-1"); + expect(join.join_quality.quote_age_ms).toBe(5); + expect(join.features.quote_bid).toBe(99.9); + expect(join.features.quote_ask).toBe(100.1); + expect(join.features.quote_mid).toBeCloseTo(100, 2); + expect(join.features.quote_spread).toBeCloseTo(0.2, 2); + }); + + it("marks missing quotes in join quality", () => { + const join = buildEquityPrintJoin(basePrint, { + quote: null, + ageMs: 2000, + stale: true + }); + + expect(join.quote_trace_id).toBe(""); + expect(join.join_quality.quote_missing).toBe(1); + expect(join.features.quote_placement).toBe("MISSING"); + }); +}); diff --git a/services/ingest-equities/src/adapters/synthetic.ts b/services/ingest-equities/src/adapters/synthetic.ts index 930e5e2..c1d21fa 100644 --- a/services/ingest-equities/src/adapters/synthetic.ts +++ b/services/ingest-equities/src/adapters/synthetic.ts @@ -1,4 +1,4 @@ -import { SP500_SYMBOLS, type EquityPrint } from "@islandflow/types"; +import { SP500_SYMBOLS, type EquityPrint, type EquityQuote } from "@islandflow/types"; import type { EquityIngestAdapter, EquityIngestHandlers } from "./types"; type SyntheticEquitiesAdapterConfig = { @@ -38,6 +38,25 @@ const buildSyntheticPrint = ( }; }; +const buildSyntheticQuote = ( + seq: number, + now: number, + symbol: string, + bid: number, + ask: number +): EquityQuote => { + return { + source_ts: now, + ingest_ts: now, + seq, + trace_id: `synthetic-equity-quote-${seq}`, + ts: now, + underlying_id: symbol, + bid, + ask + }; +}; + export const createSyntheticEquitiesAdapter = ( config: SyntheticEquitiesAdapterConfig ): EquityIngestAdapter => { @@ -45,6 +64,7 @@ export const createSyntheticEquitiesAdapter = ( name: "synthetic", start: (handlers: EquityIngestHandlers) => { let seq = 0; + let quoteSeq = 0; let timer: ReturnType | null = null; let stopped = false; @@ -65,8 +85,18 @@ export const createSyntheticEquitiesAdapter = ( const size = 10 + (seq % 600); const exchange = EXCHANGES[(seq + symbolHash) % EXCHANGES.length]; const offExchangeFlag = (seq + i) % 6 === 0; - const print = buildSyntheticPrint(seq, now + i * 4, symbol, price, size, exchange, offExchangeFlag); + const eventTs = now + i * 4; + const print = buildSyntheticPrint(seq, eventTs, symbol, price, size, exchange, offExchangeFlag); void handlers.onTrade(print); + + if (handlers.onQuote) { + quoteSeq += 1; + const spread = Math.max(0.02, Number((price * 0.002).toFixed(2))); + const bid = Math.max(0.01, Number((price - spread / 2).toFixed(2))); + const ask = Math.max(bid + 0.01, Number((price + spread / 2).toFixed(2))); + const quote = buildSyntheticQuote(quoteSeq, eventTs, symbol, bid, ask); + void handlers.onQuote(quote); + } } }; diff --git a/services/ingest-equities/src/index.ts b/services/ingest-equities/src/index.ts index 5001cec..de0c324 100644 --- a/services/ingest-equities/src/index.ts +++ b/services/ingest-equities/src/index.ts @@ -2,7 +2,9 @@ import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; import { SUBJECT_EQUITY_PRINTS, + SUBJECT_EQUITY_QUOTES, STREAM_EQUITY_PRINTS, + STREAM_EQUITY_QUOTES, connectJetStreamWithRetry, ensureStream, publishJson @@ -10,9 +12,16 @@ import { import { createClickHouseClient, ensureEquityPrintsTable, - insertEquityPrint + ensureEquityQuotesTable, + insertEquityPrint, + insertEquityQuote } from "@islandflow/storage"; -import { EquityPrintSchema, type EquityPrint } from "@islandflow/types"; +import { + EquityPrintSchema, + EquityQuoteSchema, + type EquityPrint, + type EquityQuote +} from "@islandflow/types"; import { createSyntheticEquitiesAdapter } from "./adapters/synthetic"; import type { EquityIngestAdapter, StopHandler } from "./adapters/types"; import { z } from "zod"; @@ -136,6 +145,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_EQUITY_QUOTES, + subjects: [SUBJECT_EQUITY_QUOTES], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + const clickhouse = createClickHouseClient({ url: env.CLICKHOUSE_URL, database: env.CLICKHOUSE_DATABASE @@ -143,11 +165,13 @@ const run = async () => { await retry("clickhouse table init", 20, 500, async () => { await ensureEquityPrintsTable(clickhouse); + await ensureEquityQuotesTable(clickhouse); }); const adapter = selectAdapter(env.EQUITIES_INGEST_ADAPTER); logger.info("ingest adapter selected", { adapter: adapter.name }); const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS); + const allowQuotePublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS); const stopAdapter: StopHandler = await adapter.start({ onTrade: async (candidate: EquityPrint) => { @@ -176,6 +200,28 @@ const run = async () => { trace_id: print.trace_id }); } + }, + onQuote: async (candidate: EquityQuote) => { + if (state.shuttingDown) { + return; + } + + const now = Date.now(); + if (!allowQuotePublish(now)) { + return; + } + + const quote = EquityQuoteSchema.parse(candidate); + + try { + await insertEquityQuote(clickhouse, quote); + await publishJson(js, SUBJECT_EQUITY_QUOTES, quote); + } catch (error) { + logger.error("failed to publish equity quote", { + error: error instanceof Error ? error.message : String(error), + trace_id: quote.trace_id + }); + } } }); diff --git a/tsconfig.base.json b/tsconfig.base.json index f98f46a..34b15d2 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -8,6 +8,6 @@ "isolatedModules": true, "resolveJsonModule": true, "skipLibCheck": true, - "noEmit": true - } + "noEmit": true, + }, }