From ba84554b0becdd95b1564a9eba7fc1c66ac5148a Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sat, 27 Dec 2025 19:28:30 -0500 Subject: [PATCH] Add API gateway for prints --- bun.lock | 4 + packages/storage/src/clickhouse.ts | 102 ++++++++++++ services/api/package.json | 6 +- services/api/src/index.ts | 243 ++++++++++++++++++++++++++++- 4 files changed, 346 insertions(+), 9 deletions(-) diff --git a/bun.lock b/bun.lock index d553dc0..4b2f3e0 100644 --- a/bun.lock +++ b/bun.lock @@ -44,8 +44,12 @@ "services/api": { "name": "@islandflow/api", "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", + "@islandflow/types": "workspace:*", + "zod": "^3.23.8", }, }, "services/candles": { diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index e00e841..2298fbf 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -1,4 +1,5 @@ import { createClient, type ClickHouseClient } from "@clickhouse/client"; +import { EquityPrintSchema, OptionPrintSchema } from "@islandflow/types"; import type { EquityPrint, OptionPrint } from "@islandflow/types"; import { normalizeOptionPrint, @@ -66,3 +67,104 @@ export const insertEquityPrint = async ( format: "JSONEachRow" }); }; + +const clampLimit = (limit: number): number => { + if (!Number.isFinite(limit)) { + return 100; + } + + return Math.max(1, Math.min(1000, Math.floor(limit))); +}; + +const coerceNumber = (value: unknown): unknown => { + if (typeof value === "string") { + const parsed = Number(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + + return value; +}; + +const normalizeNumericFields = ( + row: Record, + fields: string[] +): Record => { + const record: Record = { ...row }; + + for (const field of fields) { + if (field in record) { + record[field] = coerceNumber(record[field]); + } + } + + return record; +}; + +export const fetchRecentOptionPrints = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${OPTION_PRINTS_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const normalized = rows.map((row) => { + if (row && typeof row === "object") { + return normalizeNumericFields(row as Record, [ + "source_ts", + "ingest_ts", + "seq", + "ts", + "price", + "size" + ]); + } + + return row; + }); + + return OptionPrintSchema.array().parse(normalized); +}; + +export const fetchRecentEquityPrints = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_PRINTS_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const normalized = rows.map((row) => { + if (row && typeof row === "object") { + const record = normalizeNumericFields(row as Record, [ + "source_ts", + "ingest_ts", + "seq", + "ts", + "price", + "size" + ]); + + if ("offExchangeFlag" in record) { + return { + ...record, + offExchangeFlag: Boolean(record.offExchangeFlag) + }; + } + + return record; + } + + return row; + }); + + return EquityPrintSchema.array().parse(normalized); +}; diff --git a/services/api/package.json b/services/api/package.json index 91d3227..6044a2f 100644 --- a/services/api/package.json +++ b/services/api/package.json @@ -6,7 +6,11 @@ "dev": "bun run src/index.ts" }, "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", - "@islandflow/observability": "workspace:*" + "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", + "@islandflow/types": "workspace:*", + "zod": "^3.23.8" } } diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 0a473e8..0ff3ed1 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -1,17 +1,244 @@ +import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; +import { + SUBJECT_EQUITY_PRINTS, + SUBJECT_OPTION_PRINTS, + STREAM_EQUITY_PRINTS, + STREAM_OPTION_PRINTS, + buildDurableConsumer, + connectJetStreamWithRetry, + ensureStream, + subscribeJson +} from "@islandflow/bus"; +import { + createClickHouseClient, + ensureEquityPrintsTable, + ensureOptionPrintsTable, + fetchRecentEquityPrints, + fetchRecentOptionPrints +} from "@islandflow/storage"; +import { EquityPrintSchema, OptionPrintSchema } from "@islandflow/types"; +import { z } from "zod"; const service = "api"; const logger = createLogger({ service }); -logger.info("service starting"); +const envSchema = z.object({ + API_PORT: z.coerce.number().int().positive().default(4000), + NATS_URL: z.string().default("nats://localhost:4222"), + CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + CLICKHOUSE_DATABASE: z.string().default("default"), + REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200) +}); -const shutdown = (signal: string) => { - logger.info("service stopping", { signal }); - process.exit(0); +const env = readEnv(envSchema); + +const limitSchema = z.coerce.number().int().positive().max(1000); + +type Channel = "options" | "equities"; + +type WsData = { + channel: Channel; }; -process.on("SIGINT", () => shutdown("SIGINT")); -process.on("SIGTERM", () => shutdown("SIGTERM")); +const optionSockets = new Set>(); +const equitySockets = new Set>(); -// Keep the process alive until real listeners are wired. -setInterval(() => {}, 60_000); +const jsonResponse = (body: unknown, status = 200): Response => { + return new Response(JSON.stringify(body), { + status, + headers: { + "content-type": "application/json" + } + }); +}; + +const parseLimit = (value: string | null): number => { + if (value === null) { + return env.REST_DEFAULT_LIMIT; + } + + return limitSchema.parse(value); +}; + +const broadcast = (sockets: Set>, payload: unknown): void => { + const message = JSON.stringify(payload); + + for (const socket of sockets) { + try { + socket.send(message); + } catch (error) { + logger.warn("failed to send websocket message", { + error: error instanceof Error ? error.message : String(error) + }); + sockets.delete(socket); + } + } +}; + +const run = async () => { + logger.info("service starting"); + + const { nc, js, jsm } = await connectJetStreamWithRetry( + { + servers: env.NATS_URL, + name: service + }, + { attempts: 20, delayMs: 500 } + ); + + await ensureStream(jsm, { + name: STREAM_OPTION_PRINTS, + subjects: [SUBJECT_OPTION_PRINTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + await ensureStream(jsm, { + name: STREAM_EQUITY_PRINTS, + subjects: [SUBJECT_EQUITY_PRINTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + const clickhouse = createClickHouseClient({ + url: env.CLICKHOUSE_URL, + database: env.CLICKHOUSE_DATABASE + }); + + await ensureOptionPrintsTable(clickhouse); + await ensureEquityPrintsTable(clickhouse); + + const optionSubscription = await subscribeJson( + js, + SUBJECT_OPTION_PRINTS, + buildDurableConsumer("api-option-prints") + ); + + const equitySubscription = await subscribeJson( + js, + SUBJECT_EQUITY_PRINTS, + buildDurableConsumer("api-equity-prints") + ); + + const pumpOptions = async () => { + for await (const msg of optionSubscription.messages) { + try { + const payload = OptionPrintSchema.parse(optionSubscription.decode(msg)); + broadcast(optionSockets, { type: "option-print", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process option print", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + + const pumpEquities = async () => { + for await (const msg of equitySubscription.messages) { + try { + const payload = EquityPrintSchema.parse(equitySubscription.decode(msg)); + broadcast(equitySockets, { type: "equity-print", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process equity print", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + + void pumpOptions(); + void pumpEquities(); + + const server = Bun.serve({ + port: env.API_PORT, + fetch: async (req, serverRef) => { + const url = new URL(req.url); + + if (req.method === "GET" && url.pathname === "/health") { + return jsonResponse({ status: "ok" }); + } + + if (req.method === "GET" && url.pathname === "/prints/options") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentOptionPrints(clickhouse, limit); + return jsonResponse({ data }); + } + + if (req.method === "GET" && url.pathname === "/prints/equities") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentEquityPrints(clickhouse, limit); + return jsonResponse({ data }); + } + + if (req.method === "GET" && url.pathname === "/ws/options") { + if (serverRef.upgrade(req, { data: { channel: "options" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + + if (req.method === "GET" && url.pathname === "/ws/equities") { + if (serverRef.upgrade(req, { data: { channel: "equities" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + + return jsonResponse({ error: "not found" }, 404); + }, + websocket: { + open: (socket) => { + if (socket.data.channel === "options") { + optionSockets.add(socket); + } else { + equitySockets.add(socket); + } + + logger.info("websocket connected", { channel: socket.data.channel }); + }, + close: (socket) => { + if (socket.data.channel === "options") { + optionSockets.delete(socket); + } else { + equitySockets.delete(socket); + } + + logger.info("websocket disconnected", { channel: socket.data.channel }); + } + } + }); + + logger.info("api listening", { port: server.port }); + + const shutdown = async (signal: string) => { + logger.info("service stopping", { signal }); + server.stop(); + await nc.drain(); + await clickhouse.close(); + process.exit(0); + }; + + process.on("SIGINT", () => void shutdown("SIGINT")); + process.on("SIGTERM", () => void shutdown("SIGTERM")); +}; + +await run();