From 6a1f457028710e92f7edb23879c558dc0b5dc58f Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sat, 27 Dec 2025 19:21:01 -0500 Subject: [PATCH] Add equity prints ingestion --- bun.lock | 4 + docker-compose.yml | 3 +- packages/bus/src/subjects.ts | 2 + packages/storage/src/clickhouse.ts | 33 +++- packages/storage/src/equity-prints.ts | 26 +++ packages/storage/src/index.ts | 1 + packages/storage/tests/equity-prints.test.ts | 27 ++++ services/ingest-equities/package.json | 6 +- services/ingest-equities/src/index.ts | 162 ++++++++++++++++++- 9 files changed, 252 insertions(+), 12 deletions(-) create mode 100644 packages/storage/src/equity-prints.ts create mode 100644 packages/storage/tests/equity-prints.test.ts diff --git a/bun.lock b/bun.lock index 467f6b0..d553dc0 100644 --- a/bun.lock +++ b/bun.lock @@ -75,8 +75,12 @@ "services/ingest-equities": { "name": "@islandflow/ingest-equities", "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", + "@islandflow/types": "workspace:*", + "zod": "^3.23.8", }, }, "services/ingest-options": { diff --git a/docker-compose.yml b/docker-compose.yml index eb507b6..d7a8c5c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,7 @@ services: image: clickhouse/clickhouse-server:23.8 ports: - "8123:8123" + - "9000:9000" volumes: - clickhouse-data:/var/lib/clickhouse ulimits: @@ -12,7 +13,7 @@ services: redis: image: redis:7.2 ports: - - "6380:6379" + - "6379:6379" volumes: - redis-data:/data nats: diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 39f7f5f..57d6059 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -1,2 +1,4 @@ export const STREAM_OPTION_PRINTS = "OPTIONS_PRINTS"; export const SUBJECT_OPTION_PRINTS = "options.prints"; +export const STREAM_EQUITY_PRINTS = "EQUITY_PRINTS"; +export const SUBJECT_EQUITY_PRINTS = "equities.prints"; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 2fc8e50..e00e841 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -1,6 +1,15 @@ import { createClient, type ClickHouseClient } from "@clickhouse/client"; -import type { OptionPrint } from "@islandflow/types"; -import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "./option-prints"; +import type { EquityPrint, OptionPrint } from "@islandflow/types"; +import { + normalizeOptionPrint, + optionPrintsTableDDL, + OPTION_PRINTS_TABLE +} from "./option-prints"; +import { + equityPrintsTableDDL, + EQUITY_PRINTS_TABLE, + normalizeEquityPrint +} from "./equity-prints"; export type ClickHouseOptions = { url: string; @@ -26,6 +35,14 @@ export const ensureOptionPrintsTable = async ( }); }; +export const ensureEquityPrintsTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: equityPrintsTableDDL() + }); +}; + export const insertOptionPrint = async ( client: ClickHouseClient, print: OptionPrint @@ -37,3 +54,15 @@ export const insertOptionPrint = async ( format: "JSONEachRow" }); }; + +export const insertEquityPrint = async ( + client: ClickHouseClient, + print: EquityPrint +): Promise => { + const record = normalizeEquityPrint(print); + await client.insert({ + table: EQUITY_PRINTS_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; diff --git a/packages/storage/src/equity-prints.ts b/packages/storage/src/equity-prints.ts new file mode 100644 index 0000000..34f6124 --- /dev/null +++ b/packages/storage/src/equity-prints.ts @@ -0,0 +1,26 @@ +import type { EquityPrint } from "@islandflow/types"; + +export const EQUITY_PRINTS_TABLE = "equity_prints"; + +export const equityPrintsTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${EQUITY_PRINTS_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + ts UInt64, + underlying_id String, + price Float64, + size UInt32, + exchange String, + offExchangeFlag Bool +) +ENGINE = MergeTree +ORDER BY (ts, underlying_id) +`; +}; + +export const normalizeEquityPrint = (print: EquityPrint): EquityPrint => { + return print; +}; diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index e2f4e36..ab50920 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -1,2 +1,3 @@ export * from "./clickhouse"; +export * from "./equity-prints"; export * from "./option-prints"; diff --git a/packages/storage/tests/equity-prints.test.ts b/packages/storage/tests/equity-prints.test.ts new file mode 100644 index 0000000..aeeef5d --- /dev/null +++ b/packages/storage/tests/equity-prints.test.ts @@ -0,0 +1,27 @@ +import { describe, expect, it } from "bun:test"; +import { equityPrintsTableDDL, EQUITY_PRINTS_TABLE } from "../src/equity-prints"; + +const basePrint = { + source_ts: 100, + ingest_ts: 200, + seq: 1, + trace_id: "trace-1", + ts: 100, + underlying_id: "SPY", + price: 450.1, + size: 100, + exchange: "TEST", + offExchangeFlag: false +}; + +describe("equity-prints storage helpers", () => { + it("keeps required fields intact", () => { + expect(basePrint.offExchangeFlag).toBe(false); + }); + + it("includes the correct table name in the DDL", () => { + const ddl = equityPrintsTableDDL(); + expect(ddl).toContain(EQUITY_PRINTS_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); +}); diff --git a/services/ingest-equities/package.json b/services/ingest-equities/package.json index b907ccf..5452f2f 100644 --- a/services/ingest-equities/package.json +++ b/services/ingest-equities/package.json @@ -6,7 +6,11 @@ "dev": "bun run src/index.ts" }, "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", - "@islandflow/observability": "workspace:*" + "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", + "@islandflow/types": "workspace:*", + "zod": "^3.23.8" } } diff --git a/services/ingest-equities/src/index.ts b/services/ingest-equities/src/index.ts index 417922b..298906b 100644 --- a/services/ingest-equities/src/index.ts +++ b/services/ingest-equities/src/index.ts @@ -1,17 +1,163 @@ +import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; +import { + SUBJECT_EQUITY_PRINTS, + STREAM_EQUITY_PRINTS, + connectJetStreamWithRetry, + ensureStream, + publishJson +} from "@islandflow/bus"; +import { + createClickHouseClient, + ensureEquityPrintsTable, + insertEquityPrint +} from "@islandflow/storage"; +import { EquityPrintSchema, type EquityPrint } from "@islandflow/types"; +import { z } from "zod"; const service = "ingest-equities"; const logger = createLogger({ service }); -logger.info("service starting"); +const envSchema = z.object({ + NATS_URL: z.string().default("nats://localhost:4222"), + CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + CLICKHOUSE_DATABASE: z.string().default("default"), + EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) +}); -const shutdown = (signal: string) => { - logger.info("service stopping", { signal }); - process.exit(0); +const env = readEnv(envSchema); + +const state = { + shuttingDown: false, + seq: 0, + timer: null as ReturnType | null }; -process.on("SIGINT", () => shutdown("SIGINT")); -process.on("SIGTERM", () => shutdown("SIGTERM")); +const retry = async ( + label: string, + attempts: number, + delayMs: number, + task: () => Promise +): Promise => { + let lastError: unknown; -// Keep the process alive until real listeners are wired. -setInterval(() => {}, 60_000); + for (let attempt = 1; attempt <= attempts; attempt += 1) { + try { + return await task(); + } catch (error) { + lastError = error; + logger.warn(`${label} attempt failed`, { + attempt, + error: error instanceof Error ? error.message : String(error) + }); + + if (attempt < attempts) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + } + + throw lastError ?? new Error(`${label} failed after retries`); +}; + +const buildSyntheticPrint = (): EquityPrint => { + const now = Date.now(); + state.seq += 1; + + return { + source_ts: now, + ingest_ts: now, + seq: state.seq, + trace_id: `ingest-equities-${state.seq}`, + ts: now, + underlying_id: "SPY", + price: 450.1, + size: 100, + exchange: "TEST", + offExchangeFlag: false + }; +}; + +const run = async () => { + logger.info("service starting"); + + const { nc, js, jsm } = await connectJetStreamWithRetry( + { + servers: env.NATS_URL, + name: service + }, + { attempts: 20, delayMs: 500 } + ); + + await ensureStream(jsm, { + name: STREAM_EQUITY_PRINTS, + subjects: [SUBJECT_EQUITY_PRINTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + const clickhouse = createClickHouseClient({ + url: env.CLICKHOUSE_URL, + database: env.CLICKHOUSE_DATABASE + }); + + await retry("clickhouse table init", 20, 500, async () => { + await ensureEquityPrintsTable(clickhouse); + }); + + const emit = async () => { + if (state.shuttingDown) { + return; + } + + const candidate = buildSyntheticPrint(); + const print = EquityPrintSchema.parse(candidate); + + try { + await insertEquityPrint(clickhouse, print); + await publishJson(js, SUBJECT_EQUITY_PRINTS, print); + logger.info("published equity print", { + trace_id: print.trace_id, + seq: print.seq, + underlying_id: print.underlying_id + }); + } catch (error) { + logger.error("failed to publish equity print", { + error: error instanceof Error ? error.message : String(error), + trace_id: print.trace_id + }); + } + }; + + state.timer = setInterval(() => { + void emit(); + }, env.EMIT_INTERVAL_MS); + + const shutdown = async (signal: string) => { + if (state.shuttingDown) { + return; + } + + state.shuttingDown = true; + if (state.timer) { + clearInterval(state.timer); + } + + logger.info("service stopping", { signal }); + + await nc.drain(); + await clickhouse.close(); + process.exit(0); + }; + + process.on("SIGINT", () => void shutdown("SIGINT")); + process.on("SIGTERM", () => void shutdown("SIGTERM")); +}; + +await run();