Stabilize tape virtualization and scoped live health

This commit is contained in:
dirtydishes 2026-05-07 01:52:20 -04:00
parent 034d24f8ac
commit e69bf295c8
11 changed files with 866 additions and 273 deletions

View file

@ -112,7 +112,7 @@ import {
} from "@islandflow/types";
import { createClient } from "redis";
import { z } from "zod";
import { LiveStateManager, shouldFanoutLiveEvent } from "./live";
import { HOT_LIVE_REDIS_KEYS, LiveStateManager, shouldFanoutLiveEvent } from "./live";
const service = "api";
const logger = createLogger({ service });
@ -138,13 +138,6 @@ const state = {
shutdownPromise: null as Promise<void> | null
};
const HOT_LIVE_REDIS_KEYS = {
options: "live:options",
equities: "live:equities",
flow: "live:flow",
nbbo: "live:nbbo"
} as const;
const getErrorMessage = (error: unknown): string => {
return error instanceof Error ? error.message : String(error);
};
@ -908,6 +901,7 @@ const run = async () => {
};
const liveStateMetricsTimer = setInterval(() => {
const snapshot = liveState.getStatsSnapshot();
const hotFeedHealth = liveState.getHotChannelHealth();
const hotFeedLagMs = {
options: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.options] ?? null,
equities: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.equities] ?? null,
@ -916,7 +910,12 @@ const run = async () => {
};
logger.info("live cache metrics", {
...snapshot,
hotFeedLagMs
hotFeedLagMs,
hotFeedHealth,
snapshotSourceCounts: {
generic_cache_snapshot: snapshot.genericCacheSnapshots,
scoped_clickhouse_snapshot: snapshot.scopedClickHouseSnapshots
}
});
warnLiveLag("options", hotFeedLagMs.options);
warnLiveLag("equities", hotFeedLagMs.equities);
@ -1892,9 +1891,13 @@ const run = async () => {
websocket: {
open: (socket: any) => {
if (socket.data.channel === "live") {
sendLiveMessage(socket, { op: "ready" });
sendLiveMessage(socket, { op: "ready", channel_health: liveState.getHotChannelHealth() });
const heartbeat = setInterval(() => {
sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() });
sendLiveMessage(socket, {
op: "heartbeat",
ts: Date.now(),
channel_health: liveState.getHotChannelHealth()
});
}, 15000);
liveHeartbeats.set(socket, heartbeat);
} else if (socket.data.channel === "options") {
@ -1935,7 +1938,11 @@ const run = async () => {
: new TextDecoder().decode(message instanceof Uint8Array ? message : new Uint8Array(message));
const parsed = LiveClientMessageSchema.parse(JSON.parse(payload));
if (parsed.op === "ping") {
sendLiveMessage(socket, { op: "heartbeat", ts: Date.now() });
sendLiveMessage(socket, {
op: "heartbeat",
ts: Date.now(),
channel_health: liveState.getHotChannelHealth()
});
return;
}

View file

@ -25,7 +25,10 @@ import {
FeedSnapshot,
FlowPacketSchema,
InferredDarkEventSchema,
LiveChannelHealth,
LiveGenericChannel,
LiveHotChannel,
LiveHotChannelHealthMap,
LiveSubscription,
matchesFlowPacketFilters,
matchesOptionPrintFilters,
@ -81,6 +84,13 @@ export const LIVE_FRESHNESS_THRESHOLDS: Partial<Record<LiveGenericChannel, numbe
flow: 30_000
};
export const HOT_LIVE_REDIS_KEYS = {
options: "live:options",
equities: "live:equities",
flow: "live:flow",
nbbo: "live:nbbo"
} as const satisfies Record<LiveHotChannel, string>;
export type GenericLiveLimits = Record<LiveGenericChannel, number>;
const parseGenericLimit = (
@ -357,6 +367,8 @@ export class LiveStateManager {
private readonly stats = {
genericHydrateFromRedis: 0,
genericHydrateFromClickHouse: 0,
genericCacheSnapshots: 0,
scopedClickHouseSnapshots: 0,
trimOperations: 0,
cacheDepthByKey: new Map<string, number>(),
freshnessAgeMsByKey: new Map<string, number>()
@ -373,6 +385,8 @@ export class LiveStateManager {
getStatsSnapshot(): {
genericHydrateFromRedis: number;
genericHydrateFromClickHouse: number;
genericCacheSnapshots: number;
scopedClickHouseSnapshots: number;
trimOperations: number;
cacheDepthByKey: Record<string, number>;
freshnessAgeMsByKey: Record<string, number>;
@ -380,12 +394,37 @@ export class LiveStateManager {
return {
genericHydrateFromRedis: this.stats.genericHydrateFromRedis,
genericHydrateFromClickHouse: this.stats.genericHydrateFromClickHouse,
genericCacheSnapshots: this.stats.genericCacheSnapshots,
scopedClickHouseSnapshots: this.stats.scopedClickHouseSnapshots,
trimOperations: this.stats.trimOperations,
cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey),
freshnessAgeMsByKey: Object.fromEntries(this.stats.freshnessAgeMsByKey)
};
}
getHotChannelHealth(): LiveHotChannelHealthMap {
return {
options: this.getChannelHealth("options"),
nbbo: this.getChannelHealth("nbbo"),
equities: this.getChannelHealth("equities"),
flow: this.getChannelHealth("flow")
};
}
private getChannelHealth(channel: LiveHotChannel): LiveChannelHealth {
const listKey = HOT_LIVE_REDIS_KEYS[channel];
const thresholdMs = LIVE_FRESHNESS_THRESHOLDS[channel];
const freshnessAgeMs = this.stats.freshnessAgeMsByKey.get(listKey) ?? null;
return {
freshness_age_ms: freshnessAgeMs,
healthy:
freshnessAgeMs !== null &&
typeof thresholdMs === "number" &&
Number.isFinite(freshnessAgeMs) &&
freshnessAgeMs <= thresholdMs
};
}
private updateFreshnessMetric(listKey: string, channel: LiveChannel, item: unknown, now = Date.now()): void {
const ts =
channel === "equity-candles" || channel === "equity-overlay"
@ -448,6 +487,7 @@ export class LiveStateManager {
const scoped =
Boolean(subscription.underlying_ids?.length) || Boolean(subscription.option_contract_id);
if (subscription.filters?.view === "raw" || scoped) {
this.stats.scopedClickHouseSnapshots += 1;
const limit = snapshotLimitFor(subscription, this.generic.options.limit);
const storageFilters: OptionPrintQueryFilters = {
view: subscription.filters?.view ?? "signal",
@ -476,6 +516,7 @@ export class LiveStateManager {
}
const config = this.generic.options;
this.stats.genericCacheSnapshots += 1;
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get("options") ?? []).filter((item) =>
matchesOptionPrintFilters(item, subscription.filters)
@ -489,6 +530,7 @@ export class LiveStateManager {
}
case "flow": {
const config = this.generic.flow;
this.stats.genericCacheSnapshots += 1;
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get("flow") ?? []).filter((item) =>
matchesFlowPacketFilters(item, subscription.filters)
@ -504,6 +546,7 @@ export class LiveStateManager {
const config = this.generic.equities;
const limit = snapshotLimitFor(subscription, config.limit);
if (subscription.underlying_ids?.length) {
this.stats.scopedClickHouseSnapshots += 1;
const filters: EquityPrintQueryFilters = {
underlyingIds: subscription.underlying_ids
};
@ -515,6 +558,7 @@ export class LiveStateManager {
next_before: nextBeforeForItems(items, config.cursor)
};
}
this.stats.genericCacheSnapshots += 1;
const items = (this.genericItems.get("equities") ?? []).slice(0, limit);
return {
subscription,
@ -553,6 +597,7 @@ export class LiveStateManager {
}
default: {
const config = this.generic[subscription.channel];
this.stats.genericCacheSnapshots += 1;
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get(subscription.channel) ?? []).slice(0, limit);
return {

View file

@ -1,6 +1,7 @@
import { describe, expect, it } from "bun:test";
import type { ClickHouseClient } from "@islandflow/storage";
import {
HOT_LIVE_REDIS_KEYS,
LiveStateManager,
isLiveItemFresh,
resolveGenericLiveLimits,
@ -729,6 +730,122 @@ describe("LiveStateManager", () => {
expect(persisted).toHaveLength(1);
});
it("includes hot-channel health for options, nbbo, equities, and flow", async () => {
const manager = new LiveStateManager(makeClickHouse(), null);
const now = Date.now();
await manager.ingest("options", {
source_ts: now,
ingest_ts: now + 1,
seq: 1,
trace_id: "opt-health",
ts: now,
option_contract_id: "AAPL-2025-01-17-200-C",
price: 1,
size: 10,
exchange: "X"
});
await manager.ingest("nbbo", {
source_ts: now,
ingest_ts: now + 1,
seq: 1,
trace_id: "nbbo-health",
ts: now,
option_contract_id: "AAPL-2025-01-17-200-C",
bid: 1,
ask: 1.1,
bidSize: 10,
askSize: 10
});
await manager.ingest("equities", {
source_ts: now,
ingest_ts: now + 1,
seq: 1,
trace_id: "eq-health",
ts: now,
underlying_id: "AAPL",
price: 100,
size: 10,
exchange: "X",
offExchangeFlag: false
});
await manager.ingest("flow", {
source_ts: now,
ingest_ts: now + 1,
seq: 1,
trace_id: "flow-health",
id: "flow-health",
members: [],
features: {},
join_quality: {}
});
const health = manager.getHotChannelHealth();
expect(health.options.healthy).toBe(true);
expect(health.nbbo.healthy).toBe(true);
expect(health.equities.healthy).toBe(true);
expect(health.flow.healthy).toBe(true);
expect(health.options.freshness_age_ms).not.toBeNull();
expect(health.nbbo.freshness_age_ms).not.toBeNull();
expect(health.equities.freshness_age_ms).not.toBeNull();
expect(health.flow.freshness_age_ms).not.toBeNull();
});
it("tracks generic cache and scoped clickhouse snapshot sources separately", async () => {
const manager = new LiveStateManager(makeClickHouse(() => []), null);
const now = Date.now();
await manager.ingest("options", {
source_ts: now,
ingest_ts: now + 1,
seq: 1,
trace_id: "opt-snapshot",
ts: now,
option_contract_id: "SPY-2025-01-17-500-C",
price: 1,
size: 10,
exchange: "X"
});
await manager.getSnapshot({ channel: "options" });
await manager.getSnapshot({
channel: "options",
underlying_ids: ["QQQ"],
option_contract_id: "QQQ-2025-01-17-400-C"
});
const stats = manager.getStatsSnapshot();
expect(stats.genericCacheSnapshots).toBe(1);
expect(stats.scopedClickHouseSnapshots).toBe(1);
});
it("keeps backend channel health healthy when a scoped query is quiet", async () => {
const manager = new LiveStateManager(makeClickHouse(() => []), null);
const now = Date.now();
await manager.ingest("options", {
source_ts: now,
ingest_ts: now + 1,
seq: 1,
trace_id: "opt-global",
ts: now,
option_contract_id: "SPY-2025-01-17-500-C",
price: 1,
size: 10,
exchange: "X"
});
const quietSnapshot = await manager.getSnapshot({
channel: "options",
underlying_ids: ["QQQ"],
option_contract_id: "QQQ-2025-01-17-400-C"
});
expect(quietSnapshot.items).toEqual([]);
expect(manager.getHotChannelHealth().options.healthy).toBe(true);
expect(manager.getStatsSnapshot().freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.options]).toBeLessThanOrEqual(50);
});
it("exposes freshness helper for feed status", () => {
expect(isLiveItemFresh("options", { ts: 1000 }, 1010)).toBe(true);
expect(isLiveItemFresh("options", { ts: 1000 }, 20_001)).toBe(false);