total ui refactor

total ui refactor, deslopification, tuning, mildly bbg-inspired if u squint
This commit is contained in:
dirtydishes 2026-03-29 00:05:48 -04:00 committed by GitHub
commit b3c63a19d0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 5722 additions and 4655 deletions

View file

@ -0,0 +1,5 @@
import { ChartsRoute } from "../terminal";
export default function Page() {
return <ChartsRoute />;
}

File diff suppressed because it is too large Load diff

View file

@ -1,9 +1,29 @@
import "./globals.css"; import "./globals.css";
import type { ReactNode } from "react"; import type { ReactNode } from "react";
import { IBM_Plex_Mono, IBM_Plex_Sans, Quantico } from "next/font/google";
import { TerminalAppShell } from "./terminal";
const display = Quantico({
subsets: ["latin"],
weight: ["400", "700"],
variable: "--font-display"
});
const sans = IBM_Plex_Sans({
subsets: ["latin"],
weight: ["400", "500", "600"],
variable: "--font-sans"
});
const mono = IBM_Plex_Mono({
subsets: ["latin"],
weight: ["400", "500"],
variable: "--font-mono"
});
export const metadata = { export const metadata = {
title: "Islandflow", title: "Islandflow Terminal",
description: "Realtime options flow & off-exchange analysis" description: "Realtime options flow and off-exchange analysis terminal"
}; };
type RootLayoutProps = { type RootLayoutProps = {
@ -13,7 +33,9 @@ type RootLayoutProps = {
export default function RootLayout({ children }: RootLayoutProps) { export default function RootLayout({ children }: RootLayoutProps) {
return ( return (
<html lang="en"> <html lang="en">
<body>{children}</body> <body className={`${display.variable} ${sans.variable} ${mono.variable}`}>
<TerminalAppShell>{children}</TerminalAppShell>
</body>
</html> </html>
); );
} }

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,5 @@
import { ReplayRoute } from "../terminal";
export default function Page() {
return <ReplayRoute />;
}

View file

@ -0,0 +1,5 @@
import { SignalsRoute } from "../terminal";
export default function Page() {
return <SignalsRoute />;
}

View file

@ -0,0 +1,5 @@
import { TapeRoute } from "../terminal";
export default function Page() {
return <TapeRoute />;
}

4190
apps/web/app/terminal.tsx Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,3 +1,6 @@
import { mkdir, readFile, rm, writeFile } from "node:fs/promises";
import path from "node:path";
type ChildSpec = { type ChildSpec = {
name: string; name: string;
cmd: string[]; cmd: string[];
@ -11,6 +14,10 @@ type Child = {
const children: Child[] = []; const children: Child[] = [];
let shuttingDown = false; let shuttingDown = false;
let shutdownPromise: Promise<void> | null = null;
let forceShutdownPromise: Promise<void> | null = null;
const stateDir = path.join(process.cwd(), ".tmp");
const pidFile = path.join(stateDir, "dev-services-runner-pids.json");
const sleep = (delayMs: number): Promise<void> => { const sleep = (delayMs: number): Promise<void> => {
return new Promise((resolve) => setTimeout(resolve, delayMs)); return new Promise((resolve) => setTimeout(resolve, delayMs));
@ -24,6 +31,26 @@ const waitForExit = async (proc: Bun.Subprocess, timeoutMs: number): Promise<boo
return result; return result;
}; };
const isPidRunning = (pid: number): boolean => {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
};
const waitForPidExit = async (pid: number, timeoutMs: number): Promise<boolean> => {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (!isPidRunning(pid)) {
return true;
}
await sleep(100);
}
return !isPidRunning(pid);
};
const signalProcess = (pid: number, signal: NodeJS.Signals): boolean => { const signalProcess = (pid: number, signal: NodeJS.Signals): boolean => {
try { try {
process.kill(-pid, signal); process.kill(-pid, signal);
@ -43,12 +70,15 @@ const stopChild = async (child: Child, timeoutMs = 5000): Promise<void> => {
if (!pid) { if (!pid) {
return; return;
} }
await stopPid(pid, timeoutMs);
};
const stopPid = async (pid: number, timeoutMs = 5000): Promise<void> => {
if (!signalProcess(pid, "SIGINT")) { if (!signalProcess(pid, "SIGINT")) {
return; return;
} }
const exited = await waitForExit(child.process, timeoutMs); const exited = await waitForPidExit(pid, timeoutMs);
if (exited) { if (exited) {
return; return;
} }
@ -57,19 +87,62 @@ const stopChild = async (child: Child, timeoutMs = 5000): Promise<void> => {
return; return;
} }
await waitForExit(child.process, 2000); await waitForPidExit(pid, 2000);
};
const persistChildren = async (): Promise<void> => {
await mkdir(stateDir, { recursive: true });
const payload = children
.map((child) => {
const pid = child.process.pid;
return pid ? { name: child.name, pid } : null;
})
.filter((value): value is { name: string; pid: number } => value !== null);
await writeFile(pidFile, JSON.stringify(payload, null, 2));
};
const clearPersistedChildren = async (): Promise<void> => {
await rm(pidFile, { force: true });
};
const cleanupStaleChildren = async (): Promise<void> => {
try {
const raw = await readFile(pidFile, "utf8");
const recorded = JSON.parse(raw) as Array<{ name?: string; pid?: number }>;
const stale = recorded.filter(
(entry): entry is { name: string; pid: number } =>
typeof entry?.name === "string" && typeof entry?.pid === "number" && isPidRunning(entry.pid)
);
if (stale.length > 0) {
console.log(
`[dev-services] Cleaning up stale processes from previous run: ${stale
.map((entry) => `${entry.name}(${entry.pid})`)
.join(", ")}`
);
}
for (const entry of stale) {
await stopPid(entry.pid, 3000);
}
} catch {
// No persisted children from a prior run.
} finally {
await clearPersistedChildren();
}
}; };
const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => { const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => {
const proc = Bun.spawn(cmd, { const proc = Bun.spawn(cmd, {
cwd, cwd,
detached: true,
stdin: "inherit", stdin: "inherit",
stdout: "inherit", stdout: "inherit",
stderr: "inherit", stderr: "inherit"
detached: true
}); });
children.push({ name, process: proc }); children.push({ name, process: proc });
void persistChildren();
proc.exited.then((code) => { proc.exited.then((code) => {
if (shuttingDown) { if (shuttingDown) {
@ -83,22 +156,68 @@ const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => {
}); });
}; };
const shutdown = async (code: number): Promise<void> => { const forceShutdown = async (code: number): Promise<void> => {
if (shuttingDown) { if (forceShutdownPromise) {
return; return forceShutdownPromise;
} }
shuttingDown = true; shuttingDown = true;
forceShutdownPromise = (async () => {
await Promise.all(
children.map(async (child) => {
const pid = child.process.pid;
if (!pid) {
return;
}
if (!signalProcess(pid, "SIGKILL")) {
return;
}
await waitForPidExit(pid, 2000);
})
);
await clearPersistedChildren();
process.exit(code);
})();
return forceShutdownPromise;
};
const shutdown = async (code: number): Promise<void> => {
if (shutdownPromise) {
return shutdownPromise;
}
shuttingDown = true;
shutdownPromise = (async () => {
if (children.length > 0) { if (children.length > 0) {
await Promise.all(children.map((child) => stopChild(child))); await Promise.all(children.map((child) => stopChild(child)));
} }
await clearPersistedChildren();
process.exit(code); process.exit(code);
})();
return shutdownPromise;
}; };
process.on("SIGINT", () => void shutdown(0)); const handleSignal = (signal: NodeJS.Signals) => {
process.on("SIGTERM", () => void shutdown(0)); if (shuttingDown) {
if (signal === "SIGINT") {
console.error("[dev-services] Force shutdown requested. Terminating remaining processes.");
void forceShutdown(130);
}
return;
}
void shutdown(0);
};
process.on("SIGINT", () => handleSignal("SIGINT"));
process.on("SIGTERM", () => handleSignal("SIGTERM"));
process.on("SIGHUP", () => handleSignal("SIGHUP"));
const tasks: ChildSpec[] = [ const tasks: ChildSpec[] = [
{ name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" }, { name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" },
@ -110,6 +229,7 @@ const tasks: ChildSpec[] = [
{ name: "api", cmd: ["bun", "run", "dev"], cwd: "services/api" } { name: "api", cmd: ["bun", "run", "dev"], cwd: "services/api" }
]; ];
await cleanupStaleChildren();
for (const task of tasks) { for (const task of tasks) {
spawnChild(task); spawnChild(task);
} }

View file

@ -1,4 +1,6 @@
import net from "node:net"; import net from "node:net";
import { mkdir, readFile, rm, writeFile } from "node:fs/promises";
import path from "node:path";
type ChildSpec = { type ChildSpec = {
name: string; name: string;
@ -13,6 +15,10 @@ type Child = {
const children: Child[] = []; const children: Child[] = [];
let shuttingDown = false; let shuttingDown = false;
let shutdownPromise: Promise<void> | null = null;
let forceShutdownPromise: Promise<void> | null = null;
const stateDir = path.join(process.cwd(), ".tmp");
const pidFile = path.join(stateDir, "dev-runner-pids.json");
const sleep = (delayMs: number): Promise<void> => { const sleep = (delayMs: number): Promise<void> => {
return new Promise((resolve) => setTimeout(resolve, delayMs)); return new Promise((resolve) => setTimeout(resolve, delayMs));
@ -26,6 +32,26 @@ const waitForExit = async (proc: Bun.Subprocess, timeoutMs: number): Promise<boo
return result; return result;
}; };
const isPidRunning = (pid: number): boolean => {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
};
const waitForPidExit = async (pid: number, timeoutMs: number): Promise<boolean> => {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (!isPidRunning(pid)) {
return true;
}
await sleep(100);
}
return !isPidRunning(pid);
};
const signalProcess = (pid: number, signal: NodeJS.Signals): boolean => { const signalProcess = (pid: number, signal: NodeJS.Signals): boolean => {
try { try {
process.kill(-pid, signal); process.kill(-pid, signal);
@ -45,12 +71,15 @@ const stopChild = async (child: Child, timeoutMs = 5000): Promise<void> => {
if (!pid) { if (!pid) {
return; return;
} }
await stopPid(pid, timeoutMs);
};
const stopPid = async (pid: number, timeoutMs = 5000): Promise<void> => {
if (!signalProcess(pid, "SIGINT")) { if (!signalProcess(pid, "SIGINT")) {
return; return;
} }
const exited = await waitForExit(child.process, timeoutMs); const exited = await waitForPidExit(pid, timeoutMs);
if (exited) { if (exited) {
return; return;
} }
@ -59,7 +88,49 @@ const stopChild = async (child: Child, timeoutMs = 5000): Promise<void> => {
return; return;
} }
await waitForExit(child.process, 2000); await waitForPidExit(pid, 2000);
};
const persistChildren = async (): Promise<void> => {
await mkdir(stateDir, { recursive: true });
const payload = children
.map((child) => {
const pid = child.process.pid;
return pid ? { name: child.name, pid } : null;
})
.filter((value): value is { name: string; pid: number } => value !== null);
await writeFile(pidFile, JSON.stringify(payload, null, 2));
};
const clearPersistedChildren = async (): Promise<void> => {
await rm(pidFile, { force: true });
};
const cleanupStaleChildren = async (): Promise<void> => {
try {
const raw = await readFile(pidFile, "utf8");
const recorded = JSON.parse(raw) as Array<{ name?: string; pid?: number }>;
const stale = recorded.filter(
(entry): entry is { name: string; pid: number } =>
typeof entry?.name === "string" && typeof entry?.pid === "number" && isPidRunning(entry.pid)
);
if (stale.length > 0) {
console.log(
`[dev] Cleaning up stale processes from previous run: ${stale
.map((entry) => `${entry.name}(${entry.pid})`)
.join(", ")}`
);
}
for (const entry of stale) {
await stopPid(entry.pid, 3000);
}
} catch {
// No persisted children from a prior run.
} finally {
await clearPersistedChildren();
}
}; };
const parseBool = (value: string | undefined): boolean => { const parseBool = (value: string | undefined): boolean => {
@ -117,13 +188,14 @@ const checkHttp = async (url: string): Promise<boolean> => {
const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => { const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => {
const proc = Bun.spawn(cmd, { const proc = Bun.spawn(cmd, {
cwd, cwd,
detached: true,
stdin: "inherit", stdin: "inherit",
stdout: "inherit", stdout: "inherit",
stderr: "inherit", stderr: "inherit"
detached: true
}); });
children.push({ name, process: proc }); children.push({ name, process: proc });
void persistChildren();
proc.exited.then((code) => { proc.exited.then((code) => {
if (shuttingDown) { if (shuttingDown) {
@ -142,13 +214,42 @@ const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => {
}); });
}; };
const shutdown = async (code: number): Promise<void> => { const forceShutdown = async (code: number): Promise<void> => {
if (shuttingDown) { if (forceShutdownPromise) {
return; return forceShutdownPromise;
} }
shuttingDown = true; shuttingDown = true;
forceShutdownPromise = (async () => {
await Promise.all(
children.map(async (child) => {
const pid = child.process.pid;
if (!pid) {
return;
}
if (!signalProcess(pid, "SIGKILL")) {
return;
}
await waitForPidExit(pid, 2000);
})
);
await clearPersistedChildren();
process.exit(code);
})();
return forceShutdownPromise;
};
const shutdown = async (code: number): Promise<void> => {
if (shutdownPromise) {
return shutdownPromise;
}
shuttingDown = true;
shutdownPromise = (async () => {
const infra = children.find((child) => child.name === "infra") ?? null; const infra = children.find((child) => child.name === "infra") ?? null;
const services = children.filter((child) => child.name !== "infra"); const services = children.filter((child) => child.name !== "infra");
@ -160,11 +261,28 @@ const shutdown = async (code: number): Promise<void> => {
await stopChild(infra, 8000); await stopChild(infra, 8000);
} }
await clearPersistedChildren();
process.exit(code); process.exit(code);
})();
return shutdownPromise;
}; };
process.on("SIGINT", () => void shutdown(0)); const handleSignal = (signal: NodeJS.Signals) => {
process.on("SIGTERM", () => void shutdown(0)); if (shuttingDown) {
if (signal === "SIGINT") {
console.error("[dev] Force shutdown requested. Terminating remaining processes.");
void forceShutdown(130);
}
return;
}
void shutdown(0);
};
process.on("SIGINT", () => handleSignal("SIGINT"));
process.on("SIGTERM", () => handleSignal("SIGTERM"));
process.on("SIGHUP", () => handleSignal("SIGHUP"));
const waitForInfra = async (): Promise<void> => { const waitForInfra = async (): Promise<void> => {
const natsTarget = parseUrlHostPort(process.env.NATS_URL ?? "", "127.0.0.1", 4222); const natsTarget = parseUrlHostPort(process.env.NATS_URL ?? "", "127.0.0.1", 4222);
@ -218,6 +336,7 @@ if (parseBool(process.env.REPLAY_ENABLED)) {
serviceTasks.push({ name: "replay", cmd: ["bun", "run", "dev"], cwd: "services/replay" }); serviceTasks.push({ name: "replay", cmd: ["bun", "run", "dev"], cwd: "services/replay" });
} }
await cleanupStaleChildren();
spawnChild(infraTask); spawnChild(infraTask);
await waitForInfra(); await waitForInfra();

View file

@ -89,6 +89,31 @@ const envSchema = z.object({
const env = readEnv(envSchema); const env = readEnv(envSchema);
const state = {
shuttingDown: false,
shutdownPromise: null as Promise<void> | null
};
const getErrorMessage = (error: unknown): string => {
return error instanceof Error ? error.message : String(error);
};
const isExpectedShutdownError = (error: unknown): boolean => {
if (!state.shuttingDown) {
return false;
}
const message = getErrorMessage(error).toUpperCase();
return [
"SOCKET CONNECTION WAS CLOSED UNEXPECTEDLY",
"SOCKET CLOSED UNEXPECTEDLY",
"ECONNREFUSED",
"CONNECTION_CLOSED",
"CONNECTION_DRAINING",
"TIMEOUT"
].some((token) => message.includes(token));
};
const retry = async <T>( const retry = async <T>(
label: string, label: string,
attempts: number, attempts: number,
@ -517,8 +542,12 @@ const run = async () => {
try { try {
redis = createClient({ url: env.REDIS_URL }); redis = createClient({ url: env.REDIS_URL });
redis.on("error", (error) => { redis.on("error", (error) => {
if (isExpectedShutdownError(error)) {
return;
}
logger.warn("redis client error", { logger.warn("redis client error", {
error: error instanceof Error ? error.message : String(error) error: getErrorMessage(error)
}); });
}); });
await retry("redis connect", 5, 500, async () => { await retry("redis connect", 5, 500, async () => {
@ -1150,14 +1179,45 @@ const run = async () => {
logger.info("api listening", { port: server.port }); logger.info("api listening", { port: server.port });
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
if (state.shutdownPromise) {
return state.shutdownPromise;
}
state.shuttingDown = true;
state.shutdownPromise = (async () => {
logger.info("service stopping", { signal }); logger.info("service stopping", { signal });
server.stop(); server.stop();
if (redis && redis.isOpen) { if (redis && redis.isOpen) {
try {
await redis.quit(); await redis.quit();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
} }
}
}
try {
await nc.drain(); await nc.drain();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
}
}
try {
await clickhouse.close(); await clickhouse.close();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
}
}
process.exit(0); process.exit(0);
})();
return state.shutdownPromise;
}; };
process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGINT", () => void shutdown("SIGINT"));

View file

@ -54,6 +54,31 @@ const envSchema = z.object({
const env = readEnv(envSchema); const env = readEnv(envSchema);
const state = {
shuttingDown: false,
shutdownPromise: null as Promise<void> | null
};
const getErrorMessage = (error: unknown): string => {
return error instanceof Error ? error.message : String(error);
};
const isExpectedShutdownError = (error: unknown): boolean => {
if (!state.shuttingDown) {
return false;
}
const message = getErrorMessage(error).toUpperCase();
return [
"SOCKET CONNECTION WAS CLOSED UNEXPECTEDLY",
"SOCKET CLOSED UNEXPECTEDLY",
"ECONNREFUSED",
"CONNECTION_CLOSED",
"CONNECTION_DRAINING",
"TIMEOUT"
].some((token) => message.includes(token));
};
const retry = async <T>( const retry = async <T>(
label: string, label: string,
attempts: number, attempts: number,
@ -141,9 +166,13 @@ const emitCandle = async (
try { try {
await insertEquityCandle(clickhouse, candle); await insertEquityCandle(clickhouse, candle);
} catch (error) { } catch (error) {
if (isExpectedShutdownError(error)) {
return;
}
metrics.count("candles.persist_failed", 1); metrics.count("candles.persist_failed", 1);
logger.error("failed to persist candle", { logger.error("failed to persist candle", {
error: error instanceof Error ? error.message : String(error), error: getErrorMessage(error),
trace_id: candle.trace_id, trace_id: candle.trace_id,
underlying_id: candle.underlying_id, underlying_id: candle.underlying_id,
interval_ms: candle.interval_ms interval_ms: candle.interval_ms
@ -158,9 +187,13 @@ const emitCandle = async (
try { try {
await publishJson(js, SUBJECT_EQUITY_CANDLES, candle); await publishJson(js, SUBJECT_EQUITY_CANDLES, candle);
} catch (error) { } catch (error) {
if (isExpectedShutdownError(error)) {
return;
}
metrics.count("candles.publish_failed", 1); metrics.count("candles.publish_failed", 1);
logger.error("failed to publish candle", { logger.error("failed to publish candle", {
error: error instanceof Error ? error.message : String(error), error: getErrorMessage(error),
trace_id: candle.trace_id, trace_id: candle.trace_id,
underlying_id: candle.underlying_id, underlying_id: candle.underlying_id,
interval_ms: candle.interval_ms interval_ms: candle.interval_ms
@ -171,9 +204,13 @@ const emitCandle = async (
try { try {
await cacheCandle(redis, candle, cacheLimit); await cacheCandle(redis, candle, cacheLimit);
} catch (error) { } catch (error) {
if (isExpectedShutdownError(error)) {
return;
}
metrics.count("candles.cache_failed", 1); metrics.count("candles.cache_failed", 1);
logger.warn("failed to cache candle", { logger.warn("failed to cache candle", {
error: error instanceof Error ? error.message : String(error), error: getErrorMessage(error),
trace_id: candle.trace_id, trace_id: candle.trace_id,
underlying_id: candle.underlying_id, underlying_id: candle.underlying_id,
interval_ms: candle.interval_ms interval_ms: candle.interval_ms
@ -242,8 +279,12 @@ const run = async () => {
try { try {
redis = createRedisClient(env.REDIS_URL); redis = createRedisClient(env.REDIS_URL);
redis.on("error", (error) => { redis.on("error", (error) => {
if (isExpectedShutdownError(error)) {
return;
}
logger.warn("redis client error", { logger.warn("redis client error", {
error: error instanceof Error ? error.message : String(error) error: getErrorMessage(error)
}); });
}); });
await retry("redis connect", 20, 500, async () => { await retry("redis connect", 20, 500, async () => {
@ -376,6 +417,12 @@ const run = async () => {
}; };
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
if (state.shutdownPromise) {
return state.shutdownPromise;
}
state.shuttingDown = true;
state.shutdownPromise = (async () => {
logger.info("service stopping", { signal }); logger.info("service stopping", { signal });
clearInterval(flushTimer); clearInterval(flushTimer);
await flushExpired(); await flushExpired();
@ -384,12 +431,37 @@ const run = async () => {
const validated = EquityCandleSchema.parse(candle); const validated = EquityCandleSchema.parse(candle);
await emitCandle(clickhouse, js, redis, validated, env.CANDLE_CACHE_LIMIT); await emitCandle(clickhouse, js, redis, validated, env.CANDLE_CACHE_LIMIT);
} }
if (redis && redis.isOpen) { if (redis && redis.isOpen) {
try {
await redis.quit(); await redis.quit();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
} }
}
}
try {
await nc.drain(); await nc.drain();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
}
}
try {
await clickhouse.close(); await clickhouse.close();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
}
}
process.exit(0); process.exit(0);
})();
return state.shutdownPromise;
}; };
process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGINT", () => void shutdown("SIGINT"));

View file

@ -192,6 +192,31 @@ const roundTo = (value: number, digits = 4): number => {
return Number(value.toFixed(digits)); return Number(value.toFixed(digits));
}; };
const getErrorCode = (error: unknown): string | null => {
if (error && typeof error === "object" && "code" in error) {
const code = (error as { code?: unknown }).code;
if (typeof code === "string" && code.length > 0) {
return code;
}
}
if (error instanceof Error) {
const match = error.message.match(/\bCONNECTION_(?:DRAINING|CLOSED)\b/);
if (match?.[0]) {
return match[0];
}
}
if (typeof error === "string") {
const match = error.match(/\bCONNECTION_(?:DRAINING|CLOSED)\b/);
if (match?.[0]) {
return match[0];
}
}
return null;
};
type NbboPlacement = "AA" | "A" | "B" | "BB" | "MID" | "MISSING" | "STALE"; type NbboPlacement = "AA" | "A" | "B" | "BB" | "MID" | "MISSING" | "STALE";
type NbboPlacementCounts = { type NbboPlacementCounts = {
@ -217,6 +242,7 @@ type ClusterState = {
firstPrice: number; firstPrice: number;
lastPrice: number; lastPrice: number;
placements: NbboPlacementCounts; placements: NbboPlacementCounts;
flushed: boolean;
}; };
const clusters = new Map<string, ClusterState>(); const clusters = new Map<string, ClusterState>();
@ -226,6 +252,10 @@ const darkInferenceState = createDarkInferenceState();
const recentLegsByKey = new Map<string, LegEvidence[]>(); const recentLegsByKey = new Map<string, LegEvidence[]>();
const recentLegsByRoot = new Map<string, LegEvidence[]>(); const recentLegsByRoot = new Map<string, LegEvidence[]>();
const recentStructureEmits = new Map<string, number>(); const recentStructureEmits = new Map<string, number>();
const runtimeState = {
shuttingDown: false,
shutdownPromise: null as Promise<void> | null
};
const MAX_RECENT_LEGS = 20; const MAX_RECENT_LEGS = 20;
@ -233,6 +263,15 @@ const rollingKey = (metric: string, contractId: string): string => {
return `rolling:${metric}:${contractId}`; return `rolling:${metric}:${contractId}`;
}; };
const buildPacketId = (cluster: ClusterState): string => {
return `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`;
};
const isExpectedShutdownNatsError = (error: unknown): boolean => {
const code = getErrorCode(error);
return runtimeState.shuttingDown && (code === "CONNECTION_DRAINING" || code === "CONNECTION_CLOSED");
};
const createPlacementCounts = (): NbboPlacementCounts => ({ const createPlacementCounts = (): NbboPlacementCounts => ({
aa: 0, aa: 0,
a: 0, a: 0,
@ -501,7 +540,8 @@ const buildCluster = (print: OptionPrint): ClusterState => {
totalPremium: print.price * print.size, totalPremium: print.price * print.size,
firstPrice: print.price, firstPrice: print.price,
lastPrice: print.price, lastPrice: print.price,
placements placements,
flushed: false
}; };
}; };
@ -613,8 +653,14 @@ const flushCluster = async (
rollingConfig: RollingStatsConfig, rollingConfig: RollingStatsConfig,
cluster: ClusterState cluster: ClusterState
): Promise<void> => { ): Promise<void> => {
if (cluster.flushed) {
return;
}
cluster.flushed = true;
const joinQuality: Record<string, number> = {}; const joinQuality: Record<string, number> = {};
const nbboJoin = selectNbbo(cluster.contractId, cluster.endTs); const nbboJoin = selectNbbo(cluster.contractId, cluster.endTs);
const packetId = buildPacketId(cluster);
const totalPremium = roundTo(cluster.totalPremium); const totalPremium = roundTo(cluster.totalPremium);
const totalNotional = roundTo(totalPremium * 100, 2); const totalNotional = roundTo(totalPremium * 100, 2);
@ -777,15 +823,15 @@ const flushCluster = async (
source_ts: cluster.startSourceTs, source_ts: cluster.startSourceTs,
ingest_ts: cluster.endIngestTs, ingest_ts: cluster.endIngestTs,
seq: cluster.endSeq, seq: cluster.endSeq,
trace_id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, trace_id: packetId,
id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`, id: packetId,
members: cluster.members, members: cluster.members,
features, features,
join_quality: joinQuality join_quality: joinQuality
}; };
const validated = FlowPacketSchema.parse(packet); const validated = FlowPacketSchema.parse(packet);
try {
await insertFlowPacket(clickhouse, validated); await insertFlowPacket(clickhouse, validated);
await publishJson(js, SUBJECT_FLOW_PACKETS, validated); await publishJson(js, SUBJECT_FLOW_PACKETS, validated);
@ -796,6 +842,19 @@ const flushCluster = async (
contract: cluster.contractId, contract: cluster.contractId,
count: cluster.members.length count: cluster.members.length
}); });
} catch (error) {
if (isExpectedShutdownNatsError(error)) {
logger.info("skipped flow packet publish during shutdown", {
id: packetId,
contract: cluster.contractId,
error: getErrorCode(error) ?? (error instanceof Error ? error.message : String(error))
});
return;
}
cluster.flushed = false;
throw error;
}
}; };
const emitClassifiers = async ( const emitClassifiers = async (
@ -823,6 +882,9 @@ const emitClassifiers = async (
await insertClassifierHit(clickhouse, hit); await insertClassifierHit(clickhouse, hit);
await publishJson(js, SUBJECT_CLASSIFIER_HITS, hit); await publishJson(js, SUBJECT_CLASSIFIER_HITS, hit);
} catch (error) { } catch (error) {
if (isExpectedShutdownNatsError(error)) {
continue;
}
logger.error("failed to emit classifier hit", { logger.error("failed to emit classifier hit", {
error: error instanceof Error ? error.message : String(error), error: error instanceof Error ? error.message : String(error),
classifier_id: hit.classifier_id, classifier_id: hit.classifier_id,
@ -852,6 +914,9 @@ const emitClassifiers = async (
await insertAlert(clickhouse, alert); await insertAlert(clickhouse, alert);
await publishJson(js, SUBJECT_ALERTS, alert); await publishJson(js, SUBJECT_ALERTS, alert);
} catch (error) { } catch (error) {
if (isExpectedShutdownNatsError(error)) {
return;
}
logger.error("failed to emit alert", { logger.error("failed to emit alert", {
error: error instanceof Error ? error.message : String(error), error: error instanceof Error ? error.message : String(error),
packet_id: packet.id packet_id: packet.id
@ -880,6 +945,9 @@ const emitEquityJoin = async (
try { try {
await publishJson(js, SUBJECT_EQUITY_JOINS, payload); await publishJson(js, SUBJECT_EQUITY_JOINS, payload);
} catch (error) { } catch (error) {
if (isExpectedShutdownNatsError(error)) {
return;
}
logger.error("failed to publish equity print join", { logger.error("failed to publish equity print join", {
error: error instanceof Error ? error.message : String(error), error: error instanceof Error ? error.message : String(error),
trace_id: payload.trace_id trace_id: payload.trace_id
@ -901,6 +969,9 @@ const emitDarkInferences = async (
await insertInferredDark(clickhouse, validated); await insertInferredDark(clickhouse, validated);
await publishJson(js, SUBJECT_INFERRED_DARK, validated); await publishJson(js, SUBJECT_INFERRED_DARK, validated);
} catch (error) { } catch (error) {
if (isExpectedShutdownNatsError(error)) {
continue;
}
logger.error("failed to emit inferred dark event", { logger.error("failed to emit inferred dark event", {
error: error instanceof Error ? error.message : String(error), error: error instanceof Error ? error.message : String(error),
trace_id: validated.trace_id trace_id: validated.trace_id
@ -1366,6 +1437,10 @@ const run = async () => {
const nbboLoop = async () => { const nbboLoop = async () => {
for await (const msg of nbboSubscription.messages) { for await (const msg of nbboSubscription.messages) {
if (runtimeState.shuttingDown) {
break;
}
try { try {
const nbbo = OptionNBBOSchema.parse(nbboSubscription.decode(msg)); const nbbo = OptionNBBOSchema.parse(nbboSubscription.decode(msg));
updateNbboCache(nbbo); updateNbboCache(nbbo);
@ -1381,6 +1456,10 @@ const run = async () => {
const equityQuoteLoop = async () => { const equityQuoteLoop = async () => {
for await (const msg of equityQuoteSubscription.messages) { for await (const msg of equityQuoteSubscription.messages) {
if (runtimeState.shuttingDown) {
break;
}
try { try {
const quote = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg)); const quote = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg));
updateEquityQuoteCache(quote); updateEquityQuoteCache(quote);
@ -1396,6 +1475,10 @@ const run = async () => {
const equityPrintLoop = async () => { const equityPrintLoop = async () => {
for await (const msg of equitySubscription.messages) { for await (const msg of equitySubscription.messages) {
if (runtimeState.shuttingDown) {
break;
}
try { try {
const print = EquityPrintSchema.parse(equitySubscription.decode(msg)); const print = EquityPrintSchema.parse(equitySubscription.decode(msg));
await emitEquityJoin(clickhouse, js, print); await emitEquityJoin(clickhouse, js, print);
@ -1414,23 +1497,64 @@ const run = async () => {
void equityPrintLoop(); void equityPrintLoop();
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
if (runtimeState.shutdownPromise) {
await runtimeState.shutdownPromise;
return;
}
runtimeState.shuttingDown = true;
runtimeState.shutdownPromise = (async () => {
logger.info("service stopping", { signal }); logger.info("service stopping", { signal });
for (const cluster of clusters.values()) { for (const cluster of [...clusters.values()]) {
await flushCluster(clickhouse, js, redis, rollingConfig, cluster); await flushCluster(clickhouse, js, redis, rollingConfig, cluster);
} }
clusters.clear(); clusters.clear();
try {
await nc.drain(); await nc.drain();
} catch (error) {
if (!isExpectedShutdownNatsError(error)) {
throw error;
}
}
await clickhouse.close(); await clickhouse.close();
if (redis.isOpen) {
await redis.quit(); await redis.quit();
}
})();
try {
await runtimeState.shutdownPromise;
process.exit(0); process.exit(0);
} catch (error) {
logger.error("service shutdown failed", {
error: error instanceof Error ? error.message : String(error)
});
try {
await clickhouse.close();
} catch {}
try {
if (redis.isOpen) {
await redis.quit();
}
} catch {}
process.exit(1);
}
}; };
process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGINT", () => void shutdown("SIGINT"));
process.on("SIGTERM", () => void shutdown("SIGTERM")); process.on("SIGTERM", () => void shutdown("SIGTERM"));
for await (const msg of subscription.messages) { for await (const msg of subscription.messages) {
if (runtimeState.shuttingDown) {
break;
}
try { try {
const print = OptionPrintSchema.parse(subscription.decode(msg)); const print = OptionPrintSchema.parse(subscription.decode(msg));
await flushEligibleClusters( await flushEligibleClusters(
@ -1442,6 +1566,10 @@ const run = async () => {
print.option_contract_id print.option_contract_id
); );
if (runtimeState.shuttingDown) {
break;
}
const existing = clusters.get(print.option_contract_id); const existing = clusters.get(print.option_contract_id);
if (!existing) { if (!existing) {
clusters.set(print.option_contract_id, buildCluster(print)); clusters.set(print.option_contract_id, buildCluster(print));

View file

@ -65,7 +65,28 @@ const envSchema = z.object({
const env = readEnv(envSchema); const env = readEnv(envSchema);
const state = { const state = {
shuttingDown: false shuttingDown: false,
shutdownPromise: null as Promise<void> | null
};
const getErrorMessage = (error: unknown): string => {
return error instanceof Error ? error.message : String(error);
};
const isExpectedShutdownError = (error: unknown): boolean => {
if (!state.shuttingDown) {
return false;
}
const message = getErrorMessage(error).toUpperCase();
return [
"SOCKET CONNECTION WAS CLOSED UNEXPECTEDLY",
"SOCKET CLOSED UNEXPECTEDLY",
"ECONNREFUSED",
"CONNECTION_CLOSED",
"CONNECTION_DRAINING",
"TIMEOUT"
].some((token) => message.includes(token));
}; };
const buildThrottle = (enabled: boolean, throttleMs: number) => { const buildThrottle = (enabled: boolean, throttleMs: number) => {
@ -223,8 +244,12 @@ const run = async () => {
underlying_id: print.underlying_id underlying_id: print.underlying_id
}); });
} catch (error) { } catch (error) {
if (isExpectedShutdownError(error)) {
return;
}
logger.error("failed to publish equity print", { logger.error("failed to publish equity print", {
error: error instanceof Error ? error.message : String(error), error: getErrorMessage(error),
trace_id: print.trace_id trace_id: print.trace_id
}); });
} }
@ -245,8 +270,12 @@ const run = async () => {
await insertEquityQuote(clickhouse, quote); await insertEquityQuote(clickhouse, quote);
await publishJson(js, SUBJECT_EQUITY_QUOTES, quote); await publishJson(js, SUBJECT_EQUITY_QUOTES, quote);
} catch (error) { } catch (error) {
if (isExpectedShutdownError(error)) {
return;
}
logger.error("failed to publish equity quote", { logger.error("failed to publish equity quote", {
error: error instanceof Error ? error.message : String(error), error: getErrorMessage(error),
trace_id: quote.trace_id trace_id: quote.trace_id
}); });
} }
@ -254,18 +283,35 @@ const run = async () => {
}); });
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
if (state.shuttingDown) { if (state.shutdownPromise) {
return; return state.shutdownPromise;
} }
state.shuttingDown = true; state.shuttingDown = true;
state.shutdownPromise = (async () => {
logger.info("service stopping", { signal });
await stopAdapter(); await stopAdapter();
logger.info("service stopping", { signal }); try {
await nc.drain(); await nc.drain();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
}
}
try {
await clickhouse.close(); await clickhouse.close();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
}
}
process.exit(0); process.exit(0);
})();
return state.shutdownPromise;
}; };
process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGINT", () => void shutdown("SIGINT"));

View file

@ -88,7 +88,28 @@ const envSchema = z.object({
const env = readEnv(envSchema); const env = readEnv(envSchema);
const state = { const state = {
shuttingDown: false shuttingDown: false,
shutdownPromise: null as Promise<void> | null
};
const getErrorMessage = (error: unknown): string => {
return error instanceof Error ? error.message : String(error);
};
const isExpectedShutdownError = (error: unknown): boolean => {
if (!state.shuttingDown) {
return false;
}
const message = getErrorMessage(error).toUpperCase();
return [
"SOCKET CONNECTION WAS CLOSED UNEXPECTEDLY",
"SOCKET CLOSED UNEXPECTEDLY",
"ECONNREFUSED",
"CONNECTION_CLOSED",
"CONNECTION_DRAINING",
"TIMEOUT"
].some((token) => message.includes(token));
}; };
const buildThrottle = (enabled: boolean, throttleMs: number) => { const buildThrottle = (enabled: boolean, throttleMs: number) => {
@ -293,8 +314,12 @@ const run = async () => {
option_contract_id: print.option_contract_id option_contract_id: print.option_contract_id
}); });
} catch (error) { } catch (error) {
if (isExpectedShutdownError(error)) {
return;
}
logger.error("failed to publish option print", { logger.error("failed to publish option print", {
error: error instanceof Error ? error.message : String(error), error: getErrorMessage(error),
trace_id: print.trace_id trace_id: print.trace_id
}); });
} }
@ -315,8 +340,12 @@ const run = async () => {
await insertOptionNBBO(clickhouse, nbbo); await insertOptionNBBO(clickhouse, nbbo);
await publishJson(js, SUBJECT_OPTION_NBBO, nbbo); await publishJson(js, SUBJECT_OPTION_NBBO, nbbo);
} catch (error) { } catch (error) {
if (isExpectedShutdownError(error)) {
return;
}
logger.error("failed to publish option nbbo", { logger.error("failed to publish option nbbo", {
error: error instanceof Error ? error.message : String(error), error: getErrorMessage(error),
trace_id: nbbo.trace_id trace_id: nbbo.trace_id
}); });
} }
@ -324,18 +353,35 @@ const run = async () => {
}); });
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
if (state.shuttingDown) { if (state.shutdownPromise) {
return; return state.shutdownPromise;
} }
state.shuttingDown = true; state.shuttingDown = true;
state.shutdownPromise = (async () => {
logger.info("service stopping", { signal });
await stopAdapter(); await stopAdapter();
logger.info("service stopping", { signal }); try {
await nc.drain(); await nc.drain();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
}
}
try {
await clickhouse.close(); await clickhouse.close();
} catch (error) {
if (!isExpectedShutdownError(error)) {
throw error;
}
}
process.exit(0); process.exit(0);
})();
return state.shutdownPromise;
}; };
process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGINT", () => void shutdown("SIGINT"));