import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; import { SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_PRINTS, STREAM_FLOW_PACKETS, STREAM_OPTION_PRINTS, buildDurableConsumer, connectJetStreamWithRetry, ensureStream, publishJson, subscribeJson } from "@islandflow/bus"; import { createClickHouseClient, ensureFlowPacketsTable, insertFlowPacket } from "@islandflow/storage"; import { FlowPacketSchema, OptionPrintSchema, type FlowPacket, type OptionPrint } from "@islandflow/types"; import { z } from "zod"; const service = "compute"; const logger = createLogger({ service }); const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500), COMPUTE_DELIVER_POLICY: z.enum(["new", "all", "last", "last_per_subject"]).default("new"), COMPUTE_CONSUMER_RESET: z .preprocess((value) => { if (typeof value === "string") { const normalized = value.trim().toLowerCase(); if (["1", "true", "yes", "on"].includes(normalized)) { return true; } if (["0", "false", "no", "off"].includes(normalized)) { return false; } } return value; }, z.boolean()) .default(false) }); const env = readEnv(envSchema); const retry = async ( label: string, attempts: number, delayMs: number, task: () => Promise ): Promise => { let lastError: unknown; for (let attempt = 1; attempt <= attempts; attempt += 1) { try { return await task(); } catch (error) { lastError = error; logger.warn(`${label} attempt failed`, { attempt, error: error instanceof Error ? error.message : String(error) }); if (attempt < attempts) { await new Promise((resolve) => setTimeout(resolve, delayMs)); } } } throw lastError ?? new Error(`${label} failed after retries`); }; type ClusterState = { contractId: string; startTs: number; endTs: number; startSourceTs: number; endIngestTs: number; endSeq: number; members: string[]; totalSize: number; totalPremium: number; firstPrice: number; lastPrice: number; }; const clusters = new Map(); const applyDeliverPolicy = ( opts: ReturnType, policy: typeof env.COMPUTE_DELIVER_POLICY ) => { switch (policy) { case "all": opts.deliverAll(); break; case "last": opts.deliverLast(); break; case "last_per_subject": opts.deliverLastPerSubject(); break; case "new": default: opts.deliverNew(); break; } }; const buildCluster = (print: OptionPrint): ClusterState => { return { contractId: print.option_contract_id, startTs: print.ts, endTs: print.ts, startSourceTs: print.source_ts, endIngestTs: print.ingest_ts, endSeq: print.seq, members: [print.trace_id], totalSize: print.size, totalPremium: print.price * print.size, firstPrice: print.price, lastPrice: print.price }; }; const updateCluster = (cluster: ClusterState, print: OptionPrint): ClusterState => { cluster.endTs = Math.max(cluster.endTs, print.ts); cluster.endIngestTs = Math.max(cluster.endIngestTs, print.ingest_ts); cluster.endSeq = Math.max(cluster.endSeq, print.seq); cluster.members.push(print.trace_id); cluster.totalSize += print.size; cluster.totalPremium += print.price * print.size; cluster.lastPrice = print.price; return cluster; }; const flushCluster = async ( clickhouse: ReturnType, js: Awaited>["js"], cluster: ClusterState ): Promise => { const features = { option_contract_id: cluster.contractId, count: cluster.members.length, total_size: cluster.totalSize, total_premium: Number(cluster.totalPremium.toFixed(4)), first_price: cluster.firstPrice, last_price: cluster.lastPrice, start_ts: cluster.startTs, end_ts: cluster.endTs, window_ms: env.CLUSTER_WINDOW_MS }; const packet: FlowPacket = { source_ts: cluster.startSourceTs, ingest_ts: cluster.endIngestTs, seq: cluster.endSeq, trace_id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, members: cluster.members, features, join_quality: {} }; const validated = FlowPacketSchema.parse(packet); await insertFlowPacket(clickhouse, validated); await publishJson(js, SUBJECT_FLOW_PACKETS, validated); logger.info("emitted flow packet", { id: validated.id, contract: cluster.contractId, count: cluster.members.length }); }; const flushEligibleClusters = async ( clickhouse: ReturnType, js: Awaited>["js"], currentTs: number, skipContractId: string ): Promise => { for (const [contractId, cluster] of clusters) { if (contractId === skipContractId) { continue; } if (currentTs - cluster.endTs > env.CLUSTER_WINDOW_MS) { clusters.delete(contractId); await flushCluster(clickhouse, js, cluster); } } }; const run = async () => { logger.info("service starting"); const { nc, js, jsm } = await connectJetStreamWithRetry( { servers: env.NATS_URL, name: service }, { attempts: 20, delayMs: 500 } ); await ensureStream(jsm, { name: STREAM_OPTION_PRINTS, subjects: [SUBJECT_OPTION_PRINTS], retention: "limits", storage: "file", discard: "old", max_msgs_per_subject: -1, max_msgs: -1, max_bytes: -1, max_age: 0, num_replicas: 1 }); await ensureStream(jsm, { name: STREAM_FLOW_PACKETS, subjects: [SUBJECT_FLOW_PACKETS], retention: "limits", storage: "file", discard: "old", max_msgs_per_subject: -1, max_msgs: -1, max_bytes: -1, max_age: 0, num_replicas: 1 }); const clickhouse = createClickHouseClient({ url: env.CLICKHOUSE_URL, database: env.CLICKHOUSE_DATABASE }); await retry("clickhouse table init", 20, 500, async () => { await ensureFlowPacketsTable(clickhouse); }); const durableName = "compute-option-prints"; if (env.COMPUTE_CONSUMER_RESET) { try { await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); logger.warn("reset jetstream consumer", { durable: durableName }); } catch (error) { const message = error instanceof Error ? error.message : String(error); if (!message.includes("not found")) { logger.warn("failed to reset jetstream consumer", { durable: durableName, error: message }); } } } else { try { const info = await jsm.consumers.info(STREAM_OPTION_PRINTS, durableName); if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) { logger.warn("resetting consumer due to deliver policy change", { durable: durableName, current: info.config.deliver_policy, desired: env.COMPUTE_DELIVER_POLICY }); await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); } } catch (error) { const message = error instanceof Error ? error.message : String(error); if (!message.includes("not found")) { logger.warn("failed to inspect jetstream consumer", { durable: durableName, error: message }); } } } const subscription = await (async () => { const opts = buildDurableConsumer(durableName); applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY); try { return await subscribeJson(js, SUBJECT_OPTION_PRINTS, opts); } catch (error) { const message = error instanceof Error ? error.message : String(error); const shouldReset = message.includes("duplicate subscription") || message.includes("durable requires") || message.includes("subject does not match consumer"); if (!shouldReset) { throw error; } logger.warn("resetting jetstream consumer", { durable: durableName, error: message }); try { await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); } catch (deleteError) { const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError); if (!deleteMessage.includes("not found")) { logger.warn("failed to delete jetstream consumer", { durable: durableName, error: deleteMessage }); } } const resetOpts = buildDurableConsumer(durableName); applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY); return await subscribeJson(js, SUBJECT_OPTION_PRINTS, resetOpts); } })(); const shutdown = async (signal: string) => { logger.info("service stopping", { signal }); for (const cluster of clusters.values()) { await flushCluster(clickhouse, js, cluster); } clusters.clear(); await nc.drain(); await clickhouse.close(); process.exit(0); }; process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGTERM", () => void shutdown("SIGTERM")); for await (const msg of subscription.messages) { try { const print = OptionPrintSchema.parse(subscription.decode(msg)); await flushEligibleClusters(clickhouse, js, print.ts, print.option_contract_id); const existing = clusters.get(print.option_contract_id); if (!existing) { clusters.set(print.option_contract_id, buildCluster(print)); } else if (print.ts - existing.startTs <= env.CLUSTER_WINDOW_MS) { updateCluster(existing, print); } else { clusters.delete(print.option_contract_id); await flushCluster(clickhouse, js, existing); clusters.set(print.option_contract_id, buildCluster(print)); } msg.ack(); } catch (error) { logger.error("failed to process option print", { error: error instanceof Error ? error.message : String(error) }); msg.term(); } } }; await run();