tune shutdown and runner cleanup

This commit is contained in:
Kellan Drucquer 2026-03-28 23:54:16 -04:00
parent b6d83663d3
commit 776ac7842f
7 changed files with 687 additions and 96 deletions

View file

@ -191,6 +191,31 @@ const roundTo = (value: number, digits = 4): number => {
return Number(value.toFixed(digits));
};
const getErrorCode = (error: unknown): string | null => {
if (error && typeof error === "object" && "code" in error) {
const code = (error as { code?: unknown }).code;
if (typeof code === "string" && code.length > 0) {
return code;
}
}
if (error instanceof Error) {
const match = error.message.match(/\bCONNECTION_(?:DRAINING|CLOSED)\b/);
if (match?.[0]) {
return match[0];
}
}
if (typeof error === "string") {
const match = error.match(/\bCONNECTION_(?:DRAINING|CLOSED)\b/);
if (match?.[0]) {
return match[0];
}
}
return null;
};
type NbboPlacement = "AA" | "A" | "B" | "BB" | "MID" | "MISSING" | "STALE";
type NbboPlacementCounts = {
@ -216,6 +241,7 @@ type ClusterState = {
firstPrice: number;
lastPrice: number;
placements: NbboPlacementCounts;
flushed: boolean;
};
const clusters = new Map<string, ClusterState>();
@ -225,6 +251,10 @@ const darkInferenceState = createDarkInferenceState();
const recentLegsByKey = new Map<string, LegEvidence[]>();
const recentLegsByRoot = new Map<string, LegEvidence[]>();
const recentStructureEmits = new Map<string, number>();
const runtimeState = {
shuttingDown: false,
shutdownPromise: null as Promise<void> | null
};
const MAX_RECENT_LEGS = 20;
@ -232,6 +262,15 @@ const rollingKey = (metric: string, contractId: string): string => {
return `rolling:${metric}:${contractId}`;
};
const buildPacketId = (cluster: ClusterState): string => {
return `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`;
};
const isExpectedShutdownNatsError = (error: unknown): boolean => {
const code = getErrorCode(error);
return runtimeState.shuttingDown && (code === "CONNECTION_DRAINING" || code === "CONNECTION_CLOSED");
};
const createPlacementCounts = (): NbboPlacementCounts => ({
aa: 0,
a: 0,
@ -500,7 +539,8 @@ const buildCluster = (print: OptionPrint): ClusterState => {
totalPremium: print.price * print.size,
firstPrice: print.price,
lastPrice: print.price,
placements
placements,
flushed: false
};
};
@ -612,8 +652,14 @@ const flushCluster = async (
rollingConfig: RollingStatsConfig,
cluster: ClusterState
): Promise<void> => {
if (cluster.flushed) {
return;
}
cluster.flushed = true;
const joinQuality: Record<string, number> = {};
const nbboJoin = selectNbbo(cluster.contractId, cluster.endTs);
const packetId = buildPacketId(cluster);
const totalPremium = roundTo(cluster.totalPremium);
const totalNotional = roundTo(totalPremium * 100, 2);
@ -776,25 +822,38 @@ const flushCluster = async (
source_ts: cluster.startSourceTs,
ingest_ts: cluster.endIngestTs,
seq: cluster.endSeq,
trace_id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`,
id: `flowpacket:${cluster.contractId}:${cluster.startTs}:${cluster.endTs}`,
trace_id: packetId,
id: packetId,
members: cluster.members,
features,
join_quality: joinQuality
};
const validated = FlowPacketSchema.parse(packet);
try {
await insertFlowPacket(clickhouse, validated);
await publishJson(js, SUBJECT_FLOW_PACKETS, validated);
await insertFlowPacket(clickhouse, validated);
await publishJson(js, SUBJECT_FLOW_PACKETS, validated);
await emitClassifiers(clickhouse, js, validated);
await emitClassifiers(clickhouse, js, validated);
logger.info("emitted flow packet", {
id: validated.id,
contract: cluster.contractId,
count: cluster.members.length
});
} catch (error) {
if (isExpectedShutdownNatsError(error)) {
logger.info("skipped flow packet publish during shutdown", {
id: packetId,
contract: cluster.contractId,
error: getErrorCode(error) ?? (error instanceof Error ? error.message : String(error))
});
return;
}
logger.info("emitted flow packet", {
id: validated.id,
contract: cluster.contractId,
count: cluster.members.length
});
cluster.flushed = false;
throw error;
}
};
const scoreAlert = (packet: FlowPacket, hits: ClassifierHitEvent[]): { score: number; severity: string } => {
@ -834,6 +893,9 @@ const emitClassifiers = async (
await insertClassifierHit(clickhouse, hit);
await publishJson(js, SUBJECT_CLASSIFIER_HITS, hit);
} catch (error) {
if (isExpectedShutdownNatsError(error)) {
continue;
}
logger.error("failed to emit classifier hit", {
error: error instanceof Error ? error.message : String(error),
classifier_id: hit.classifier_id,
@ -863,6 +925,9 @@ const emitClassifiers = async (
await insertAlert(clickhouse, alert);
await publishJson(js, SUBJECT_ALERTS, alert);
} catch (error) {
if (isExpectedShutdownNatsError(error)) {
return;
}
logger.error("failed to emit alert", {
error: error instanceof Error ? error.message : String(error),
packet_id: packet.id
@ -891,6 +956,9 @@ const emitEquityJoin = async (
try {
await publishJson(js, SUBJECT_EQUITY_JOINS, payload);
} catch (error) {
if (isExpectedShutdownNatsError(error)) {
return;
}
logger.error("failed to publish equity print join", {
error: error instanceof Error ? error.message : String(error),
trace_id: payload.trace_id
@ -912,6 +980,9 @@ const emitDarkInferences = async (
await insertInferredDark(clickhouse, validated);
await publishJson(js, SUBJECT_INFERRED_DARK, validated);
} catch (error) {
if (isExpectedShutdownNatsError(error)) {
continue;
}
logger.error("failed to emit inferred dark event", {
error: error instanceof Error ? error.message : String(error),
trace_id: validated.trace_id
@ -1377,6 +1448,10 @@ const run = async () => {
const nbboLoop = async () => {
for await (const msg of nbboSubscription.messages) {
if (runtimeState.shuttingDown) {
break;
}
try {
const nbbo = OptionNBBOSchema.parse(nbboSubscription.decode(msg));
updateNbboCache(nbbo);
@ -1392,6 +1467,10 @@ const run = async () => {
const equityQuoteLoop = async () => {
for await (const msg of equityQuoteSubscription.messages) {
if (runtimeState.shuttingDown) {
break;
}
try {
const quote = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg));
updateEquityQuoteCache(quote);
@ -1407,6 +1486,10 @@ const run = async () => {
const equityPrintLoop = async () => {
for await (const msg of equitySubscription.messages) {
if (runtimeState.shuttingDown) {
break;
}
try {
const print = EquityPrintSchema.parse(equitySubscription.decode(msg));
await emitEquityJoin(clickhouse, js, print);
@ -1425,23 +1508,64 @@ const run = async () => {
void equityPrintLoop();
const shutdown = async (signal: string) => {
logger.info("service stopping", { signal });
for (const cluster of clusters.values()) {
await flushCluster(clickhouse, js, redis, rollingConfig, cluster);
if (runtimeState.shutdownPromise) {
await runtimeState.shutdownPromise;
return;
}
clusters.clear();
await nc.drain();
await clickhouse.close();
await redis.quit();
process.exit(0);
runtimeState.shuttingDown = true;
runtimeState.shutdownPromise = (async () => {
logger.info("service stopping", { signal });
for (const cluster of [...clusters.values()]) {
await flushCluster(clickhouse, js, redis, rollingConfig, cluster);
}
clusters.clear();
try {
await nc.drain();
} catch (error) {
if (!isExpectedShutdownNatsError(error)) {
throw error;
}
}
await clickhouse.close();
if (redis.isOpen) {
await redis.quit();
}
})();
try {
await runtimeState.shutdownPromise;
process.exit(0);
} catch (error) {
logger.error("service shutdown failed", {
error: error instanceof Error ? error.message : String(error)
});
try {
await clickhouse.close();
} catch {}
try {
if (redis.isOpen) {
await redis.quit();
}
} catch {}
process.exit(1);
}
};
process.on("SIGINT", () => void shutdown("SIGINT"));
process.on("SIGTERM", () => void shutdown("SIGTERM"));
for await (const msg of subscription.messages) {
if (runtimeState.shuttingDown) {
break;
}
try {
const print = OptionPrintSchema.parse(subscription.decode(msg));
await flushEligibleClusters(
@ -1453,6 +1577,10 @@ const run = async () => {
print.option_contract_id
);
if (runtimeState.shuttingDown) {
break;
}
const existing = clusters.get(print.option_contract_id);
if (!existing) {
clusters.set(print.option_contract_id, buildCluster(print));