From 488ae82ed6e694637ccd4eb52991d82585b0efb3 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sat, 27 Dec 2025 19:14:27 -0500 Subject: [PATCH] Add event bus and storage layer --- apps/README.md | 3 + bun.lock | 34 ++++ docker-compose.yml | 3 +- packages/bus/package.json | 11 ++ packages/bus/src/index.ts | 2 + packages/bus/src/jetstream.ts | 125 ++++++++++++++ packages/bus/src/subjects.ts | 2 + packages/bus/tsconfig.json | 7 + packages/storage/package.json | 12 ++ packages/storage/src/clickhouse.ts | 39 +++++ packages/storage/src/index.ts | 2 + packages/storage/src/option-prints.ts | 29 ++++ packages/storage/tests/option-prints.test.ts | 27 ++++ packages/storage/tsconfig.json | 7 + services/README.md | 3 + services/compute/package.json | 5 +- services/compute/src/index.ts | 79 +++++++-- services/ingest-options/package.json | 6 +- services/ingest-options/src/index.ts | 162 ++++++++++++++++++- 19 files changed, 537 insertions(+), 21 deletions(-) create mode 100644 apps/README.md create mode 100644 packages/bus/package.json create mode 100644 packages/bus/src/index.ts create mode 100644 packages/bus/src/jetstream.ts create mode 100644 packages/bus/src/subjects.ts create mode 100644 packages/bus/tsconfig.json create mode 100644 packages/storage/package.json create mode 100644 packages/storage/src/clickhouse.ts create mode 100644 packages/storage/src/index.ts create mode 100644 packages/storage/src/option-prints.ts create mode 100644 packages/storage/tests/option-prints.test.ts create mode 100644 packages/storage/tsconfig.json create mode 100644 services/README.md diff --git a/apps/README.md b/apps/README.md new file mode 100644 index 0000000..09dfa6e --- /dev/null +++ b/apps/README.md @@ -0,0 +1,3 @@ +# Apps + +Next.js app(s) live here. Scaffold pending. diff --git a/bun.lock b/bun.lock index 82ebb5e..467f6b0 100644 --- a/bun.lock +++ b/bun.lock @@ -13,6 +13,12 @@ "react-dom": "^18.3.1", }, }, + "packages/bus": { + "name": "@islandflow/bus", + "dependencies": { + "nats": "^2.24.0", + }, + }, "packages/config": { "name": "@islandflow/config", "dependencies": { @@ -22,6 +28,13 @@ "packages/observability": { "name": "@islandflow/observability", }, + "packages/storage": { + "name": "@islandflow/storage", + "dependencies": { + "@clickhouse/client": "^0.2.6", + "@islandflow/types": "workspace:*", + }, + }, "packages/types": { "name": "@islandflow/types", "dependencies": { @@ -45,8 +58,11 @@ "services/compute": { "name": "@islandflow/compute", "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", "@islandflow/observability": "workspace:*", + "@islandflow/types": "workspace:*", + "zod": "^3.23.8", }, }, "services/eod-enricher": { @@ -66,8 +82,12 @@ "services/ingest-options": { "name": "@islandflow/ingest-options", "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", + "@islandflow/types": "workspace:*", + "zod": "^3.23.8", }, }, "services/refdata": { @@ -79,8 +99,14 @@ }, }, "packages": { + "@clickhouse/client": ["@clickhouse/client@0.2.10", "", { "dependencies": { "@clickhouse/client-common": "0.2.10" } }, "sha512-ZwBgzjEAFN/ogS0ym5KHVbR7Hx/oYCX01qGp2baEyfN2HM73kf/7Vp3GvMHWRy+zUXISONEtFv7UTViOXnmFrg=="], + + "@clickhouse/client-common": ["@clickhouse/client-common@0.2.10", "", {}, "sha512-BvTY0IXS96y9RUeNCpKL4HUzHmY80L0lDcGN0lmUD6zjOqYMn78+xyHYJ/AIAX7JQsc+/KwFt2soZutQTKxoGQ=="], + "@islandflow/api": ["@islandflow/api@workspace:services/api"], + "@islandflow/bus": ["@islandflow/bus@workspace:packages/bus"], + "@islandflow/candles": ["@islandflow/candles@workspace:services/candles"], "@islandflow/compute": ["@islandflow/compute@workspace:services/compute"], @@ -97,6 +123,8 @@ "@islandflow/refdata": ["@islandflow/refdata@workspace:services/refdata"], + "@islandflow/storage": ["@islandflow/storage@workspace:packages/storage"], + "@islandflow/types": ["@islandflow/types@workspace:packages/types"], "@islandflow/web": ["@islandflow/web@workspace:apps/web"], @@ -139,8 +167,12 @@ "nanoid": ["nanoid@3.3.11", "", { "bin": { "nanoid": "bin/nanoid.cjs" } }, "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w=="], + "nats": ["nats@2.29.3", "", { "dependencies": { "nkeys.js": "1.1.0" } }, "sha512-tOQCRCwC74DgBTk4pWZ9V45sk4d7peoE2njVprMRCBXrhJ5q5cYM7i6W+Uvw2qUrcfOSnuisrX7bEx3b3Wx4QA=="], + "next": ["next@14.2.35", "", { "dependencies": { "@next/env": "14.2.35", "@swc/helpers": "0.5.5", "busboy": "1.6.0", "caniuse-lite": "^1.0.30001579", "graceful-fs": "^4.2.11", "postcss": "8.4.31", "styled-jsx": "5.1.1" }, "optionalDependencies": { "@next/swc-darwin-arm64": "14.2.33", "@next/swc-darwin-x64": "14.2.33", "@next/swc-linux-arm64-gnu": "14.2.33", "@next/swc-linux-arm64-musl": "14.2.33", "@next/swc-linux-x64-gnu": "14.2.33", "@next/swc-linux-x64-musl": "14.2.33", "@next/swc-win32-arm64-msvc": "14.2.33", "@next/swc-win32-ia32-msvc": "14.2.33", "@next/swc-win32-x64-msvc": "14.2.33" }, "peerDependencies": { "@opentelemetry/api": "^1.1.0", "@playwright/test": "^1.41.2", "react": "^18.2.0", "react-dom": "^18.2.0", "sass": "^1.3.0" }, "optionalPeers": ["@opentelemetry/api", "@playwright/test", "sass"], "bin": { "next": "dist/bin/next" } }, "sha512-KhYd2Hjt/O1/1aZVX3dCwGXM1QmOV4eNM2UTacK5gipDdPN/oHHK/4oVGy7X8GMfPMsUTUEmGlsy0EY1YGAkig=="], + "nkeys.js": ["nkeys.js@1.1.0", "", { "dependencies": { "tweetnacl": "1.0.3" } }, "sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg=="], + "picocolors": ["picocolors@1.1.1", "", {}, "sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA=="], "postcss": ["postcss@8.4.31", "", { "dependencies": { "nanoid": "^3.3.6", "picocolors": "^1.0.0", "source-map-js": "^1.0.2" } }, "sha512-PS08Iboia9mts/2ygV3eLpY5ghnUcfLV/EXTOW1E2qYxJKGGBUtNjN76FYHnMs36RmARn41bC0AZmn+rR0OVpQ=="], @@ -159,6 +191,8 @@ "tslib": ["tslib@2.8.1", "", {}, "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="], + "tweetnacl": ["tweetnacl@1.0.3", "", {}, "sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw=="], + "zod": ["zod@3.25.76", "", {}, "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ=="], } } diff --git a/docker-compose.yml b/docker-compose.yml index d7a8c5c..eb507b6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,6 @@ services: image: clickhouse/clickhouse-server:23.8 ports: - "8123:8123" - - "9000:9000" volumes: - clickhouse-data:/var/lib/clickhouse ulimits: @@ -13,7 +12,7 @@ services: redis: image: redis:7.2 ports: - - "6379:6379" + - "6380:6379" volumes: - redis-data:/data nats: diff --git a/packages/bus/package.json b/packages/bus/package.json new file mode 100644 index 0000000..ed2a3f2 --- /dev/null +++ b/packages/bus/package.json @@ -0,0 +1,11 @@ +{ + "name": "@islandflow/bus", + "private": true, + "type": "module", + "exports": { + ".": "./src/index.ts" + }, + "dependencies": { + "nats": "^2.24.0" + } +} diff --git a/packages/bus/src/index.ts b/packages/bus/src/index.ts new file mode 100644 index 0000000..8743727 --- /dev/null +++ b/packages/bus/src/index.ts @@ -0,0 +1,2 @@ +export * from "./jetstream"; +export * from "./subjects"; diff --git a/packages/bus/src/jetstream.ts b/packages/bus/src/jetstream.ts new file mode 100644 index 0000000..d79daba --- /dev/null +++ b/packages/bus/src/jetstream.ts @@ -0,0 +1,125 @@ +import { + connect, + consumerOpts, + type ConsumerOptsBuilder, + type JetStreamClient, + type JetStreamManager, + type NatsConnection, + type StreamConfig, + JSONCodec, + type JsMsg, + createInbox +} from "nats"; + +export type NatsConnectionOptions = { + servers: string | string[]; + name?: string; + timeoutMs?: number; +}; + +export type JetStreamConnection = { + nc: NatsConnection; + js: JetStreamClient; + jsm: JetStreamManager; +}; + +export type RetryOptions = { + attempts: number; + delayMs: number; +}; + +const sleep = (delayMs: number): Promise => { + return new Promise((resolve) => setTimeout(resolve, delayMs)); +}; + +export const connectJetStream = async ( + options: NatsConnectionOptions +): Promise => { + const nc = await connect({ + servers: options.servers, + name: options.name, + timeout: options.timeoutMs + }); + + const js = nc.jetstream(); + const jsm = await nc.jetstreamManager(); + + return { nc, js, jsm }; +}; + +export const connectJetStreamWithRetry = async ( + options: NatsConnectionOptions, + retry: RetryOptions +): Promise => { + let lastError: unknown; + + for (let attempt = 1; attempt <= retry.attempts; attempt += 1) { + try { + return await connectJetStream(options); + } catch (error) { + lastError = error; + if (attempt < retry.attempts) { + await sleep(retry.delayMs); + } + } + } + + throw lastError ?? new Error("Failed to connect to NATS"); +}; + +export const ensureStream = async ( + jsm: JetStreamManager, + config: StreamConfig +): Promise => { + try { + await jsm.streams.info(config.name); + return; + } catch (error) { + if (error instanceof Error && error.message.includes("not found")) { + await jsm.streams.add(config); + return; + } + + throw error; + } +}; + +export const buildDurableConsumer = ( + durableName: string, + deliverSubject: string = createInbox() +): ConsumerOptsBuilder => { + const opts = consumerOpts(); + opts.durable(durableName); + opts.manualAck(); + opts.ackExplicit(); + opts.deliverTo(deliverSubject); + return opts; +}; + +export const publishJson = async ( + js: JetStreamClient, + subject: string, + payload: T +): Promise => { + const codec = JSONCodec(); + await js.publish(subject, codec.encode(payload)); +}; + +export type JsonSubscription = { + messages: AsyncIterable; + decode: (msg: JsMsg) => T; +}; + +export const subscribeJson = async ( + js: JetStreamClient, + subject: string, + opts: ConsumerOptsBuilder +): Promise> => { + const codec = JSONCodec(); + const sub = await js.subscribe(subject, opts); + + return { + messages: sub, + decode: (msg) => codec.decode(msg.data) + }; +}; diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts new file mode 100644 index 0000000..39f7f5f --- /dev/null +++ b/packages/bus/src/subjects.ts @@ -0,0 +1,2 @@ +export const STREAM_OPTION_PRINTS = "OPTIONS_PRINTS"; +export const SUBJECT_OPTION_PRINTS = "options.prints"; diff --git a/packages/bus/tsconfig.json b/packages/bus/tsconfig.json new file mode 100644 index 0000000..d8c6443 --- /dev/null +++ b/packages/bus/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "types": [] + }, + "include": ["src/**/*.ts"] +} diff --git a/packages/storage/package.json b/packages/storage/package.json new file mode 100644 index 0000000..0fa71df --- /dev/null +++ b/packages/storage/package.json @@ -0,0 +1,12 @@ +{ + "name": "@islandflow/storage", + "private": true, + "type": "module", + "exports": { + ".": "./src/index.ts" + }, + "dependencies": { + "@clickhouse/client": "^0.2.6", + "@islandflow/types": "workspace:*" + } +} diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts new file mode 100644 index 0000000..2fc8e50 --- /dev/null +++ b/packages/storage/src/clickhouse.ts @@ -0,0 +1,39 @@ +import { createClient, type ClickHouseClient } from "@clickhouse/client"; +import type { OptionPrint } from "@islandflow/types"; +import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "./option-prints"; + +export type ClickHouseOptions = { + url: string; + database?: string; + username?: string; + password?: string; +}; + +export const createClickHouseClient = (options: ClickHouseOptions): ClickHouseClient => { + return createClient({ + url: options.url, + database: options.database, + username: options.username, + password: options.password + }); +}; + +export const ensureOptionPrintsTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: optionPrintsTableDDL() + }); +}; + +export const insertOptionPrint = async ( + client: ClickHouseClient, + print: OptionPrint +): Promise => { + const record = normalizeOptionPrint(print); + await client.insert({ + table: OPTION_PRINTS_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts new file mode 100644 index 0000000..e2f4e36 --- /dev/null +++ b/packages/storage/src/index.ts @@ -0,0 +1,2 @@ +export * from "./clickhouse"; +export * from "./option-prints"; diff --git a/packages/storage/src/option-prints.ts b/packages/storage/src/option-prints.ts new file mode 100644 index 0000000..525038e --- /dev/null +++ b/packages/storage/src/option-prints.ts @@ -0,0 +1,29 @@ +import type { OptionPrint } from "@islandflow/types"; + +export const OPTION_PRINTS_TABLE = "option_prints"; + +export const optionPrintsTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${OPTION_PRINTS_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + ts UInt64, + option_contract_id String, + price Float64, + size UInt32, + exchange String, + conditions Array(String) +) +ENGINE = MergeTree +ORDER BY (ts, option_contract_id) +`; +}; + +export const normalizeOptionPrint = (print: OptionPrint): OptionPrint => { + return { + ...print, + conditions: print.conditions ?? [] + }; +}; diff --git a/packages/storage/tests/option-prints.test.ts b/packages/storage/tests/option-prints.test.ts new file mode 100644 index 0000000..debbf30 --- /dev/null +++ b/packages/storage/tests/option-prints.test.ts @@ -0,0 +1,27 @@ +import { describe, expect, it } from "bun:test"; +import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "../src/option-prints"; + +const basePrint = { + source_ts: 100, + ingest_ts: 200, + seq: 1, + trace_id: "trace-1", + ts: 100, + option_contract_id: "SPY-2025-01-17-450-C", + price: 1.25, + size: 10, + exchange: "TEST" +}; + +describe("option-prints storage helpers", () => { + it("normalizes missing conditions to empty array", () => { + const normalized = normalizeOptionPrint(basePrint); + expect(normalized.conditions).toEqual([]); + }); + + it("includes the correct table name in the DDL", () => { + const ddl = optionPrintsTableDDL(); + expect(ddl).toContain(OPTION_PRINTS_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); +}); diff --git a/packages/storage/tsconfig.json b/packages/storage/tsconfig.json new file mode 100644 index 0000000..43ef119 --- /dev/null +++ b/packages/storage/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "types": [] + }, + "include": ["src/**/*.ts", "tests/**/*.ts"] +} diff --git a/services/README.md b/services/README.md new file mode 100644 index 0000000..08e360a --- /dev/null +++ b/services/README.md @@ -0,0 +1,3 @@ +# Services + +Ingest, compute, API, and other runtime services live here. Scaffold pending. diff --git a/services/compute/package.json b/services/compute/package.json index 0fa9583..0bfc7eb 100644 --- a/services/compute/package.json +++ b/services/compute/package.json @@ -6,7 +6,10 @@ "dev": "bun run src/index.ts" }, "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", - "@islandflow/observability": "workspace:*" + "@islandflow/observability": "workspace:*", + "@islandflow/types": "workspace:*", + "zod": "^3.23.8" } } diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 128439a..9ffc927 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -1,17 +1,78 @@ +import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; +import { + SUBJECT_OPTION_PRINTS, + STREAM_OPTION_PRINTS, + buildDurableConsumer, + connectJetStreamWithRetry, + ensureStream, + subscribeJson +} from "@islandflow/bus"; +import { OptionPrintSchema } from "@islandflow/types"; +import { z } from "zod"; const service = "compute"; const logger = createLogger({ service }); -logger.info("service starting"); +const envSchema = z.object({ + NATS_URL: z.string().default("nats://localhost:4222") +}); -const shutdown = (signal: string) => { - logger.info("service stopping", { signal }); - process.exit(0); +const env = readEnv(envSchema); + +const run = async () => { + logger.info("service starting"); + + const { nc, js, jsm } = await connectJetStreamWithRetry( + { + servers: env.NATS_URL, + name: service + }, + { attempts: 20, delayMs: 500 } + ); + + await ensureStream(jsm, { + name: STREAM_OPTION_PRINTS, + subjects: [SUBJECT_OPTION_PRINTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + const opts = buildDurableConsumer("compute-option-prints"); + + const subscription = await subscribeJson(js, SUBJECT_OPTION_PRINTS, opts); + + const shutdown = async (signal: string) => { + logger.info("service stopping", { signal }); + await nc.drain(); + process.exit(0); + }; + + process.on("SIGINT", () => void shutdown("SIGINT")); + process.on("SIGTERM", () => void shutdown("SIGTERM")); + + for await (const msg of subscription.messages) { + try { + const print = OptionPrintSchema.parse(subscription.decode(msg)); + logger.info("received option print", { + trace_id: print.trace_id, + seq: print.seq, + option_contract_id: print.option_contract_id + }); + msg.ack(); + } catch (error) { + logger.error("failed to process option print", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } }; -process.on("SIGINT", () => shutdown("SIGINT")); -process.on("SIGTERM", () => shutdown("SIGTERM")); - -// Keep the process alive until real listeners are wired. -setInterval(() => {}, 60_000); +await run(); diff --git a/services/ingest-options/package.json b/services/ingest-options/package.json index eb0a0f0..05fc47e 100644 --- a/services/ingest-options/package.json +++ b/services/ingest-options/package.json @@ -6,7 +6,11 @@ "dev": "bun run src/index.ts" }, "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", - "@islandflow/observability": "workspace:*" + "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", + "@islandflow/types": "workspace:*", + "zod": "^3.23.8" } } diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index a07d1cc..31b5e8a 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -1,17 +1,163 @@ +import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; +import { + SUBJECT_OPTION_PRINTS, + STREAM_OPTION_PRINTS, + connectJetStreamWithRetry, + ensureStream, + publishJson +} from "@islandflow/bus"; +import { + createClickHouseClient, + ensureOptionPrintsTable, + insertOptionPrint +} from "@islandflow/storage"; +import { OptionPrintSchema, type OptionPrint } from "@islandflow/types"; +import { z } from "zod"; const service = "ingest-options"; const logger = createLogger({ service }); -logger.info("service starting"); +const envSchema = z.object({ + NATS_URL: z.string().default("nats://localhost:4222"), + CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + CLICKHOUSE_DATABASE: z.string().default("default"), + EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) +}); -const shutdown = (signal: string) => { - logger.info("service stopping", { signal }); - process.exit(0); +const env = readEnv(envSchema); + +const state = { + shuttingDown: false, + seq: 0, + timer: null as ReturnType | null }; -process.on("SIGINT", () => shutdown("SIGINT")); -process.on("SIGTERM", () => shutdown("SIGTERM")); +const retry = async ( + label: string, + attempts: number, + delayMs: number, + task: () => Promise +): Promise => { + let lastError: unknown; -// Keep the process alive until real listeners are wired. -setInterval(() => {}, 60_000); + for (let attempt = 1; attempt <= attempts; attempt += 1) { + try { + return await task(); + } catch (error) { + lastError = error; + logger.warn(`${label} attempt failed`, { + attempt, + error: error instanceof Error ? error.message : String(error) + }); + + if (attempt < attempts) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + } + + throw lastError ?? new Error(`${label} failed after retries`); +}; + +const buildSyntheticPrint = (): OptionPrint => { + const now = Date.now(); + state.seq += 1; + + return { + source_ts: now, + ingest_ts: now, + seq: state.seq, + trace_id: `ingest-options-${state.seq}`, + ts: now, + option_contract_id: "SPY-2025-01-17-450-C", + price: 1.25, + size: 10, + exchange: "TEST", + conditions: ["TEST"] + }; +}; + +const run = async () => { + logger.info("service starting"); + + const { nc, js, jsm } = await connectJetStreamWithRetry( + { + servers: env.NATS_URL, + name: service + }, + { attempts: 20, delayMs: 500 } + ); + + await ensureStream(jsm, { + name: STREAM_OPTION_PRINTS, + subjects: [SUBJECT_OPTION_PRINTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + const clickhouse = createClickHouseClient({ + url: env.CLICKHOUSE_URL, + database: env.CLICKHOUSE_DATABASE + }); + + await retry("clickhouse table init", 20, 500, async () => { + await ensureOptionPrintsTable(clickhouse); + }); + + const emit = async () => { + if (state.shuttingDown) { + return; + } + + const candidate = buildSyntheticPrint(); + const print = OptionPrintSchema.parse(candidate); + + try { + await insertOptionPrint(clickhouse, print); + await publishJson(js, SUBJECT_OPTION_PRINTS, print); + logger.info("published option print", { + trace_id: print.trace_id, + seq: print.seq, + option_contract_id: print.option_contract_id + }); + } catch (error) { + logger.error("failed to publish option print", { + error: error instanceof Error ? error.message : String(error), + trace_id: print.trace_id + }); + } + }; + + state.timer = setInterval(() => { + void emit(); + }, env.EMIT_INTERVAL_MS); + + const shutdown = async (signal: string) => { + if (state.shuttingDown) { + return; + } + + state.shuttingDown = true; + if (state.timer) { + clearInterval(state.timer); + } + + logger.info("service stopping", { signal }); + + await nc.drain(); + await clickhouse.close(); + process.exit(0); + }; + + process.on("SIGINT", () => void shutdown("SIGINT")); + process.on("SIGTERM", () => void shutdown("SIGTERM")); +}; + +await run();