diff --git a/scripts/dev-services.ts b/scripts/dev-services.ts index bd3cf7b..09cd381 100644 --- a/scripts/dev-services.ts +++ b/scripts/dev-services.ts @@ -1,3 +1,6 @@ +import { mkdir, readFile, rm, writeFile } from "node:fs/promises"; +import path from "node:path"; + type ChildSpec = { name: string; cmd: string[]; @@ -11,6 +14,10 @@ type Child = { const children: Child[] = []; let shuttingDown = false; +let shutdownPromise: Promise | null = null; +let forceShutdownPromise: Promise | null = null; +const stateDir = path.join(process.cwd(), ".tmp"); +const pidFile = path.join(stateDir, "dev-services-runner-pids.json"); const sleep = (delayMs: number): Promise => { return new Promise((resolve) => setTimeout(resolve, delayMs)); @@ -24,6 +31,26 @@ const waitForExit = async (proc: Bun.Subprocess, timeoutMs: number): Promise { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +}; + +const waitForPidExit = async (pid: number, timeoutMs: number): Promise => { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (!isPidRunning(pid)) { + return true; + } + await sleep(100); + } + return !isPidRunning(pid); +}; + const signalProcess = (pid: number, signal: NodeJS.Signals): boolean => { try { process.kill(-pid, signal); @@ -43,12 +70,15 @@ const stopChild = async (child: Child, timeoutMs = 5000): Promise => { if (!pid) { return; } + await stopPid(pid, timeoutMs); +}; +const stopPid = async (pid: number, timeoutMs = 5000): Promise => { if (!signalProcess(pid, "SIGINT")) { return; } - const exited = await waitForExit(child.process, timeoutMs); + const exited = await waitForPidExit(pid, timeoutMs); if (exited) { return; } @@ -57,19 +87,62 @@ const stopChild = async (child: Child, timeoutMs = 5000): Promise => { return; } - await waitForExit(child.process, 2000); + await waitForPidExit(pid, 2000); +}; + +const persistChildren = async (): Promise => { + await mkdir(stateDir, { recursive: true }); + const payload = children + .map((child) => { + const pid = child.process.pid; + return pid ? { name: child.name, pid } : null; + }) + .filter((value): value is { name: string; pid: number } => value !== null); + await writeFile(pidFile, JSON.stringify(payload, null, 2)); +}; + +const clearPersistedChildren = async (): Promise => { + await rm(pidFile, { force: true }); +}; + +const cleanupStaleChildren = async (): Promise => { + try { + const raw = await readFile(pidFile, "utf8"); + const recorded = JSON.parse(raw) as Array<{ name?: string; pid?: number }>; + const stale = recorded.filter( + (entry): entry is { name: string; pid: number } => + typeof entry?.name === "string" && typeof entry?.pid === "number" && isPidRunning(entry.pid) + ); + + if (stale.length > 0) { + console.log( + `[dev-services] Cleaning up stale processes from previous run: ${stale + .map((entry) => `${entry.name}(${entry.pid})`) + .join(", ")}` + ); + } + + for (const entry of stale) { + await stopPid(entry.pid, 3000); + } + } catch { + // No persisted children from a prior run. + } finally { + await clearPersistedChildren(); + } }; const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => { const proc = Bun.spawn(cmd, { cwd, + detached: true, stdin: "inherit", stdout: "inherit", - stderr: "inherit", - detached: true + stderr: "inherit" }); children.push({ name, process: proc }); + void persistChildren(); proc.exited.then((code) => { if (shuttingDown) { @@ -83,22 +156,68 @@ const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => { }); }; -const shutdown = async (code: number): Promise => { - if (shuttingDown) { - return; +const forceShutdown = async (code: number): Promise => { + if (forceShutdownPromise) { + return forceShutdownPromise; } shuttingDown = true; + forceShutdownPromise = (async () => { + await Promise.all( + children.map(async (child) => { + const pid = child.process.pid; + if (!pid) { + return; + } - if (children.length > 0) { - await Promise.all(children.map((child) => stopChild(child))); - } + if (!signalProcess(pid, "SIGKILL")) { + return; + } - process.exit(code); + await waitForPidExit(pid, 2000); + }) + ); + + await clearPersistedChildren(); + process.exit(code); + })(); + + return forceShutdownPromise; }; -process.on("SIGINT", () => void shutdown(0)); -process.on("SIGTERM", () => void shutdown(0)); +const shutdown = async (code: number): Promise => { + if (shutdownPromise) { + return shutdownPromise; + } + + shuttingDown = true; + shutdownPromise = (async () => { + if (children.length > 0) { + await Promise.all(children.map((child) => stopChild(child))); + } + + await clearPersistedChildren(); + process.exit(code); + })(); + + return shutdownPromise; +}; + +const handleSignal = (signal: NodeJS.Signals) => { + if (shuttingDown) { + if (signal === "SIGINT") { + console.error("[dev-services] Force shutdown requested. Terminating remaining processes."); + void forceShutdown(130); + } + return; + } + + void shutdown(0); +}; + +process.on("SIGINT", () => handleSignal("SIGINT")); +process.on("SIGTERM", () => handleSignal("SIGTERM")); +process.on("SIGHUP", () => handleSignal("SIGHUP")); const tasks: ChildSpec[] = [ { name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" }, @@ -110,6 +229,7 @@ const tasks: ChildSpec[] = [ { name: "api", cmd: ["bun", "run", "dev"], cwd: "services/api" } ]; +await cleanupStaleChildren(); for (const task of tasks) { spawnChild(task); } diff --git a/scripts/dev.ts b/scripts/dev.ts index c13a338..64406d6 100644 --- a/scripts/dev.ts +++ b/scripts/dev.ts @@ -1,4 +1,6 @@ import net from "node:net"; +import { mkdir, readFile, rm, writeFile } from "node:fs/promises"; +import path from "node:path"; type ChildSpec = { name: string; @@ -13,6 +15,10 @@ type Child = { const children: Child[] = []; let shuttingDown = false; +let shutdownPromise: Promise | null = null; +let forceShutdownPromise: Promise | null = null; +const stateDir = path.join(process.cwd(), ".tmp"); +const pidFile = path.join(stateDir, "dev-runner-pids.json"); const sleep = (delayMs: number): Promise => { return new Promise((resolve) => setTimeout(resolve, delayMs)); @@ -26,6 +32,26 @@ const waitForExit = async (proc: Bun.Subprocess, timeoutMs: number): Promise { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +}; + +const waitForPidExit = async (pid: number, timeoutMs: number): Promise => { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (!isPidRunning(pid)) { + return true; + } + await sleep(100); + } + return !isPidRunning(pid); +}; + const signalProcess = (pid: number, signal: NodeJS.Signals): boolean => { try { process.kill(-pid, signal); @@ -45,12 +71,15 @@ const stopChild = async (child: Child, timeoutMs = 5000): Promise => { if (!pid) { return; } + await stopPid(pid, timeoutMs); +}; +const stopPid = async (pid: number, timeoutMs = 5000): Promise => { if (!signalProcess(pid, "SIGINT")) { return; } - const exited = await waitForExit(child.process, timeoutMs); + const exited = await waitForPidExit(pid, timeoutMs); if (exited) { return; } @@ -59,7 +88,49 @@ const stopChild = async (child: Child, timeoutMs = 5000): Promise => { return; } - await waitForExit(child.process, 2000); + await waitForPidExit(pid, 2000); +}; + +const persistChildren = async (): Promise => { + await mkdir(stateDir, { recursive: true }); + const payload = children + .map((child) => { + const pid = child.process.pid; + return pid ? { name: child.name, pid } : null; + }) + .filter((value): value is { name: string; pid: number } => value !== null); + await writeFile(pidFile, JSON.stringify(payload, null, 2)); +}; + +const clearPersistedChildren = async (): Promise => { + await rm(pidFile, { force: true }); +}; + +const cleanupStaleChildren = async (): Promise => { + try { + const raw = await readFile(pidFile, "utf8"); + const recorded = JSON.parse(raw) as Array<{ name?: string; pid?: number }>; + const stale = recorded.filter( + (entry): entry is { name: string; pid: number } => + typeof entry?.name === "string" && typeof entry?.pid === "number" && isPidRunning(entry.pid) + ); + + if (stale.length > 0) { + console.log( + `[dev] Cleaning up stale processes from previous run: ${stale + .map((entry) => `${entry.name}(${entry.pid})`) + .join(", ")}` + ); + } + + for (const entry of stale) { + await stopPid(entry.pid, 3000); + } + } catch { + // No persisted children from a prior run. + } finally { + await clearPersistedChildren(); + } }; const parseBool = (value: string | undefined): boolean => { @@ -117,13 +188,14 @@ const checkHttp = async (url: string): Promise => { const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => { const proc = Bun.spawn(cmd, { cwd, + detached: true, stdin: "inherit", stdout: "inherit", - stderr: "inherit", - detached: true + stderr: "inherit" }); children.push({ name, process: proc }); + void persistChildren(); proc.exited.then((code) => { if (shuttingDown) { @@ -142,29 +214,75 @@ const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => { }); }; -const shutdown = async (code: number): Promise => { - if (shuttingDown) { - return; +const forceShutdown = async (code: number): Promise => { + if (forceShutdownPromise) { + return forceShutdownPromise; } shuttingDown = true; + forceShutdownPromise = (async () => { + await Promise.all( + children.map(async (child) => { + const pid = child.process.pid; + if (!pid) { + return; + } - const infra = children.find((child) => child.name === "infra") ?? null; - const services = children.filter((child) => child.name !== "infra"); + if (!signalProcess(pid, "SIGKILL")) { + return; + } - if (services.length > 0) { - await Promise.all(services.map((child) => stopChild(child))); - } + await waitForPidExit(pid, 2000); + }) + ); - if (infra) { - await stopChild(infra, 8000); - } + await clearPersistedChildren(); + process.exit(code); + })(); - process.exit(code); + return forceShutdownPromise; }; -process.on("SIGINT", () => void shutdown(0)); -process.on("SIGTERM", () => void shutdown(0)); +const shutdown = async (code: number): Promise => { + if (shutdownPromise) { + return shutdownPromise; + } + + shuttingDown = true; + shutdownPromise = (async () => { + const infra = children.find((child) => child.name === "infra") ?? null; + const services = children.filter((child) => child.name !== "infra"); + + if (services.length > 0) { + await Promise.all(services.map((child) => stopChild(child))); + } + + if (infra) { + await stopChild(infra, 8000); + } + + await clearPersistedChildren(); + process.exit(code); + })(); + + return shutdownPromise; +}; + +const handleSignal = (signal: NodeJS.Signals) => { + if (shuttingDown) { + if (signal === "SIGINT") { + console.error("[dev] Force shutdown requested. Terminating remaining processes."); + void forceShutdown(130); + } + return; + } + + void shutdown(0); +}; + +process.on("SIGINT", () => handleSignal("SIGINT")); +process.on("SIGTERM", () => handleSignal("SIGTERM")); +process.on("SIGHUP", () => handleSignal("SIGHUP")); const waitForInfra = async (): Promise => { const natsTarget = parseUrlHostPort(process.env.NATS_URL ?? "", "127.0.0.1", 4222); @@ -218,6 +336,7 @@ if (parseBool(process.env.REPLAY_ENABLED)) { serviceTasks.push({ name: "replay", cmd: ["bun", "run", "dev"], cwd: "services/replay" }); } +await cleanupStaleChildren(); spawnChild(infraTask); await waitForInfra(); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 54c7ae3..ff99fcd 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -89,6 +89,31 @@ const envSchema = z.object({ const env = readEnv(envSchema); +const state = { + shuttingDown: false, + shutdownPromise: null as Promise | null +}; + +const getErrorMessage = (error: unknown): string => { + return error instanceof Error ? error.message : String(error); +}; + +const isExpectedShutdownError = (error: unknown): boolean => { + if (!state.shuttingDown) { + return false; + } + + const message = getErrorMessage(error).toUpperCase(); + return [ + "SOCKET CONNECTION WAS CLOSED UNEXPECTEDLY", + "SOCKET CLOSED UNEXPECTEDLY", + "ECONNREFUSED", + "CONNECTION_CLOSED", + "CONNECTION_DRAINING", + "TIMEOUT" + ].some((token) => message.includes(token)); +}; + const retry = async ( label: string, attempts: number, @@ -517,8 +542,12 @@ const run = async () => { try { redis = createClient({ url: env.REDIS_URL }); redis.on("error", (error) => { + if (isExpectedShutdownError(error)) { + return; + } + logger.warn("redis client error", { - error: error instanceof Error ? error.message : String(error) + error: getErrorMessage(error) }); }); await retry("redis connect", 5, 500, async () => { @@ -1150,14 +1179,45 @@ const run = async () => { logger.info("api listening", { port: server.port }); const shutdown = async (signal: string) => { - logger.info("service stopping", { signal }); - server.stop(); - if (redis && redis.isOpen) { - await redis.quit(); + if (state.shutdownPromise) { + return state.shutdownPromise; } - await nc.drain(); - await clickhouse.close(); - process.exit(0); + + state.shuttingDown = true; + state.shutdownPromise = (async () => { + logger.info("service stopping", { signal }); + server.stop(); + + if (redis && redis.isOpen) { + try { + await redis.quit(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } + } + + try { + await nc.drain(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } + + try { + await clickhouse.close(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } + + process.exit(0); + })(); + + return state.shutdownPromise; }; process.on("SIGINT", () => void shutdown("SIGINT")); diff --git a/services/candles/src/index.ts b/services/candles/src/index.ts index a02ab70..9774e6d 100644 --- a/services/candles/src/index.ts +++ b/services/candles/src/index.ts @@ -54,6 +54,31 @@ const envSchema = z.object({ const env = readEnv(envSchema); +const state = { + shuttingDown: false, + shutdownPromise: null as Promise | null +}; + +const getErrorMessage = (error: unknown): string => { + return error instanceof Error ? error.message : String(error); +}; + +const isExpectedShutdownError = (error: unknown): boolean => { + if (!state.shuttingDown) { + return false; + } + + const message = getErrorMessage(error).toUpperCase(); + return [ + "SOCKET CONNECTION WAS CLOSED UNEXPECTEDLY", + "SOCKET CLOSED UNEXPECTEDLY", + "ECONNREFUSED", + "CONNECTION_CLOSED", + "CONNECTION_DRAINING", + "TIMEOUT" + ].some((token) => message.includes(token)); +}; + const retry = async ( label: string, attempts: number, @@ -141,9 +166,13 @@ const emitCandle = async ( try { await insertEquityCandle(clickhouse, candle); } catch (error) { + if (isExpectedShutdownError(error)) { + return; + } + metrics.count("candles.persist_failed", 1); logger.error("failed to persist candle", { - error: error instanceof Error ? error.message : String(error), + error: getErrorMessage(error), trace_id: candle.trace_id, underlying_id: candle.underlying_id, interval_ms: candle.interval_ms @@ -158,9 +187,13 @@ const emitCandle = async ( try { await publishJson(js, SUBJECT_EQUITY_CANDLES, candle); } catch (error) { + if (isExpectedShutdownError(error)) { + return; + } + metrics.count("candles.publish_failed", 1); logger.error("failed to publish candle", { - error: error instanceof Error ? error.message : String(error), + error: getErrorMessage(error), trace_id: candle.trace_id, underlying_id: candle.underlying_id, interval_ms: candle.interval_ms @@ -171,9 +204,13 @@ const emitCandle = async ( try { await cacheCandle(redis, candle, cacheLimit); } catch (error) { + if (isExpectedShutdownError(error)) { + return; + } + metrics.count("candles.cache_failed", 1); logger.warn("failed to cache candle", { - error: error instanceof Error ? error.message : String(error), + error: getErrorMessage(error), trace_id: candle.trace_id, underlying_id: candle.underlying_id, interval_ms: candle.interval_ms @@ -242,8 +279,12 @@ const run = async () => { try { redis = createRedisClient(env.REDIS_URL); redis.on("error", (error) => { + if (isExpectedShutdownError(error)) { + return; + } + logger.warn("redis client error", { - error: error instanceof Error ? error.message : String(error) + error: getErrorMessage(error) }); }); await retry("redis connect", 20, 500, async () => { @@ -376,20 +417,51 @@ const run = async () => { }; const shutdown = async (signal: string) => { - logger.info("service stopping", { signal }); - clearInterval(flushTimer); - await flushExpired(); - const remaining = aggregator.drain(); - for (const candle of remaining) { - const validated = EquityCandleSchema.parse(candle); - await emitCandle(clickhouse, js, redis, validated, env.CANDLE_CACHE_LIMIT); + if (state.shutdownPromise) { + return state.shutdownPromise; } - if (redis && redis.isOpen) { - await redis.quit(); - } - await nc.drain(); - await clickhouse.close(); - process.exit(0); + + state.shuttingDown = true; + state.shutdownPromise = (async () => { + logger.info("service stopping", { signal }); + clearInterval(flushTimer); + await flushExpired(); + const remaining = aggregator.drain(); + for (const candle of remaining) { + const validated = EquityCandleSchema.parse(candle); + await emitCandle(clickhouse, js, redis, validated, env.CANDLE_CACHE_LIMIT); + } + + if (redis && redis.isOpen) { + try { + await redis.quit(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } + } + + try { + await nc.drain(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } + + try { + await clickhouse.close(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } + + process.exit(0); + })(); + + return state.shutdownPromise; }; process.on("SIGINT", () => void shutdown("SIGINT")); diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 9ac8732..733cb39 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -191,6 +191,31 @@ const roundTo = (value: number, digits = 4): number => { return Number(value.toFixed(digits)); }; +const getErrorCode = (error: unknown): string | null => { + if (error && typeof error === "object" && "code" in error) { + const code = (error as { code?: unknown }).code; + if (typeof code === "string" && code.length > 0) { + return code; + } + } + + if (error instanceof Error) { + const match = error.message.match(/\bCONNECTION_(?:DRAINING|CLOSED)\b/); + if (match?.[0]) { + return match[0]; + } + } + + if (typeof error === "string") { + const match = error.match(/\bCONNECTION_(?:DRAINING|CLOSED)\b/); + if (match?.[0]) { + return match[0]; + } + } + + return null; +}; + type NbboPlacement = "AA" | "A" | "B" | "BB" | "MID" | "MISSING" | "STALE"; type NbboPlacementCounts = { @@ -216,6 +241,7 @@ type ClusterState = { firstPrice: number; lastPrice: number; placements: NbboPlacementCounts; + flushed: boolean; }; const clusters = new Map(); @@ -225,6 +251,10 @@ const darkInferenceState = createDarkInferenceState(); const recentLegsByKey = new Map(); const recentLegsByRoot = new Map(); const recentStructureEmits = new Map(); +const runtimeState = { + shuttingDown: false, + shutdownPromise: null as Promise | null +}; const MAX_RECENT_LEGS = 20; @@ -232,6 +262,15 @@ const rollingKey = (metric: string, contractId: string): string => { return `rolling:${metric}:${contractId}`; }; +const buildPacketId = (cluster: ClusterState): string => { + return `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`; +}; + +const isExpectedShutdownNatsError = (error: unknown): boolean => { + const code = getErrorCode(error); + return runtimeState.shuttingDown && (code === "CONNECTION_DRAINING" || code === "CONNECTION_CLOSED"); +}; + const createPlacementCounts = (): NbboPlacementCounts => ({ aa: 0, a: 0, @@ -500,7 +539,8 @@ const buildCluster = (print: OptionPrint): ClusterState => { totalPremium: print.price * print.size, firstPrice: print.price, lastPrice: print.price, - placements + placements, + flushed: false }; }; @@ -612,8 +652,14 @@ const flushCluster = async ( rollingConfig: RollingStatsConfig, cluster: ClusterState ): Promise => { + if (cluster.flushed) { + return; + } + + cluster.flushed = true; const joinQuality: Record = {}; const nbboJoin = selectNbbo(cluster.contractId, cluster.endTs); + const packetId = buildPacketId(cluster); const totalPremium = roundTo(cluster.totalPremium); const totalNotional = roundTo(totalPremium * 100, 2); @@ -776,25 +822,38 @@ const flushCluster = async ( source_ts: cluster.startSourceTs, ingest_ts: cluster.endIngestTs, seq: cluster.endSeq, - trace_id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, - id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, + trace_id: packetId, + id: packetId, members: cluster.members, features, join_quality: joinQuality }; const validated = FlowPacketSchema.parse(packet); + try { + await insertFlowPacket(clickhouse, validated); + await publishJson(js, SUBJECT_FLOW_PACKETS, validated); - await insertFlowPacket(clickhouse, validated); - await publishJson(js, SUBJECT_FLOW_PACKETS, validated); + await emitClassifiers(clickhouse, js, validated); - await emitClassifiers(clickhouse, js, validated); + logger.info("emitted flow packet", { + id: validated.id, + contract: cluster.contractId, + count: cluster.members.length + }); + } catch (error) { + if (isExpectedShutdownNatsError(error)) { + logger.info("skipped flow packet publish during shutdown", { + id: packetId, + contract: cluster.contractId, + error: getErrorCode(error) ?? (error instanceof Error ? error.message : String(error)) + }); + return; + } - logger.info("emitted flow packet", { - id: validated.id, - contract: cluster.contractId, - count: cluster.members.length - }); + cluster.flushed = false; + throw error; + } }; const scoreAlert = (packet: FlowPacket, hits: ClassifierHitEvent[]): { score: number; severity: string } => { @@ -834,6 +893,9 @@ const emitClassifiers = async ( await insertClassifierHit(clickhouse, hit); await publishJson(js, SUBJECT_CLASSIFIER_HITS, hit); } catch (error) { + if (isExpectedShutdownNatsError(error)) { + continue; + } logger.error("failed to emit classifier hit", { error: error instanceof Error ? error.message : String(error), classifier_id: hit.classifier_id, @@ -863,6 +925,9 @@ const emitClassifiers = async ( await insertAlert(clickhouse, alert); await publishJson(js, SUBJECT_ALERTS, alert); } catch (error) { + if (isExpectedShutdownNatsError(error)) { + return; + } logger.error("failed to emit alert", { error: error instanceof Error ? error.message : String(error), packet_id: packet.id @@ -891,6 +956,9 @@ const emitEquityJoin = async ( try { await publishJson(js, SUBJECT_EQUITY_JOINS, payload); } catch (error) { + if (isExpectedShutdownNatsError(error)) { + return; + } logger.error("failed to publish equity print join", { error: error instanceof Error ? error.message : String(error), trace_id: payload.trace_id @@ -912,6 +980,9 @@ const emitDarkInferences = async ( await insertInferredDark(clickhouse, validated); await publishJson(js, SUBJECT_INFERRED_DARK, validated); } catch (error) { + if (isExpectedShutdownNatsError(error)) { + continue; + } logger.error("failed to emit inferred dark event", { error: error instanceof Error ? error.message : String(error), trace_id: validated.trace_id @@ -1377,6 +1448,10 @@ const run = async () => { const nbboLoop = async () => { for await (const msg of nbboSubscription.messages) { + if (runtimeState.shuttingDown) { + break; + } + try { const nbbo = OptionNBBOSchema.parse(nbboSubscription.decode(msg)); updateNbboCache(nbbo); @@ -1392,6 +1467,10 @@ const run = async () => { const equityQuoteLoop = async () => { for await (const msg of equityQuoteSubscription.messages) { + if (runtimeState.shuttingDown) { + break; + } + try { const quote = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg)); updateEquityQuoteCache(quote); @@ -1407,6 +1486,10 @@ const run = async () => { const equityPrintLoop = async () => { for await (const msg of equitySubscription.messages) { + if (runtimeState.shuttingDown) { + break; + } + try { const print = EquityPrintSchema.parse(equitySubscription.decode(msg)); await emitEquityJoin(clickhouse, js, print); @@ -1425,23 +1508,64 @@ const run = async () => { void equityPrintLoop(); const shutdown = async (signal: string) => { - logger.info("service stopping", { signal }); - - for (const cluster of clusters.values()) { - await flushCluster(clickhouse, js, redis, rollingConfig, cluster); + if (runtimeState.shutdownPromise) { + await runtimeState.shutdownPromise; + return; } - clusters.clear(); - await nc.drain(); - await clickhouse.close(); - await redis.quit(); - process.exit(0); + runtimeState.shuttingDown = true; + runtimeState.shutdownPromise = (async () => { + logger.info("service stopping", { signal }); + + for (const cluster of [...clusters.values()]) { + await flushCluster(clickhouse, js, redis, rollingConfig, cluster); + } + clusters.clear(); + + try { + await nc.drain(); + } catch (error) { + if (!isExpectedShutdownNatsError(error)) { + throw error; + } + } + + await clickhouse.close(); + if (redis.isOpen) { + await redis.quit(); + } + })(); + + try { + await runtimeState.shutdownPromise; + process.exit(0); + } catch (error) { + logger.error("service shutdown failed", { + error: error instanceof Error ? error.message : String(error) + }); + + try { + await clickhouse.close(); + } catch {} + + try { + if (redis.isOpen) { + await redis.quit(); + } + } catch {} + + process.exit(1); + } }; process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGTERM", () => void shutdown("SIGTERM")); for await (const msg of subscription.messages) { + if (runtimeState.shuttingDown) { + break; + } + try { const print = OptionPrintSchema.parse(subscription.decode(msg)); await flushEligibleClusters( @@ -1453,6 +1577,10 @@ const run = async () => { print.option_contract_id ); + if (runtimeState.shuttingDown) { + break; + } + const existing = clusters.get(print.option_contract_id); if (!existing) { clusters.set(print.option_contract_id, buildCluster(print)); diff --git a/services/ingest-equities/src/index.ts b/services/ingest-equities/src/index.ts index 3644e7a..2a86c6e 100644 --- a/services/ingest-equities/src/index.ts +++ b/services/ingest-equities/src/index.ts @@ -65,7 +65,28 @@ const envSchema = z.object({ const env = readEnv(envSchema); const state = { - shuttingDown: false + shuttingDown: false, + shutdownPromise: null as Promise | null +}; + +const getErrorMessage = (error: unknown): string => { + return error instanceof Error ? error.message : String(error); +}; + +const isExpectedShutdownError = (error: unknown): boolean => { + if (!state.shuttingDown) { + return false; + } + + const message = getErrorMessage(error).toUpperCase(); + return [ + "SOCKET CONNECTION WAS CLOSED UNEXPECTEDLY", + "SOCKET CLOSED UNEXPECTEDLY", + "ECONNREFUSED", + "CONNECTION_CLOSED", + "CONNECTION_DRAINING", + "TIMEOUT" + ].some((token) => message.includes(token)); }; const buildThrottle = (enabled: boolean, throttleMs: number) => { @@ -223,8 +244,12 @@ const run = async () => { underlying_id: print.underlying_id }); } catch (error) { + if (isExpectedShutdownError(error)) { + return; + } + logger.error("failed to publish equity print", { - error: error instanceof Error ? error.message : String(error), + error: getErrorMessage(error), trace_id: print.trace_id }); } @@ -245,8 +270,12 @@ const run = async () => { await insertEquityQuote(clickhouse, quote); await publishJson(js, SUBJECT_EQUITY_QUOTES, quote); } catch (error) { + if (isExpectedShutdownError(error)) { + return; + } + logger.error("failed to publish equity quote", { - error: error instanceof Error ? error.message : String(error), + error: getErrorMessage(error), trace_id: quote.trace_id }); } @@ -254,18 +283,35 @@ const run = async () => { }); const shutdown = async (signal: string) => { - if (state.shuttingDown) { - return; + if (state.shutdownPromise) { + return state.shutdownPromise; } state.shuttingDown = true; - await stopAdapter(); + state.shutdownPromise = (async () => { + logger.info("service stopping", { signal }); + await stopAdapter(); - logger.info("service stopping", { signal }); + try { + await nc.drain(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } - await nc.drain(); - await clickhouse.close(); - process.exit(0); + try { + await clickhouse.close(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } + + process.exit(0); + })(); + + return state.shutdownPromise; }; process.on("SIGINT", () => void shutdown("SIGINT")); diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index 5d51678..9bbcccd 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -88,7 +88,28 @@ const envSchema = z.object({ const env = readEnv(envSchema); const state = { - shuttingDown: false + shuttingDown: false, + shutdownPromise: null as Promise | null +}; + +const getErrorMessage = (error: unknown): string => { + return error instanceof Error ? error.message : String(error); +}; + +const isExpectedShutdownError = (error: unknown): boolean => { + if (!state.shuttingDown) { + return false; + } + + const message = getErrorMessage(error).toUpperCase(); + return [ + "SOCKET CONNECTION WAS CLOSED UNEXPECTEDLY", + "SOCKET CLOSED UNEXPECTEDLY", + "ECONNREFUSED", + "CONNECTION_CLOSED", + "CONNECTION_DRAINING", + "TIMEOUT" + ].some((token) => message.includes(token)); }; const buildThrottle = (enabled: boolean, throttleMs: number) => { @@ -293,8 +314,12 @@ const run = async () => { option_contract_id: print.option_contract_id }); } catch (error) { + if (isExpectedShutdownError(error)) { + return; + } + logger.error("failed to publish option print", { - error: error instanceof Error ? error.message : String(error), + error: getErrorMessage(error), trace_id: print.trace_id }); } @@ -315,8 +340,12 @@ const run = async () => { await insertOptionNBBO(clickhouse, nbbo); await publishJson(js, SUBJECT_OPTION_NBBO, nbbo); } catch (error) { + if (isExpectedShutdownError(error)) { + return; + } + logger.error("failed to publish option nbbo", { - error: error instanceof Error ? error.message : String(error), + error: getErrorMessage(error), trace_id: nbbo.trace_id }); } @@ -324,18 +353,35 @@ const run = async () => { }); const shutdown = async (signal: string) => { - if (state.shuttingDown) { - return; + if (state.shutdownPromise) { + return state.shutdownPromise; } state.shuttingDown = true; - await stopAdapter(); + state.shutdownPromise = (async () => { + logger.info("service stopping", { signal }); + await stopAdapter(); - logger.info("service stopping", { signal }); + try { + await nc.drain(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } - await nc.drain(); - await clickhouse.close(); - process.exit(0); + try { + await clickhouse.close(); + } catch (error) { + if (!isExpectedShutdownError(error)) { + throw error; + } + } + + process.exit(0); + })(); + + return state.shutdownPromise; }; process.on("SIGINT", () => void shutdown("SIGINT"));