cap live api caches and cut redis churn

This commit is contained in:
dirtydishes 2026-05-22 21:39:55 -04:00
parent 828c81bcc6
commit e54fa4b2de
7 changed files with 304 additions and 41 deletions

View file

@ -307,6 +307,35 @@ const subscriptionSockets = new Map<string, Set<LiveSocket>>();
const subscriptionDefinitions = new Map<string, LiveSubscription>();
const liveHeartbeats = new Map<LiveSocket, ReturnType<typeof setInterval>>();
const buildLiveSubscriptionMetrics = (): {
liveSocketCount: number;
uniqueSubscriptionsByChannel: Partial<Record<LiveSubscription["channel"], number>>;
socketFanoutByChannel: Partial<Record<LiveSubscription["channel"], number>>;
} => {
const uniqueSubscriptionsByChannel: Partial<Record<LiveSubscription["channel"], number>> = {};
const socketFanoutByChannel: Partial<Record<LiveSubscription["channel"], number>> = {};
for (const subscription of subscriptionDefinitions.values()) {
uniqueSubscriptionsByChannel[subscription.channel] =
(uniqueSubscriptionsByChannel[subscription.channel] ?? 0) + 1;
}
for (const [key, sockets] of subscriptionSockets.entries()) {
const subscription = subscriptionDefinitions.get(key);
if (!subscription || sockets.size === 0) {
continue;
}
socketFanoutByChannel[subscription.channel] =
(socketFanoutByChannel[subscription.channel] ?? 0) + sockets.size;
}
return {
liveSocketCount: liveSocketSubscriptions.size,
uniqueSubscriptionsByChannel,
socketFanoutByChannel
};
};
const jsonResponse = (body: unknown, status = 200): Response => {
return new Response(JSON.stringify(body), {
status,
@ -759,6 +788,8 @@ const run = async () => {
const liveState = new LiveStateManager(clickhouse, redis, resolveLiveStateConfig());
await liveState.hydrate();
let previousLiveStats = liveState.getStatsSnapshot();
let previousMemoryUsage = process.memoryUsage();
const warnLiveLag = (
channel: keyof typeof HOT_LIVE_REDIS_KEYS,
ageMs: number | null | undefined
@ -778,25 +809,52 @@ const run = async () => {
const liveStateMetricsTimer = setInterval(() => {
const snapshot = liveState.getStatsSnapshot();
const hotFeedHealth = liveState.getHotChannelHealth();
const subscriptionMetrics = buildLiveSubscriptionMetrics();
const memoryUsage = process.memoryUsage();
const hotFeedLagMs = {
options: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.options] ?? null,
equities: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.equities] ?? null,
flow: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.flow] ?? null,
nbbo: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.nbbo] ?? null
};
const flushDelta = {
redisFlushCount: snapshot.redisFlushCount - previousLiveStats.redisFlushCount,
redisFlushItems: snapshot.redisFlushItems - previousLiveStats.redisFlushItems,
redisFlushPayloadBytes: snapshot.redisFlushPayloadBytes - previousLiveStats.redisFlushPayloadBytes
};
const memorySnapshot = {
rss_bytes: memoryUsage.rss,
heap_used_bytes: memoryUsage.heapUsed,
heap_total_bytes: memoryUsage.heapTotal,
external_bytes: memoryUsage.external,
array_buffers_bytes: memoryUsage.arrayBuffers,
rss_delta_bytes: memoryUsage.rss - previousMemoryUsage.rss,
heap_used_delta_bytes: memoryUsage.heapUsed - previousMemoryUsage.heapUsed
};
logger.info("live cache metrics", {
...snapshot,
hotFeedLagMs,
hotFeedHealth,
flushDelta,
memorySnapshot,
liveSubscriptions: subscriptionMetrics,
snapshotSourceCounts: {
generic_cache_snapshot: snapshot.genericCacheSnapshots,
scoped_clickhouse_snapshot: snapshot.scopedClickHouseSnapshots
}
});
metrics.gauge("api.memory.rss_bytes", memoryUsage.rss);
metrics.gauge("api.memory.heap_used_bytes", memoryUsage.heapUsed);
metrics.gauge("api.live.active_sockets", subscriptionMetrics.liveSocketCount);
for (const [channel, count] of Object.entries(subscriptionMetrics.uniqueSubscriptionsByChannel)) {
metrics.gauge("api.live.subscription_count", count, { channel });
}
warnLiveLag("options", hotFeedLagMs.options);
warnLiveLag("equities", hotFeedLagMs.equities);
warnLiveLag("flow", hotFeedLagMs.flow);
warnLiveLag("nbbo", hotFeedLagMs.nbbo);
previousLiveStats = snapshot;
previousMemoryUsage = memoryUsage;
}, 60000);
const consumerBindings = [

View file

@ -89,6 +89,20 @@ const DEFAULT_LIVE_LIMITS: GenericLiveLimits = {
news: 100
};
export const LIVE_GENERIC_LIMIT_CAPS: GenericLiveLimits = {
options: 100,
nbbo: 1000,
equities: 1000,
"equity-quotes": 500,
"equity-joins": 500,
flow: 500,
"smart-money": 300,
"classifier-hits": 300,
alerts: 300,
"inferred-dark": 300,
news: 100
};
const DEFAULT_SCOPED_CACHE_MAX_KEYS = 32;
const DEFAULT_REDIS_FLUSH_INTERVAL_MS = 250;
const DEFAULT_REDIS_FLUSH_MAX_ITEMS = 100;
@ -134,7 +148,7 @@ const parseGenericLimit = (
const key = GENERIC_LIMIT_ENV_KEYS[channel];
const raw = env[key];
if (!raw || raw.trim().length === 0) {
return fallback;
return clampConfiguredLimit(channel, fallback);
}
const parsed = Number(raw);
@ -143,7 +157,7 @@ const parseGenericLimit = (
return fallback;
}
const bounded = Math.max(MIN_GENERIC_LIMIT, Math.min(MAX_GENERIC_LIMIT, Math.floor(parsed)));
const bounded = clampConfiguredLimit(channel, Math.min(MAX_GENERIC_LIMIT, parsed));
if (bounded !== parsed) {
console.warn(`Clamped ${key} from ${parsed} to ${bounded}`);
}
@ -226,7 +240,7 @@ const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | nu
};
export const resolveLiveStateConfig = (env: NodeJS.ProcessEnv = process.env): LiveStateConfig => ({
limits: resolveGenericLiveLimits(env),
limits: clampGenericLimitMap(resolveGenericLiveLimits(env)),
scopedCacheMaxKeys: parsePositiveInt(env.LIVE_SCOPED_CACHE_MAX_KEYS, DEFAULT_SCOPED_CACHE_MAX_KEYS),
redisFlushIntervalMs: parsePositiveInt(
env.LIVE_REDIS_FLUSH_INTERVAL_MS,
@ -559,7 +573,8 @@ const insertNewestFirst = <T>(
};
};
type BufferedRedisWrite = {
type BufferedRedisRewrite = {
mode: "rewrite";
listKey: string;
cursorField: string;
items: unknown[];
@ -568,9 +583,64 @@ type BufferedRedisWrite = {
updates: number;
};
type BufferedRedisAppend = {
mode: "append";
listKey: string;
cursorField: string;
payloads: string[];
limit: number;
cursor: Cursor | null;
updates: number;
};
type BufferedRedisWrite = BufferedRedisRewrite | BufferedRedisAppend;
export type LiveStateStatsSnapshot = {
genericHydrateFromRedis: number;
genericHydrateFromClickHouse: number;
genericCacheSnapshots: number;
scopedClickHouseSnapshots: number;
trimOperations: number;
redisFlushCount: number;
redisFlushItems: number;
redisFlushPayloadBytes: number;
cacheEvictions: number;
outOfOrderEvents: number;
cacheDepthByKey: Record<string, number>;
freshnessAgeMsByKey: Record<string, number>;
snapshotItemsByChannel: Record<string, number>;
};
const isLiveStateConfig = (value: GenericLiveLimits | LiveStateConfig): value is LiveStateConfig =>
"limits" in value;
const clampConfiguredLimit = (channel: LiveGenericChannel, value: number): number =>
Math.max(MIN_GENERIC_LIMIT, Math.min(LIVE_GENERIC_LIMIT_CAPS[channel], Math.floor(value)));
const clampGenericLimitMap = (limits: GenericLiveLimits): GenericLiveLimits =>
Object.fromEntries(
(Object.keys(LIVE_GENERIC_LIMIT_CAPS) as LiveGenericChannel[]).map((channel) => [
channel,
clampConfiguredLimit(channel, limits[channel] ?? DEFAULT_LIVE_LIMITS[channel])
])
) as GenericLiveLimits;
const normalizeLiveStateConfig = (config: GenericLiveLimits | LiveStateConfig): LiveStateConfig => {
if (isLiveStateConfig(config)) {
return {
...config,
limits: clampGenericLimitMap(config.limits)
};
}
return {
limits: clampGenericLimitMap(config),
scopedCacheMaxKeys: DEFAULT_SCOPED_CACHE_MAX_KEYS,
redisFlushIntervalMs: DEFAULT_REDIS_FLUSH_INTERVAL_MS,
redisFlushMaxItems: DEFAULT_REDIS_FLUSH_MAX_ITEMS
};
};
export class LiveStateManager {
private readonly config: LiveStateConfig;
private readonly generic: {
@ -594,10 +664,12 @@ export class LiveStateManager {
trimOperations: 0,
redisFlushCount: 0,
redisFlushItems: 0,
redisFlushPayloadBytes: 0,
cacheEvictions: 0,
outOfOrderEvents: 0,
cacheDepthByKey: new Map<string, number>(),
freshnessAgeMsByKey: new Map<string, number>()
freshnessAgeMsByKey: new Map<string, number>(),
snapshotItemsByChannel: new Map<string, number>()
};
constructor(
@ -605,14 +677,7 @@ export class LiveStateManager {
private readonly redis: RedisLike | null,
config: GenericLiveLimits | LiveStateConfig = resolveLiveStateConfig()
) {
this.config = isLiveStateConfig(config)
? config
: {
limits: config,
scopedCacheMaxKeys: DEFAULT_SCOPED_CACHE_MAX_KEYS,
redisFlushIntervalMs: DEFAULT_REDIS_FLUSH_INTERVAL_MS,
redisFlushMaxItems: DEFAULT_REDIS_FLUSH_MAX_ITEMS
};
this.config = normalizeLiveStateConfig(config);
this.generic = getGenericConfig(this.config.limits);
this.redisFlushTimer =
this.redis && this.redis.isOpen
@ -630,19 +695,7 @@ export class LiveStateManager {
await this.flushRedisWrites();
}
getStatsSnapshot(): {
genericHydrateFromRedis: number;
genericHydrateFromClickHouse: number;
genericCacheSnapshots: number;
scopedClickHouseSnapshots: number;
trimOperations: number;
redisFlushCount: number;
redisFlushItems: number;
cacheEvictions: number;
outOfOrderEvents: number;
cacheDepthByKey: Record<string, number>;
freshnessAgeMsByKey: Record<string, number>;
} {
getStatsSnapshot(): LiveStateStatsSnapshot {
return {
genericHydrateFromRedis: this.stats.genericHydrateFromRedis,
genericHydrateFromClickHouse: this.stats.genericHydrateFromClickHouse,
@ -651,10 +704,12 @@ export class LiveStateManager {
trimOperations: this.stats.trimOperations,
redisFlushCount: this.stats.redisFlushCount,
redisFlushItems: this.stats.redisFlushItems,
redisFlushPayloadBytes: this.stats.redisFlushPayloadBytes,
cacheEvictions: this.stats.cacheEvictions,
outOfOrderEvents: this.stats.outOfOrderEvents,
cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey),
freshnessAgeMsByKey: Object.fromEntries(this.stats.freshnessAgeMsByKey)
freshnessAgeMsByKey: Object.fromEntries(this.stats.freshnessAgeMsByKey),
snapshotItemsByChannel: Object.fromEntries(this.stats.snapshotItemsByChannel)
};
}
@ -676,11 +731,36 @@ export class LiveStateManager {
this.pendingRedisWrites.clear();
for (const write of writes) {
await this.persistList(write.listKey, write.cursorField, write.items, write.limit, write.cursor);
if (write.mode === "rewrite") {
await this.persistList(write.listKey, write.cursorField, write.items, write.limit, write.cursor);
this.stats.redisFlushItems += write.items.length;
this.stats.redisFlushPayloadBytes += write.items.reduce(
(total, item) => total + JSON.stringify(item).length,
0
);
} else {
await this.persistListAppend(
write.listKey,
write.cursorField,
write.payloads,
write.limit,
write.cursor
);
this.stats.redisFlushItems += write.payloads.length;
this.stats.redisFlushPayloadBytes += write.payloads.reduce((total, payload) => total + payload.length, 0);
}
this.stats.redisFlushCount += 1;
this.stats.redisFlushItems += write.items.length;
metrics.count("api.live.redis_flush_count", 1);
metrics.count("api.live.redis_flush_items", write.items.length);
metrics.count(
"api.live.redis_flush_items",
write.mode === "rewrite" ? write.items.length : write.payloads.length
);
metrics.count(
"api.live.redis_flush_payload_bytes",
write.mode === "rewrite"
? write.items.reduce((total, item) => total + JSON.stringify(item).length, 0)
: write.payloads.reduce((total, payload) => total + payload.length, 0)
);
}
}
@ -739,7 +819,12 @@ export class LiveStateManager {
}
}
private queueRedisWrite(
private recordSnapshotItems(channel: LiveSubscription["channel"], count: number): void {
this.stats.snapshotItemsByChannel.set(channel, count);
metrics.gauge("api.live.snapshot_items", count, { channel });
}
private queueRedisRewrite(
listKey: string,
cursorField: string,
items: unknown[],
@ -751,7 +836,8 @@ export class LiveStateManager {
}
const existing = this.pendingRedisWrites.get(listKey);
const write: BufferedRedisWrite = {
const write: BufferedRedisRewrite = {
mode: "rewrite",
listKey,
cursorField,
items: [...items],
@ -765,6 +851,51 @@ export class LiveStateManager {
}
}
private queueGenericRedisWrite(
listKey: string,
cursorField: string,
item: unknown,
items: unknown[],
limit: number,
cursor: Cursor | null,
forceRewrite = false
): void {
if (!this.redis?.isOpen) {
return;
}
const existing = this.pendingRedisWrites.get(listKey);
const nextUpdateCount = (existing?.updates ?? 0) + 1;
if (forceRewrite || existing?.mode === "rewrite") {
const write: BufferedRedisRewrite = {
mode: "rewrite",
listKey,
cursorField,
items: [...items],
limit,
cursor,
updates: nextUpdateCount
};
this.pendingRedisWrites.set(listKey, write);
} else {
const payload = JSON.stringify(item);
const write: BufferedRedisAppend = {
mode: "append",
listKey,
cursorField,
payloads: [...(existing?.mode === "append" ? existing.payloads : []), payload],
limit,
cursor,
updates: nextUpdateCount
};
this.pendingRedisWrites.set(listKey, write);
}
if (nextUpdateCount >= this.config.redisFlushMaxItems) {
void this.flushRedisWrites();
}
}
async hydrate(): Promise<void> {
const channels = Object.keys(this.generic) as LiveGenericChannel[];
await Promise.all(channels.map((channel) => this.hydrateGeneric(channel)));
@ -818,6 +949,7 @@ export class LiveStateManager {
const backfill = await fetchRecentOptionPrints(this.clickhouse, limit, undefined, storageFilters);
items = mergeSnapshotBackfill(cached, backfill, limit, (entry) => ({ ts: entry.ts, seq: entry.seq }));
}
this.recordSnapshotItems(subscription.channel, items.length);
return {
subscription,
items,
@ -830,6 +962,7 @@ export class LiveStateManager {
const items = (this.genericItems.get("options") ?? [])
.filter((entry) => matchesOptionPrintFilters(entry, subscription.filters))
.slice(0, limit);
this.recordSnapshotItems(subscription.channel, items.length);
return {
subscription,
items,
@ -844,6 +977,7 @@ export class LiveStateManager {
const items = (this.genericItems.get("flow") ?? [])
.filter((entry) => matchesFlowPacketFilters(entry, subscription.filters))
.slice(0, limit);
this.recordSnapshotItems(subscription.channel, items.length);
return {
subscription,
items,
@ -865,6 +999,7 @@ export class LiveStateManager {
const backfill = await fetchRecentEquityPrints(this.clickhouse, limit, filters);
items = mergeSnapshotBackfill(cached, backfill, limit, config.cursor);
}
this.recordSnapshotItems(subscription.channel, items.length);
return {
subscription,
items,
@ -874,6 +1009,7 @@ export class LiveStateManager {
}
this.stats.genericCacheSnapshots += 1;
const items = (this.genericItems.get("equities") ?? []).slice(0, limit);
this.recordSnapshotItems(subscription.channel, items.length);
return {
subscription,
items,
@ -889,6 +1025,7 @@ export class LiveStateManager {
}
this.touchAccess(this.candleAccess, key);
const items = this.candleItems.get(key) ?? [];
this.recordSnapshotItems(subscription.channel, items.length);
return {
subscription,
items,
@ -904,6 +1041,7 @@ export class LiveStateManager {
}
this.touchAccess(this.overlayAccess, key);
const items = this.overlayItems.get(key) ?? [];
this.recordSnapshotItems(subscription.channel, items.length);
return {
subscription,
items,
@ -916,6 +1054,7 @@ export class LiveStateManager {
this.stats.genericCacheSnapshots += 1;
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get(subscription.channel) ?? []).slice(0, limit);
this.recordSnapshotItems(subscription.channel, items.length);
return {
subscription,
items,
@ -951,7 +1090,7 @@ export class LiveStateManager {
if (nextState.items.length > 0) {
this.updateFreshnessMetric(key, "equity-candles", nextState.items[0]);
}
this.queueRedisWrite(key, cursorField, nextState.items, CHART_LIMITS.candles, cursor);
this.queueRedisRewrite(key, cursorField, nextState.items, CHART_LIMITS.candles, cursor);
return cursor;
}
case "equity-overlay": {
@ -977,7 +1116,7 @@ export class LiveStateManager {
if (nextState.items.length > 0) {
this.updateFreshnessMetric(key, "equity-overlay", nextState.items[0]);
}
this.queueRedisWrite(key, cursorField, nextState.items, CHART_LIMITS.overlay, cursor);
this.queueRedisRewrite(key, cursorField, nextState.items, CHART_LIMITS.overlay, cursor);
return cursor;
}
default: {
@ -1007,7 +1146,15 @@ export class LiveStateManager {
if (nextState.items.length > 0) {
this.updateFreshnessMetric(config.redisKey, channel, nextState.items[0]);
}
this.queueRedisWrite(config.redisKey, config.cursorField, nextState.items, config.limit, cursor);
this.queueGenericRedisWrite(
config.redisKey,
config.cursorField,
parsed,
nextState.items,
config.limit,
cursor,
nextState.outOfOrder
);
return cursor;
}
}
@ -1102,4 +1249,23 @@ export class LiveStateManager {
this.stats.cacheDepthByKey.set(listKey, Math.min(items.length, limit));
await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor));
}
private async persistListAppend(
listKey: string,
cursorField: string,
payloads: string[],
limit: number,
cursor: Cursor | null
): Promise<void> {
if (!this.redis?.isOpen) {
return;
}
for (const payload of payloads) {
await this.redis.lPush(listKey, payload);
}
await this.redis.lTrim(listKey, 0, limit - 1);
this.stats.trimOperations += 1;
await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor));
}
}

View file

@ -27,6 +27,7 @@ const makeClickHouse = (
const makeRedis = () => {
const lists = new Map<string, string[]>();
const hashes = new Map<string, Map<string, string>>();
let clearTrimCount = 0;
return {
isOpen: true,
@ -41,6 +42,9 @@ const makeRedis = () => {
},
async lTrim(key: string, start: number, stop: number) {
const next = lists.get(key) ?? [];
if (start > stop) {
clearTrimCount += 1;
}
lists.set(key, start > stop ? [] : next.slice(start, stop + 1));
return "OK";
},
@ -52,6 +56,9 @@ const makeRedis = () => {
hash.set(field, value);
hashes.set(key, hash);
return 1;
},
getClearTrimCount() {
return clearTrimCount;
}
};
};
@ -64,8 +71,8 @@ describe("LiveStateManager", () => {
LIVE_LIMIT_FLOW: "bad"
} as NodeJS.ProcessEnv);
expect(limits.options).toBe(777);
expect(limits.nbbo).toBe(100000);
expect(limits.options).toBe(100);
expect(limits.nbbo).toBe(1000);
expect(limits.flow).toBe(500);
expect(limits["equity-quotes"]).toBe(500);
expect(limits.alerts).toBe(300);
@ -209,11 +216,13 @@ describe("LiveStateManager", () => {
const flushed = await redis.lRange("live:flow", 0, 99);
expect(persisted).toHaveLength(0);
expect(flushed).toHaveLength(2);
expect(redis.getClearTrimCount()).toBe(0);
const stats = manager.getStatsSnapshot();
expect(stats.trimOperations).toBeGreaterThan(0);
expect(stats.redisFlushCount).toBeGreaterThan(0);
expect(stats.cacheDepthByKey["live:flow"]).toBe(2);
expect(stats.redisFlushPayloadBytes).toBeGreaterThan(0);
});
it("reorders out-of-order live events without dropping newest-first semantics", async () => {
@ -1074,6 +1083,33 @@ describe("LiveStateManager", () => {
expect(stats.scopedClickHouseSnapshots).toBe(1);
});
it("clamps oversized snapshot requests to the server-side channel cap", async () => {
const manager = new LiveStateManager(makeClickHouse(), null);
const now = Date.now();
for (let idx = 0; idx < 120; idx += 1) {
await manager.ingest("options", {
source_ts: now + idx,
ingest_ts: now + idx + 1,
seq: idx + 1,
trace_id: `opt-${idx + 1}`,
ts: now + idx,
option_contract_id: `SPY-2025-01-17-${500 + idx}-C`,
price: 1,
size: 10,
exchange: "X"
});
}
const snapshot = await manager.getSnapshot({
channel: "options",
snapshot_limit: 10_000
});
expect(snapshot.items).toHaveLength(100);
expect(manager.getStatsSnapshot().snapshotItemsByChannel.options).toBe(100);
});
it("keeps backend channel health healthy when a scoped query is quiet", async () => {
const manager = new LiveStateManager(makeClickHouse(() => []), null);
const now = Date.now();