diff --git a/bun.lock b/bun.lock index 4c233bc..557deeb 100644 --- a/bun.lock +++ b/bun.lock @@ -55,14 +55,20 @@ "@islandflow/observability": "workspace:*", "@islandflow/storage": "workspace:*", "@islandflow/types": "workspace:*", + "redis": "^5.10.0", "zod": "^3.23.8", }, }, "services/candles": { "name": "@islandflow/candles", "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", + "@islandflow/types": "workspace:*", + "redis": "^5.10.0", + "zod": "^3.23.8", }, }, "services/compute": { diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 371c102..82274c4 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -6,6 +6,8 @@ export const STREAM_EQUITY_PRINTS = "EQUITY_PRINTS"; export const SUBJECT_EQUITY_PRINTS = "equities.prints"; export const STREAM_EQUITY_QUOTES = "EQUITY_QUOTES"; export const SUBJECT_EQUITY_QUOTES = "equities.quotes"; +export const STREAM_EQUITY_CANDLES = "EQUITY_CANDLES"; +export const SUBJECT_EQUITY_CANDLES = "equities.candles"; export const STREAM_EQUITY_JOINS = "EQUITY_JOINS"; export const SUBJECT_EQUITY_JOINS = "equities.joins"; export const STREAM_INFERRED_DARK = "INFERRED_DARK"; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 9fac84c..0aea7c5 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -2,6 +2,7 @@ import { createClient, type ClickHouseClient } from "@clickhouse/client"; import { AlertEventSchema, ClassifierHitEventSchema, + EquityCandleSchema, EquityPrintSchema, EquityQuoteSchema, EquityPrintJoinSchema, @@ -13,6 +14,7 @@ import { import type { AlertEvent, ClassifierHitEvent, + EquityCandle, EquityPrint, EquityQuote, EquityPrintJoin, @@ -37,6 +39,11 @@ import { EQUITY_QUOTES_TABLE, normalizeEquityQuote } from "./equity-quotes"; +import { + equityCandlesTableDDL, + EQUITY_CANDLES_TABLE, + normalizeEquityCandle +} from "./equity-candles"; import { equityPrintJoinsTableDDL, EQUITY_PRINT_JOINS_TABLE, @@ -121,6 +128,14 @@ export const ensureEquityQuotesTable = async ( }); }; +export const ensureEquityCandlesTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: equityCandlesTableDDL() + }); +}; + export const ensureEquityPrintJoinsTable = async ( client: ClickHouseClient ): Promise => { @@ -207,6 +222,18 @@ export const insertEquityQuote = async ( }); }; +export const insertEquityCandle = async ( + client: ClickHouseClient, + candle: EquityCandle +): Promise => { + const record = normalizeEquityCandle(candle); + await client.insert({ + table: EQUITY_CANDLES_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + export const insertEquityPrintJoin = async ( client: ClickHouseClient, join: EquityPrintJoin @@ -272,6 +299,14 @@ const clampLimit = (limit: number): number => { return Math.max(1, Math.min(1000, Math.floor(limit))); }; +const clampPositiveInt = (value: number, fallback = 1): number => { + if (!Number.isFinite(value)) { + return fallback; + } + + return Math.max(1, Math.floor(value)); +}; + const clampCursor = (value: number): number => { if (!Number.isFinite(value)) { return 0; @@ -291,6 +326,10 @@ const coerceNumber = (value: unknown): unknown => { return value; }; +const quoteString = (value: string): string => { + return JSON.stringify(value); +}; + const normalizeNumericFields = ( row: Record, fields: string[] @@ -353,6 +392,26 @@ const normalizeEquityQuoteRow = (row: unknown): unknown => { return row; }; +const normalizeEquityCandleRow = (row: unknown): unknown => { + if (row && typeof row === "object") { + return normalizeNumericFields(row as Record, [ + "source_ts", + "ingest_ts", + "seq", + "ts", + "interval_ms", + "open", + "high", + "low", + "close", + "volume", + "trade_count" + ]); + } + + return row; +}; + const normalizeEquityRow = (row: unknown): unknown => { if (row && typeof row === "object") { const record = normalizeNumericFields(row as Record, [ @@ -525,6 +584,24 @@ export const fetchRecentEquityQuotes = async ( return EquityQuoteSchema.array().parse(rows.map(normalizeEquityQuoteRow)); }; +export const fetchRecentEquityCandles = async ( + client: ClickHouseClient, + underlyingId: string, + intervalMs: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeInterval = clampPositiveInt(intervalMs, 1000); + const safeUnderlying = quoteString(underlyingId); + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_CANDLES_TABLE} WHERE underlying_id = ${safeUnderlying} AND interval_ms = ${safeInterval} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return EquityCandleSchema.array().parse(rows.map(normalizeEquityCandleRow)); +}; + export const fetchRecentEquityPrintJoins = async ( client: ClickHouseClient, limit: number @@ -691,6 +768,52 @@ export const fetchEquityQuotesAfter = async ( return EquityQuoteSchema.array().parse(rows.map(normalizeEquityQuoteRow)); }; +export const fetchEquityCandlesAfter = async ( + client: ClickHouseClient, + underlyingId: string, + intervalMs: number, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + const safeInterval = clampPositiveInt(intervalMs, 1000); + const safeUnderlying = quoteString(underlyingId); + + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_CANDLES_TABLE} WHERE underlying_id = ${safeUnderlying} AND interval_ms = ${safeInterval} AND (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return EquityCandleSchema.array().parse(rows.map(normalizeEquityCandleRow)); +}; + +export const fetchEquityCandlesRange = async ( + client: ClickHouseClient, + underlyingId: string, + intervalMs: number, + startTs: number, + endTs: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeStart = clampCursor(startTs); + const safeEnd = clampCursor(endTs); + const safeInterval = clampPositiveInt(intervalMs, 1000); + const safeUnderlying = quoteString(underlyingId); + + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_CANDLES_TABLE} WHERE underlying_id = ${safeUnderlying} AND interval_ms = ${safeInterval} AND ts >= ${safeStart} AND ts <= ${safeEnd} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return EquityCandleSchema.array().parse(rows.map(normalizeEquityCandleRow)); +}; + export const fetchEquityPrintJoinsAfter = async ( client: ClickHouseClient, afterTs: number, diff --git a/packages/storage/src/equity-candles.ts b/packages/storage/src/equity-candles.ts new file mode 100644 index 0000000..9ddfda6 --- /dev/null +++ b/packages/storage/src/equity-candles.ts @@ -0,0 +1,29 @@ +import type { EquityCandle } from "@islandflow/types"; + +export const EQUITY_CANDLES_TABLE = "equity_candles"; + +export const equityCandlesTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${EQUITY_CANDLES_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + ts UInt64, + interval_ms UInt32, + underlying_id String, + open Float64, + high Float64, + low Float64, + close Float64, + volume UInt64, + trade_count UInt32 +) +ENGINE = MergeTree +ORDER BY (underlying_id, interval_ms, ts) +`; +}; + +export const normalizeEquityCandle = (candle: EquityCandle): EquityCandle => { + return candle; +}; diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index a4c4079..192a474 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -4,6 +4,7 @@ export * from "./alerts"; export * from "./flow-packets"; export * from "./equity-prints"; export * from "./equity-quotes"; +export * from "./equity-candles"; export * from "./equity-print-joins"; export * from "./inferred-dark"; export * from "./option-prints"; diff --git a/packages/storage/tests/equity-candles.test.ts b/packages/storage/tests/equity-candles.test.ts new file mode 100644 index 0000000..cbb8dfd --- /dev/null +++ b/packages/storage/tests/equity-candles.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, it } from "bun:test"; +import { + equityCandlesTableDDL, + EQUITY_CANDLES_TABLE, + normalizeEquityCandle +} from "../src/equity-candles"; + +const baseCandle = { + source_ts: 100, + ingest_ts: 200, + seq: 3, + trace_id: "candle:SPY:1000:0", + ts: 0, + interval_ms: 1000, + underlying_id: "SPY", + open: 450, + high: 451.5, + low: 449.25, + close: 450.75, + volume: 1200, + trade_count: 15 +}; + +describe("equity-candles storage helpers", () => { + it("keeps required fields intact", () => { + const normalized = normalizeEquityCandle(baseCandle); + expect(normalized).toEqual(baseCandle); + }); + + it("includes the correct table name in the DDL", () => { + const ddl = equityCandlesTableDDL(); + expect(ddl).toContain(EQUITY_CANDLES_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); +}); diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index 4200d27..b27a45f 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -59,6 +59,22 @@ export const EquityQuoteSchema = EventMetaSchema.merge( export type EquityQuote = z.infer; +export const EquityCandleSchema = EventMetaSchema.merge( + z.object({ + ts: z.number().int().nonnegative(), + interval_ms: z.number().int().positive(), + underlying_id: z.string().min(1), + open: z.number().nonnegative(), + high: z.number().nonnegative(), + low: z.number().nonnegative(), + close: z.number().nonnegative(), + volume: z.number().int().nonnegative(), + trade_count: z.number().int().nonnegative() + }) +); + +export type EquityCandle = z.infer; + export const EquityPrintJoinSchema = EventMetaSchema.merge( z.object({ id: z.string().min(1), diff --git a/services/api/package.json b/services/api/package.json index 6044a2f..41cf267 100644 --- a/services/api/package.json +++ b/services/api/package.json @@ -11,6 +11,7 @@ "@islandflow/observability": "workspace:*", "@islandflow/storage": "workspace:*", "@islandflow/types": "workspace:*", + "redis": "^5.10.0", "zod": "^3.23.8" } } diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 9e50961..d789a42 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -3,6 +3,7 @@ import { createLogger } from "@islandflow/observability"; import { SUBJECT_ALERTS, SUBJECT_CLASSIFIER_HITS, + SUBJECT_EQUITY_CANDLES, SUBJECT_EQUITY_JOINS, SUBJECT_EQUITY_PRINTS, SUBJECT_EQUITY_QUOTES, @@ -12,6 +13,7 @@ import { SUBJECT_OPTION_PRINTS, STREAM_ALERTS, STREAM_CLASSIFIER_HITS, + STREAM_EQUITY_CANDLES, STREAM_EQUITY_JOINS, STREAM_EQUITY_PRINTS, STREAM_EQUITY_QUOTES, @@ -28,6 +30,7 @@ import { createClickHouseClient, ensureAlertsTable, ensureClassifierHitsTable, + ensureEquityCandlesTable, ensureEquityPrintJoinsTable, ensureEquityPrintsTable, ensureEquityQuotesTable, @@ -41,6 +44,8 @@ import { fetchRecentFlowPackets, fetchRecentInferredDark, fetchRecentEquityQuotes, + fetchEquityCandlesAfter, + fetchEquityCandlesRange, fetchRecentOptionNBBO, fetchEquityPrintsAfter, fetchEquityPrintJoinsAfter, @@ -54,6 +59,7 @@ import { import { AlertEventSchema, ClassifierHitEventSchema, + EquityCandleSchema, EquityPrintSchema, EquityPrintJoinSchema, EquityQuoteSchema, @@ -62,6 +68,7 @@ import { OptionNBBOSchema, OptionPrintSchema } from "@islandflow/types"; +import { createClient } from "redis"; import { z } from "zod"; const service = "api"; @@ -72,6 +79,7 @@ const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), + REDIS_URL: z.string().default("redis://localhost:6379"), REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200) }); @@ -105,16 +113,30 @@ const retry = async ( }; const limitSchema = z.coerce.number().int().positive().max(1000); +const candleLimitSchema = z.coerce.number().int().positive().max(5000); const replayParamsSchema = z.object({ after_ts: z.coerce.number().int().nonnegative().default(0), after_seq: z.coerce.number().int().nonnegative().default(0), limit: z.coerce.number().int().positive().max(1000).default(200) }); +const candleQuerySchema = z.object({ + underlying_id: z.string().min(1), + interval_ms: z.coerce.number().int().positive(), + start_ts: z.coerce.number().int().nonnegative().optional(), + end_ts: z.coerce.number().int().nonnegative().optional(), + limit: candleLimitSchema.optional(), + cache: z.string().optional() +}); +const candleReplaySchema = replayParamsSchema.extend({ + underlying_id: z.string().min(1), + interval_ms: z.coerce.number().int().positive() +}); type Channel = | "options" | "options-nbbo" | "equities" + | "equity-candles" | "equity-quotes" | "equity-joins" | "inferred-dark" @@ -129,6 +151,7 @@ type WsData = { const optionSockets = new Set>(); const optionNbboSockets = new Set>(); const equitySockets = new Set>(); +const equityCandleSockets = new Set>(); const equityQuoteSockets = new Set>(); const equityJoinSockets = new Set>(); const inferredDarkSockets = new Set>(); @@ -167,6 +190,70 @@ const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit }; }; +const parseBooleanParam = (value: string | null | undefined): boolean => { + if (!value) { + return false; + } + const normalized = value.trim().toLowerCase(); + return ["1", "true", "yes", "on"].includes(normalized); +}; + +const parseCandleParams = ( + url: URL +): { + underlyingId: string; + intervalMs: number; + startTs: number; + endTs: number; + limit: number; + useCache: boolean; +} => { + const params = candleQuerySchema.parse({ + underlying_id: url.searchParams.get("underlying_id") ?? undefined, + interval_ms: url.searchParams.get("interval_ms") ?? undefined, + start_ts: url.searchParams.get("start_ts") ?? undefined, + end_ts: url.searchParams.get("end_ts") ?? undefined, + limit: url.searchParams.get("limit") ?? undefined, + cache: url.searchParams.get("cache") ?? undefined + }); + + const endTs = params.end_ts ?? Date.now(); + const limit = params.limit ?? env.REST_DEFAULT_LIMIT; + const startTs = + params.start_ts ?? Math.max(0, Math.floor(endTs - params.interval_ms * limit)); + const rangeStart = Math.min(startTs, endTs); + const rangeEnd = Math.max(startTs, endTs); + + return { + underlyingId: params.underlying_id, + intervalMs: params.interval_ms, + startTs: rangeStart, + endTs: rangeEnd, + limit, + useCache: parseBooleanParam(params.cache) + }; +}; + +const parseCandleReplayParams = ( + url: URL +): { underlyingId: string; intervalMs: number; afterTs: number; afterSeq: number; limit: number } => { + const params = candleReplaySchema.parse({ + underlying_id: url.searchParams.get("underlying_id") ?? undefined, + interval_ms: url.searchParams.get("interval_ms") ?? undefined, + after_ts: url.searchParams.get("after_ts") ?? undefined, + after_seq: url.searchParams.get("after_seq") ?? undefined, + limit: url.searchParams.get("limit") ?? undefined + }); + + return { + underlyingId: params.underlying_id, + intervalMs: params.interval_ms, + afterTs: params.after_ts, + afterSeq: params.after_seq, + limit: params.limit + }; +}; + const broadcast = (sockets: Set>, payload: unknown): void => { const message = JSON.stringify(payload); @@ -182,6 +269,40 @@ const broadcast = (sockets: Set>, payload: unknown): void => { } }; +const buildCandleCacheKey = (underlyingId: string, intervalMs: number): string => { + return `candles:equity:${intervalMs}:${underlyingId}`; +}; + +const fetchEquityCandlesFromCache = async ( + client: ReturnType, + underlyingId: string, + intervalMs: number, + startTs: number, + endTs: number +): Promise => { + const key = buildCandleCacheKey(underlyingId, intervalMs); + const payloads = await client.zRangeByScore(key, startTs, endTs); + const parsed = payloads + .map((payload) => { + try { + return JSON.parse(payload) as unknown; + } catch { + return null; + } + }) + .filter((value): value is unknown => value !== null); + + const validated: unknown[] = []; + for (const entry of parsed) { + const result = EquityCandleSchema.safeParse(entry); + if (result.success) { + validated.push(result.data); + } + } + + return validated; +}; + const run = async () => { logger.info("service starting"); @@ -245,6 +366,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_EQUITY_CANDLES, + subjects: [SUBJECT_EQUITY_CANDLES], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_EQUITY_JOINS, subjects: [SUBJECT_EQUITY_JOINS], @@ -320,6 +454,7 @@ const run = async () => { await ensureOptionNBBOTable(clickhouse); await ensureEquityPrintsTable(clickhouse); await ensureEquityQuotesTable(clickhouse); + await ensureEquityCandlesTable(clickhouse); await ensureEquityPrintJoinsTable(clickhouse); await ensureInferredDarkTable(clickhouse); await ensureFlowPacketsTable(clickhouse); @@ -327,6 +462,27 @@ const run = async () => { await ensureAlertsTable(clickhouse); }); + let redis: ReturnType | null = null; + try { + redis = createClient({ url: env.REDIS_URL }); + redis.on("error", (error) => { + logger.warn("redis client error", { + error: error instanceof Error ? error.message : String(error) + }); + }); + await retry("redis connect", 5, 500, async () => { + if (!redis) { + return; + } + await redis.connect(); + }); + } catch (error) { + logger.warn("redis unavailable, skipping candle cache", { + error: error instanceof Error ? error.message : String(error) + }); + redis = null; + } + const subscribeWithReset = async ( subject: string, stream: string, @@ -389,6 +545,12 @@ const run = async () => { "api-equity-quotes" ); + const equityCandleSubscription = await subscribeWithReset( + SUBJECT_EQUITY_CANDLES, + STREAM_EQUITY_CANDLES, + "api-equity-candles" + ); + const equityJoinSubscription = await subscribeWithReset( SUBJECT_EQUITY_JOINS, STREAM_EQUITY_JOINS, @@ -479,6 +641,21 @@ const run = async () => { } }; + const pumpEquityCandles = async () => { + for await (const msg of equityCandleSubscription.messages) { + try { + const payload = EquityCandleSchema.parse(equityCandleSubscription.decode(msg)); + broadcast(equityCandleSockets, { type: "equity-candle", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process equity candle", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + const pumpEquityJoins = async () => { for await (const msg of equityJoinSubscription.messages) { try { @@ -558,6 +735,7 @@ const run = async () => { void pumpOptionNbbo(); void pumpEquities(); void pumpEquityQuotes(); + void pumpEquityCandles(); void pumpEquityJoins(); void pumpInferredDark(); void pumpFlow(); @@ -597,6 +775,43 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/candles/equities") { + try { + const { underlyingId, intervalMs, startTs, endTs, limit, useCache } = + parseCandleParams(url); + if (useCache && redis && redis.isOpen) { + const cached = await fetchEquityCandlesFromCache( + redis, + underlyingId, + intervalMs, + startTs, + endTs + ); + if (cached.length > 0) { + return jsonResponse({ data: cached }); + } + } + + const data = await fetchEquityCandlesRange( + clickhouse, + underlyingId, + intervalMs, + startTs, + endTs, + limit + ); + return jsonResponse({ data }); + } catch (error) { + return jsonResponse( + { + error: "invalid candle query", + detail: error instanceof Error ? error.message : String(error) + }, + 400 + ); + } + } + if (req.method === "GET" && url.pathname === "/joins/equities") { const limit = parseLimit(url.searchParams.get("limit")); const data = await fetchRecentEquityPrintJoins(clickhouse, limit); @@ -659,6 +874,32 @@ const run = async () => { return jsonResponse({ data, next }); } + if (req.method === "GET" && url.pathname === "/replay/equity-candles") { + try { + const { underlyingId, intervalMs, afterTs, afterSeq, limit } = + parseCandleReplayParams(url); + const data = await fetchEquityCandlesAfter( + clickhouse, + underlyingId, + intervalMs, + afterTs, + afterSeq, + limit + ); + const last = data.at(-1); + const next = last ? { ts: last.ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } catch (error) { + return jsonResponse( + { + error: "invalid candle replay query", + detail: error instanceof Error ? error.message : String(error) + }, + 400 + ); + } + } + if (req.method === "GET" && url.pathname === "/replay/equity-joins") { const { afterTs, afterSeq, limit } = parseReplayParams(url); const data = await fetchEquityPrintJoinsAfter(clickhouse, afterTs, afterSeq, limit); @@ -699,6 +940,14 @@ const run = async () => { return jsonResponse({ error: "websocket upgrade failed" }, 400); } + if (req.method === "GET" && url.pathname === "/ws/equity-candles") { + if (serverRef.upgrade(req, { data: { channel: "equity-candles" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + if (req.method === "GET" && url.pathname === "/ws/equity-quotes") { if (serverRef.upgrade(req, { data: { channel: "equity-quotes" } })) { return new Response(null, { status: 101 }); @@ -757,6 +1006,8 @@ const run = async () => { optionNbboSockets.add(socket); } else if (socket.data.channel === "equities") { equitySockets.add(socket); + } else if (socket.data.channel === "equity-candles") { + equityCandleSockets.add(socket); } else if (socket.data.channel === "equity-quotes") { equityQuoteSockets.add(socket); } else if (socket.data.channel === "equity-joins") { @@ -780,6 +1031,8 @@ const run = async () => { optionNbboSockets.delete(socket); } else if (socket.data.channel === "equities") { equitySockets.delete(socket); + } else if (socket.data.channel === "equity-candles") { + equityCandleSockets.delete(socket); } else if (socket.data.channel === "equity-quotes") { equityQuoteSockets.delete(socket); } else if (socket.data.channel === "equity-joins") { @@ -804,6 +1057,9 @@ const run = async () => { const shutdown = async (signal: string) => { logger.info("service stopping", { signal }); server.stop(); + if (redis && redis.isOpen) { + await redis.quit(); + } await nc.drain(); await clickhouse.close(); process.exit(0); diff --git a/services/candles/package.json b/services/candles/package.json index d6cc269..05a16f2 100644 --- a/services/candles/package.json +++ b/services/candles/package.json @@ -6,7 +6,12 @@ "dev": "bun run src/index.ts" }, "dependencies": { + "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", - "@islandflow/observability": "workspace:*" + "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", + "@islandflow/types": "workspace:*", + "redis": "^5.10.0", + "zod": "^3.23.8" } } diff --git a/services/candles/src/aggregator.ts b/services/candles/src/aggregator.ts new file mode 100644 index 0000000..e00d9b9 --- /dev/null +++ b/services/candles/src/aggregator.ts @@ -0,0 +1,253 @@ +import type { EquityCandle, EquityPrint } from "@islandflow/types"; + +export type CandleAggregationConfig = { + intervalsMs: number[]; + maxLateMs: number; +}; + +export type CandleAggregationResult = { + emitted: EquityCandle[]; + droppedLate: number; +}; + +type CandleBuilder = { + windowStart: number; + intervalMs: number; + underlyingId: string; + open: number; + high: number; + low: number; + close: number; + volume: number; + tradeCount: number; + openTs: number; + openSeq: number; + openSourceTs: number; + closeTs: number; + closeSeq: number; + closeIngestTs: number; +}; + +type IntervalState = { + intervalMs: number; + underlyingId: string; + lastTsSeen: number; + windows: Map; +}; + +const toPositiveInt = (value: number): number | null => { + if (!Number.isFinite(value)) { + return null; + } + const normalized = Math.floor(value); + if (normalized <= 0) { + return null; + } + return normalized; +}; + +export const normalizeIntervals = (intervals: number[]): number[] => { + const unique = new Set(); + for (const interval of intervals) { + const normalized = toPositiveInt(interval); + if (normalized) { + unique.add(normalized); + } + } + return Array.from(unique).sort((a, b) => a - b); +}; + +export const parseIntervals = (value: string | undefined, fallback: number[]): number[] => { + if (!value) { + return normalizeIntervals(fallback); + } + + const parsed = value + .split(",") + .map((entry) => Number(entry.trim())) + .filter((entry) => Number.isFinite(entry)); + + const normalized = normalizeIntervals(parsed); + return normalized.length > 0 ? normalized : normalizeIntervals(fallback); +}; + +const buildStateKey = (underlyingId: string, intervalMs: number): string => { + return `${underlyingId}:${intervalMs}`; +}; + +const getWindowStart = (ts: number, intervalMs: number): number => { + return Math.floor(ts / intervalMs) * intervalMs; +}; + +const isEarlier = (ts: number, seq: number, otherTs: number, otherSeq: number): boolean => { + if (ts !== otherTs) { + return ts < otherTs; + } + return seq < otherSeq; +}; + +const isLater = (ts: number, seq: number, otherTs: number, otherSeq: number): boolean => { + if (ts !== otherTs) { + return ts > otherTs; + } + return seq > otherSeq; +}; + +const createBuilder = ( + print: EquityPrint, + intervalMs: number, + windowStart: number +): CandleBuilder => { + return { + windowStart, + intervalMs, + underlyingId: print.underlying_id, + open: print.price, + high: print.price, + low: print.price, + close: print.price, + volume: print.size, + tradeCount: 1, + openTs: print.ts, + openSeq: print.seq, + openSourceTs: print.source_ts, + closeTs: print.ts, + closeSeq: print.seq, + closeIngestTs: print.ingest_ts + }; +}; + +const updateBuilder = (builder: CandleBuilder, print: EquityPrint): CandleBuilder => { + builder.volume += print.size; + builder.tradeCount += 1; + builder.high = Math.max(builder.high, print.price); + builder.low = Math.min(builder.low, print.price); + + if (isEarlier(print.ts, print.seq, builder.openTs, builder.openSeq)) { + builder.open = print.price; + builder.openTs = print.ts; + builder.openSeq = print.seq; + builder.openSourceTs = print.source_ts; + } + + if (isLater(print.ts, print.seq, builder.closeTs, builder.closeSeq)) { + builder.close = print.price; + builder.closeTs = print.ts; + builder.closeSeq = print.seq; + builder.closeIngestTs = print.ingest_ts; + } + + return builder; +}; + +const toEquityCandle = (builder: CandleBuilder): EquityCandle => { + return { + source_ts: builder.openSourceTs, + ingest_ts: builder.closeIngestTs, + seq: builder.closeSeq, + trace_id: `candle:${builder.underlyingId}:${builder.intervalMs}:${builder.windowStart}`, + ts: builder.windowStart, + interval_ms: builder.intervalMs, + underlying_id: builder.underlyingId, + open: builder.open, + high: builder.high, + low: builder.low, + close: builder.close, + volume: builder.volume, + trade_count: builder.tradeCount + }; +}; + +const flushState = (state: IntervalState, watermark: number): EquityCandle[] => { + const eligibleStarts: number[] = []; + for (const start of state.windows.keys()) { + if (start + state.intervalMs <= watermark) { + eligibleStarts.push(start); + } + } + + if (eligibleStarts.length === 0) { + return []; + } + + eligibleStarts.sort((a, b) => a - b); + const emitted: EquityCandle[] = []; + for (const start of eligibleStarts) { + const builder = state.windows.get(start); + if (!builder) { + continue; + } + state.windows.delete(start); + emitted.push(toEquityCandle(builder)); + } + + return emitted; +}; + +export class CandleAggregator { + private readonly intervalsMs: number[]; + private readonly maxLateMs: number; + private readonly stateByKey = new Map(); + + constructor(config: CandleAggregationConfig) { + this.intervalsMs = normalizeIntervals(config.intervalsMs); + this.maxLateMs = Math.max(0, Math.floor(config.maxLateMs)); + } + + ingest(print: EquityPrint): CandleAggregationResult { + const emitted: EquityCandle[] = []; + let droppedLate = 0; + + for (const intervalMs of this.intervalsMs) { + const key = buildStateKey(print.underlying_id, intervalMs); + const state = + this.stateByKey.get(key) ?? + ({ + intervalMs, + underlyingId: print.underlying_id, + lastTsSeen: 0, + windows: new Map() + } satisfies IntervalState); + + state.lastTsSeen = Math.max(state.lastTsSeen, print.ts); + this.stateByKey.set(key, state); + + const windowStart = getWindowStart(print.ts, intervalMs); + const windowEnd = windowStart + intervalMs; + const watermark = Math.max(0, state.lastTsSeen - this.maxLateMs); + + if (windowEnd <= watermark && !state.windows.has(windowStart)) { + droppedLate += 1; + } else { + const existing = state.windows.get(windowStart); + if (existing) { + updateBuilder(existing, print); + } else { + state.windows.set(windowStart, createBuilder(print, intervalMs, windowStart)); + } + } + + emitted.push(...flushState(state, watermark)); + } + + return { emitted, droppedLate }; + } + + drain(): EquityCandle[] { + const emitted: EquityCandle[] = []; + + for (const state of this.stateByKey.values()) { + const starts = Array.from(state.windows.keys()).sort((a, b) => a - b); + for (const start of starts) { + const builder = state.windows.get(start); + if (!builder) { + continue; + } + state.windows.delete(start); + emitted.push(toEquityCandle(builder)); + } + } + + return emitted; + } +} diff --git a/services/candles/src/index.ts b/services/candles/src/index.ts index 1c777c3..dfb773d 100644 --- a/services/candles/src/index.ts +++ b/services/candles/src/index.ts @@ -1,17 +1,387 @@ -import { createLogger } from "@islandflow/observability"; +import { readEnv } from "@islandflow/config"; +import { createLogger, createMetrics } from "@islandflow/observability"; +import { + SUBJECT_EQUITY_CANDLES, + SUBJECT_EQUITY_PRINTS, + STREAM_EQUITY_CANDLES, + STREAM_EQUITY_PRINTS, + buildDurableConsumer, + connectJetStreamWithRetry, + ensureStream, + publishJson, + subscribeJson +} from "@islandflow/bus"; +import { + createClickHouseClient, + ensureEquityCandlesTable, + insertEquityCandle +} from "@islandflow/storage"; +import { EquityCandleSchema, EquityPrintSchema, type EquityCandle } from "@islandflow/types"; +import { createClient } from "redis"; +import { z } from "zod"; +import { CandleAggregator, parseIntervals } from "./aggregator"; const service = "candles"; const logger = createLogger({ service }); +const metrics = createMetrics({ service }); -logger.info("service starting"); +const envSchema = z.object({ + NATS_URL: z.string().default("nats://localhost:4222"), + CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + CLICKHOUSE_DATABASE: z.string().default("default"), + REDIS_URL: z.string().default("redis://localhost:6379"), + CANDLE_INTERVALS_MS: z.string().default("1000,5000,60000"), + CANDLE_MAX_LATE_MS: z.coerce.number().int().nonnegative().default(0), + CANDLE_CACHE_LIMIT: z.coerce.number().int().nonnegative().default(2000), + CANDLE_DELIVER_POLICY: z + .enum(["new", "all", "last", "last_per_subject"]) + .default("new"), + CANDLE_CONSUMER_RESET: z + .preprocess((value) => { + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (["1", "true", "yes", "on"].includes(normalized)) { + return true; + } + if (["0", "false", "no", "off"].includes(normalized)) { + return false; + } + } + return value; + }, z.boolean()) + .default(false) +}); -const shutdown = (signal: string) => { - logger.info("service stopping", { signal }); - process.exit(0); +const env = readEnv(envSchema); + +const retry = async ( + label: string, + attempts: number, + delayMs: number, + task: () => Promise +): Promise => { + let lastError: unknown; + + for (let attempt = 1; attempt <= attempts; attempt += 1) { + try { + return await task(); + } catch (error) { + lastError = error; + logger.warn(`${label} attempt failed`, { + attempt, + error: error instanceof Error ? error.message : String(error) + }); + + if (attempt < attempts) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + } + + throw lastError ?? new Error(`${label} failed after retries`); }; -process.on("SIGINT", () => shutdown("SIGINT")); -process.on("SIGTERM", () => shutdown("SIGTERM")); +const applyDeliverPolicy = ( + opts: ReturnType, + policy: typeof env.CANDLE_DELIVER_POLICY +) => { + switch (policy) { + case "all": + opts.deliverAll(); + break; + case "last": + opts.deliverLast(); + break; + case "last_per_subject": + opts.deliverLastPerSubject(); + break; + case "new": + default: + opts.deliverNew(); + break; + } +}; -// Keep the process alive until real listeners are wired. -setInterval(() => {}, 60_000); +const createRedisClient = (url: string) => { + return createClient({ url }); +}; + +const buildCacheKey = (underlyingId: string, intervalMs: number): string => { + return `candles:equity:${intervalMs}:${underlyingId}`; +}; + +const cacheCandle = async ( + client: ReturnType, + candle: EquityCandle, + cacheLimit: number +): Promise => { + if (cacheLimit <= 0) { + return; + } + + const key = buildCacheKey(candle.underlying_id, candle.interval_ms); + const payload = JSON.stringify(candle); + const maxAgeMs = candle.interval_ms * cacheLimit; + const trimBefore = Math.max(0, candle.ts - maxAgeMs); + const multi = client.multi(); + multi.zAdd(key, { score: candle.ts, value: payload }); + if (trimBefore > 0) { + multi.zRemRangeByScore(key, 0, trimBefore); + } + await multi.exec(); +}; + +const emitCandle = async ( + clickhouse: ReturnType, + js: Awaited>["js"], + redis: ReturnType | null, + candle: EquityCandle, + cacheLimit: number +): Promise => { + try { + await insertEquityCandle(clickhouse, candle); + } catch (error) { + metrics.count("candles.persist_failed", 1); + logger.error("failed to persist candle", { + error: error instanceof Error ? error.message : String(error), + trace_id: candle.trace_id, + underlying_id: candle.underlying_id, + interval_ms: candle.interval_ms + }); + return; + } + + metrics.count("candles.emitted", 1, { + interval_ms: String(candle.interval_ms) + }); + + try { + await publishJson(js, SUBJECT_EQUITY_CANDLES, candle); + } catch (error) { + metrics.count("candles.publish_failed", 1); + logger.error("failed to publish candle", { + error: error instanceof Error ? error.message : String(error), + trace_id: candle.trace_id, + underlying_id: candle.underlying_id, + interval_ms: candle.interval_ms + }); + } + + if (redis && redis.isOpen) { + try { + await cacheCandle(redis, candle, cacheLimit); + } catch (error) { + metrics.count("candles.cache_failed", 1); + logger.warn("failed to cache candle", { + error: error instanceof Error ? error.message : String(error), + trace_id: candle.trace_id, + underlying_id: candle.underlying_id, + interval_ms: candle.interval_ms + }); + } + } +}; + +const run = async () => { + logger.info("service starting"); + + const intervalsMs = parseIntervals(env.CANDLE_INTERVALS_MS, [1000, 5000, 60000]); + if (intervalsMs.length === 0) { + throw new Error("CANDLE_INTERVALS_MS produced no valid intervals"); + } + + const aggregator = new CandleAggregator({ + intervalsMs, + maxLateMs: env.CANDLE_MAX_LATE_MS + }); + + const { nc, js, jsm } = await connectJetStreamWithRetry( + { + servers: env.NATS_URL, + name: service + }, + { attempts: 20, delayMs: 500 } + ); + + await ensureStream(jsm, { + name: STREAM_EQUITY_PRINTS, + subjects: [SUBJECT_EQUITY_PRINTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + await ensureStream(jsm, { + name: STREAM_EQUITY_CANDLES, + subjects: [SUBJECT_EQUITY_CANDLES], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + const clickhouse = createClickHouseClient({ + url: env.CLICKHOUSE_URL, + database: env.CLICKHOUSE_DATABASE + }); + + await retry("clickhouse table init", 20, 500, async () => { + await ensureEquityCandlesTable(clickhouse); + }); + + let redis: ReturnType | null = null; + try { + redis = createRedisClient(env.REDIS_URL); + redis.on("error", (error) => { + logger.warn("redis client error", { + error: error instanceof Error ? error.message : String(error) + }); + }); + await retry("redis connect", 20, 500, async () => { + if (!redis) { + return; + } + await redis.connect(); + }); + } catch (error) { + logger.warn("redis unavailable, skipping hot cache", { + error: error instanceof Error ? error.message : String(error) + }); + redis = null; + } + + const durableName = "candles-equity-prints"; + if (env.CANDLE_CONSUMER_RESET) { + try { + await jsm.consumers.delete(STREAM_EQUITY_PRINTS, durableName); + logger.warn("reset jetstream consumer", { durable: durableName }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to reset jetstream consumer", { durable: durableName, error: message }); + } + } + } else { + try { + const info = await jsm.consumers.info(STREAM_EQUITY_PRINTS, durableName); + if (info?.config?.deliver_policy && info.config.deliver_policy !== env.CANDLE_DELIVER_POLICY) { + logger.warn("resetting consumer due to deliver policy change", { + durable: durableName, + current: info.config.deliver_policy, + desired: env.CANDLE_DELIVER_POLICY + }); + await jsm.consumers.delete(STREAM_EQUITY_PRINTS, durableName); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to inspect jetstream consumer", { durable: durableName, error: message }); + } + } + } + + const subscribeWithReset = async () => { + const opts = buildDurableConsumer(durableName); + applyDeliverPolicy(opts, env.CANDLE_DELIVER_POLICY); + try { + return await subscribeJson(js, SUBJECT_EQUITY_PRINTS, opts); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const shouldReset = + message.includes("duplicate subscription") || + message.includes("durable requires") || + message.includes("subject does not match consumer"); + + if (!shouldReset) { + throw error; + } + + logger.warn("resetting jetstream consumer", { durable: durableName, error: message }); + + try { + await jsm.consumers.delete(STREAM_EQUITY_PRINTS, durableName); + } catch (deleteError) { + const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError); + if (!deleteMessage.includes("not found")) { + logger.warn("failed to delete jetstream consumer", { + durable: durableName, + error: deleteMessage + }); + } + } + + const resetOpts = buildDurableConsumer(durableName); + applyDeliverPolicy(resetOpts, env.CANDLE_DELIVER_POLICY); + return await subscribeJson(js, SUBJECT_EQUITY_PRINTS, resetOpts); + } + }; + + const subscription = await subscribeWithReset(); + let droppedLate = 0; + let lastLateLog = Date.now(); + + const loop = async () => { + for await (const msg of subscription.messages) { + try { + const print = EquityPrintSchema.parse(subscription.decode(msg)); + metrics.count("candles.prints", 1); + + const result = aggregator.ingest(print); + if (result.droppedLate > 0) { + droppedLate += result.droppedLate; + metrics.count("candles.prints_late", result.droppedLate); + const now = Date.now(); + if (now - lastLateLog > 5000) { + logger.warn("late equity prints dropped", { dropped: droppedLate }); + droppedLate = 0; + lastLateLog = now; + } + } + + for (const candle of result.emitted) { + const validated = EquityCandleSchema.parse(candle); + await emitCandle(clickhouse, js, redis, validated, env.CANDLE_CACHE_LIMIT); + } + + msg.ack(); + } catch (error) { + metrics.count("candles.prints_failed", 1); + logger.error("failed to process equity print", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + + const shutdown = async (signal: string) => { + logger.info("service stopping", { signal }); + const remaining = aggregator.drain(); + for (const candle of remaining) { + const validated = EquityCandleSchema.parse(candle); + await emitCandle(clickhouse, js, redis, validated, env.CANDLE_CACHE_LIMIT); + } + if (redis && redis.isOpen) { + await redis.quit(); + } + await nc.drain(); + await clickhouse.close(); + process.exit(0); + }; + + process.on("SIGINT", () => void shutdown("SIGINT")); + process.on("SIGTERM", () => void shutdown("SIGTERM")); + + void loop(); +}; + +await run(); diff --git a/services/candles/tests/aggregator.test.ts b/services/candles/tests/aggregator.test.ts new file mode 100644 index 0000000..79a39b2 --- /dev/null +++ b/services/candles/tests/aggregator.test.ts @@ -0,0 +1,81 @@ +import { describe, expect, test } from "bun:test"; +import type { EquityPrint } from "@islandflow/types"; +import { CandleAggregator } from "../src/aggregator"; + +const buildPrint = (overrides: Partial = {}): EquityPrint => { + const ts = overrides.ts ?? 0; + return { + source_ts: overrides.source_ts ?? ts, + ingest_ts: overrides.ingest_ts ?? ts, + seq: overrides.seq ?? 0, + trace_id: overrides.trace_id ?? `print:${overrides.seq ?? 0}`, + ts, + underlying_id: overrides.underlying_id ?? "AAPL", + price: overrides.price ?? 0, + size: overrides.size ?? 1, + exchange: overrides.exchange ?? "TEST", + offExchangeFlag: overrides.offExchangeFlag ?? false + }; +}; + +describe("CandleAggregator", () => { + test("emits candle with correct OHLC and volume", () => { + const aggregator = new CandleAggregator({ intervalsMs: [1000], maxLateMs: 0 }); + + const first = buildPrint({ ts: 1000, price: 10, size: 100, seq: 1 }); + const second = buildPrint({ ts: 1500, price: 12, size: 50, seq: 2 }); + const third = buildPrint({ ts: 2500, price: 11, size: 10, seq: 3 }); + + expect(aggregator.ingest(first).emitted).toHaveLength(0); + expect(aggregator.ingest(second).emitted).toHaveLength(0); + + const result = aggregator.ingest(third); + expect(result.emitted).toHaveLength(1); + + const candle = result.emitted[0]; + expect(candle.ts).toBe(1000); + expect(candle.open).toBe(10); + expect(candle.high).toBe(12); + expect(candle.low).toBe(10); + expect(candle.close).toBe(12); + expect(candle.volume).toBe(150); + expect(candle.trade_count).toBe(2); + expect(candle.seq).toBe(2); + expect(candle.source_ts).toBe(1000); + expect(candle.ingest_ts).toBe(1500); + }); + + test("respects open and close order with out-of-order prints", () => { + const aggregator = new CandleAggregator({ intervalsMs: [1000], maxLateMs: 2000 }); + + const late = buildPrint({ ts: 1500, price: 15, size: 10, seq: 2 }); + const early = buildPrint({ ts: 1200, price: 11, size: 20, seq: 1 }); + + aggregator.ingest(late); + aggregator.ingest(early); + + const [candle] = aggregator.drain(); + expect(candle.open).toBe(11); + expect(candle.close).toBe(15); + expect(candle.trade_count).toBe(2); + expect(candle.seq).toBe(2); + expect(candle.source_ts).toBe(1200); + expect(candle.ingest_ts).toBe(1500); + }); + + test("drops late prints once window is closed", () => { + const aggregator = new CandleAggregator({ intervalsMs: [1000], maxLateMs: 0 }); + + const first = buildPrint({ ts: 1000, price: 10, size: 100, seq: 1 }); + const next = buildPrint({ ts: 3000, price: 14, size: 50, seq: 2 }); + const late = buildPrint({ ts: 1500, price: 9, size: 25, seq: 3 }); + + aggregator.ingest(first); + const flush = aggregator.ingest(next); + expect(flush.emitted).toHaveLength(1); + + const lateResult = aggregator.ingest(late); + expect(lateResult.emitted).toHaveLength(0); + expect(lateResult.droppedLate).toBe(1); + }); +});