From 6c376b26dcceacf0d2a83690d17197af9bd45ce9 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sat, 27 Dec 2025 20:25:32 -0500 Subject: [PATCH] Add flow packet clustering --- packages/bus/src/subjects.ts | 2 + packages/storage/src/clickhouse.ts | 69 +++++++- packages/storage/src/flow-packets.ts | 70 ++++++++ packages/storage/src/index.ts | 1 + packages/storage/tests/flow-packets.test.ts | 39 +++++ services/api/src/index.ts | 9 ++ services/compute/package.json | 1 + services/compute/src/index.ts | 167 ++++++++++++++++++-- 8 files changed, 347 insertions(+), 11 deletions(-) create mode 100644 packages/storage/src/flow-packets.ts create mode 100644 packages/storage/tests/flow-packets.test.ts diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 57d6059..9b22df7 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -2,3 +2,5 @@ export const STREAM_OPTION_PRINTS = "OPTIONS_PRINTS"; export const SUBJECT_OPTION_PRINTS = "options.prints"; export const STREAM_EQUITY_PRINTS = "EQUITY_PRINTS"; export const SUBJECT_EQUITY_PRINTS = "equities.prints"; +export const STREAM_FLOW_PACKETS = "FLOW_PACKETS"; +export const SUBJECT_FLOW_PACKETS = "flow.packets"; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 16d7cd8..c674aeb 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -1,6 +1,6 @@ import { createClient, type ClickHouseClient } from "@clickhouse/client"; -import { EquityPrintSchema, OptionPrintSchema } from "@islandflow/types"; -import type { EquityPrint, OptionPrint } from "@islandflow/types"; +import { EquityPrintSchema, FlowPacketSchema, OptionPrintSchema } from "@islandflow/types"; +import type { EquityPrint, FlowPacket, OptionPrint } from "@islandflow/types"; import { normalizeOptionPrint, optionPrintsTableDDL, @@ -11,6 +11,13 @@ import { EQUITY_PRINTS_TABLE, normalizeEquityPrint } from "./equity-prints"; +import { + FLOW_PACKETS_TABLE, + flowPacketsTableDDL, + fromFlowPacketRecord, + toFlowPacketRecord, + type FlowPacketRecord +} from "./flow-packets"; export type ClickHouseOptions = { url: string; @@ -44,6 +51,14 @@ export const ensureEquityPrintsTable = async ( }); }; +export const ensureFlowPacketsTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: flowPacketsTableDDL() + }); +}; + export const insertOptionPrint = async ( client: ClickHouseClient, print: OptionPrint @@ -68,6 +83,18 @@ export const insertEquityPrint = async ( }); }; +export const insertFlowPacket = async ( + client: ClickHouseClient, + packet: FlowPacket +): Promise => { + const record = toFlowPacketRecord(packet); + await client.insert({ + table: FLOW_PACKETS_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + const clampLimit = (limit: number): number => { if (!Number.isFinite(limit)) { return 100; @@ -149,6 +176,26 @@ const normalizeEquityRow = (row: unknown): unknown => { return row; }; +const normalizeFlowPacketRow = (row: unknown): FlowPacketRecord | null => { + if (!row || typeof row !== "object") { + return null; + } + + const record = row as Record; + return { + source_ts: coerceNumber(record.source_ts) as number, + ingest_ts: coerceNumber(record.ingest_ts) as number, + seq: coerceNumber(record.seq) as number, + trace_id: String(record.trace_id ?? ""), + id: String(record.id ?? ""), + members: Array.isArray(record.members) + ? record.members.map((value) => String(value)) + : [], + features_json: String(record.features_json ?? "{}"), + join_quality_json: String(record.join_quality_json ?? "{}") + }; +}; + export const fetchRecentOptionPrints = async ( client: ClickHouseClient, limit: number @@ -177,6 +224,24 @@ export const fetchRecentEquityPrints = async ( return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); }; +export const fetchRecentFlowPackets = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${FLOW_PACKETS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeFlowPacketRow) + .filter((record): record is FlowPacketRecord => record !== null); + const packets = records.map(fromFlowPacketRecord); + return FlowPacketSchema.array().parse(packets); +}; + export const fetchOptionPrintsAfter = async ( client: ClickHouseClient, afterTs: number, diff --git a/packages/storage/src/flow-packets.ts b/packages/storage/src/flow-packets.ts new file mode 100644 index 0000000..0324663 --- /dev/null +++ b/packages/storage/src/flow-packets.ts @@ -0,0 +1,70 @@ +import type { FlowPacket } from "@islandflow/types"; + +export const FLOW_PACKETS_TABLE = "flow_packets"; + +export type FlowPacketRecord = { + source_ts: number; + ingest_ts: number; + seq: number; + trace_id: string; + id: string; + members: string[]; + features_json: string; + join_quality_json: string; +}; + +export const flowPacketsTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${FLOW_PACKETS_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + id String, + members Array(String), + features_json String, + join_quality_json String +) +ENGINE = MergeTree +ORDER BY (source_ts, seq) +`; +}; + +export const toFlowPacketRecord = (packet: FlowPacket): FlowPacketRecord => { + return { + source_ts: packet.source_ts, + ingest_ts: packet.ingest_ts, + seq: packet.seq, + trace_id: packet.trace_id, + id: packet.id, + members: packet.members, + features_json: JSON.stringify(packet.features), + join_quality_json: JSON.stringify(packet.join_quality) + }; +}; + +const safeJson = (value: string, fallback: Record): Record => { + try { + const parsed = JSON.parse(value); + if (parsed && typeof parsed === "object") { + return parsed as Record; + } + } catch { + // ignore + } + + return fallback; +}; + +export const fromFlowPacketRecord = (record: FlowPacketRecord): FlowPacket => { + return { + source_ts: record.source_ts, + ingest_ts: record.ingest_ts, + seq: record.seq, + trace_id: record.trace_id, + id: record.id, + members: record.members, + features: safeJson(record.features_json, {}), + join_quality: safeJson(record.join_quality_json, {}) as Record + }; +}; diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index ab50920..69a4bca 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -1,3 +1,4 @@ export * from "./clickhouse"; +export * from "./flow-packets"; export * from "./equity-prints"; export * from "./option-prints"; diff --git a/packages/storage/tests/flow-packets.test.ts b/packages/storage/tests/flow-packets.test.ts new file mode 100644 index 0000000..8660625 --- /dev/null +++ b/packages/storage/tests/flow-packets.test.ts @@ -0,0 +1,39 @@ +import { describe, expect, it } from "bun:test"; +import { + flowPacketsTableDDL, + FLOW_PACKETS_TABLE, + fromFlowPacketRecord, + toFlowPacketRecord +} from "../src/flow-packets"; + +const packet = { + source_ts: 10, + ingest_ts: 20, + seq: 1, + trace_id: "fp-1", + id: "fp-1", + members: ["p1", "p2"], + features: { + option_contract_id: "SPY-2025-01-17-450-C", + count: 2, + total_size: 30 + }, + join_quality: { + nbbo_age_ms: 5 + } +}; + +describe("flow-packets storage helpers", () => { + it("includes the correct table name in the DDL", () => { + const ddl = flowPacketsTableDDL(); + expect(ddl).toContain(FLOW_PACKETS_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); + + it("round-trips flow packet records", () => { + const record = toFlowPacketRecord(packet); + const restored = fromFlowPacketRecord(record); + expect(restored.features).toEqual(packet.features); + expect(restored.join_quality).toEqual(packet.join_quality); + }); +}); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index d8b0477..6e879cc 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -13,7 +13,9 @@ import { import { createClickHouseClient, ensureEquityPrintsTable, + ensureFlowPacketsTable, ensureOptionPrintsTable, + fetchRecentFlowPackets, fetchEquityPrintsAfter, fetchRecentEquityPrints, fetchOptionPrintsAfter, @@ -141,6 +143,7 @@ const run = async () => { await ensureOptionPrintsTable(clickhouse); await ensureEquityPrintsTable(clickhouse); + await ensureFlowPacketsTable(clickhouse); const optionSubscription = await subscribeJson( js, @@ -208,6 +211,12 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/flow/packets") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentFlowPackets(clickhouse, limit); + return jsonResponse({ data }); + } + if (req.method === "GET" && url.pathname === "/replay/options") { const { afterTs, afterSeq, limit } = parseReplayParams(url); const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit); diff --git a/services/compute/package.json b/services/compute/package.json index 0bfc7eb..d148767 100644 --- a/services/compute/package.json +++ b/services/compute/package.json @@ -9,6 +9,7 @@ "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", "@islandflow/types": "workspace:*", "zod": "^3.23.8" } diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 9ffc927..e29c342 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -1,25 +1,137 @@ import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; import { + SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_PRINTS, + STREAM_FLOW_PACKETS, STREAM_OPTION_PRINTS, buildDurableConsumer, connectJetStreamWithRetry, ensureStream, + publishJson, subscribeJson } from "@islandflow/bus"; -import { OptionPrintSchema } from "@islandflow/types"; +import { + createClickHouseClient, + ensureFlowPacketsTable, + insertFlowPacket +} from "@islandflow/storage"; +import { FlowPacketSchema, OptionPrintSchema, type FlowPacket, type OptionPrint } from "@islandflow/types"; import { z } from "zod"; const service = "compute"; const logger = createLogger({ service }); const envSchema = z.object({ - NATS_URL: z.string().default("nats://localhost:4222") + NATS_URL: z.string().default("nats://localhost:4222"), + CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + CLICKHOUSE_DATABASE: z.string().default("default"), + CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500) }); const env = readEnv(envSchema); +type ClusterState = { + contractId: string; + startTs: number; + endTs: number; + startSourceTs: number; + endIngestTs: number; + endSeq: number; + members: string[]; + totalSize: number; + totalPremium: number; + firstPrice: number; + lastPrice: number; +}; + +const clusters = new Map(); + +const buildCluster = (print: OptionPrint): ClusterState => { + return { + contractId: print.option_contract_id, + startTs: print.ts, + endTs: print.ts, + startSourceTs: print.source_ts, + endIngestTs: print.ingest_ts, + endSeq: print.seq, + members: [print.trace_id], + totalSize: print.size, + totalPremium: print.price * print.size, + firstPrice: print.price, + lastPrice: print.price + }; +}; + +const updateCluster = (cluster: ClusterState, print: OptionPrint): ClusterState => { + cluster.endTs = Math.max(cluster.endTs, print.ts); + cluster.endIngestTs = Math.max(cluster.endIngestTs, print.ingest_ts); + cluster.endSeq = Math.max(cluster.endSeq, print.seq); + cluster.members.push(print.trace_id); + cluster.totalSize += print.size; + cluster.totalPremium += print.price * print.size; + cluster.lastPrice = print.price; + return cluster; +}; + +const flushCluster = async ( + clickhouse: ReturnType, + js: Awaited>["js"], + cluster: ClusterState +): Promise => { + const features = { + option_contract_id: cluster.contractId, + count: cluster.members.length, + total_size: cluster.totalSize, + total_premium: Number(cluster.totalPremium.toFixed(4)), + first_price: cluster.firstPrice, + last_price: cluster.lastPrice, + start_ts: cluster.startTs, + end_ts: cluster.endTs, + window_ms: env.CLUSTER_WINDOW_MS + }; + + const packet: FlowPacket = { + source_ts: cluster.startSourceTs, + ingest_ts: cluster.endIngestTs, + seq: cluster.endSeq, + trace_id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, + id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, + members: cluster.members, + features, + join_quality: {} + }; + + const validated = FlowPacketSchema.parse(packet); + + await insertFlowPacket(clickhouse, validated); + await publishJson(js, SUBJECT_FLOW_PACKETS, validated); + + logger.info("emitted flow packet", { + id: validated.id, + contract: cluster.contractId, + count: cluster.members.length + }); +}; + +const flushEligibleClusters = async ( + clickhouse: ReturnType, + js: Awaited>["js"], + currentTs: number, + skipContractId: string +): Promise => { + for (const [contractId, cluster] of clusters) { + if (contractId === skipContractId) { + continue; + } + + if (currentTs - cluster.endTs > env.CLUSTER_WINDOW_MS) { + clusters.delete(contractId); + await flushCluster(clickhouse, js, cluster); + } + } +}; + const run = async () => { logger.info("service starting"); @@ -44,13 +156,42 @@ const run = async () => { num_replicas: 1 }); - const opts = buildDurableConsumer("compute-option-prints"); + await ensureStream(jsm, { + name: STREAM_FLOW_PACKETS, + subjects: [SUBJECT_FLOW_PACKETS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); - const subscription = await subscribeJson(js, SUBJECT_OPTION_PRINTS, opts); + const clickhouse = createClickHouseClient({ + url: env.CLICKHOUSE_URL, + database: env.CLICKHOUSE_DATABASE + }); + + await ensureFlowPacketsTable(clickhouse); + + const subscription = await subscribeJson( + js, + SUBJECT_OPTION_PRINTS, + buildDurableConsumer("compute-option-prints") + ); const shutdown = async (signal: string) => { logger.info("service stopping", { signal }); + + for (const cluster of clusters.values()) { + await flushCluster(clickhouse, js, cluster); + } + clusters.clear(); + await nc.drain(); + await clickhouse.close(); process.exit(0); }; @@ -60,11 +201,19 @@ const run = async () => { for await (const msg of subscription.messages) { try { const print = OptionPrintSchema.parse(subscription.decode(msg)); - logger.info("received option print", { - trace_id: print.trace_id, - seq: print.seq, - option_contract_id: print.option_contract_id - }); + await flushEligibleClusters(clickhouse, js, print.ts, print.option_contract_id); + + const existing = clusters.get(print.option_contract_id); + if (!existing) { + clusters.set(print.option_contract_id, buildCluster(print)); + } else if (print.ts - existing.startTs <= env.CLUSTER_WINDOW_MS) { + updateCluster(existing, print); + } else { + clusters.delete(print.option_contract_id); + await flushCluster(clickhouse, js, existing); + clusters.set(print.option_contract_id, buildCluster(print)); + } + msg.ack(); } catch (error) { logger.error("failed to process option print", {