Fix live websocket lag by optimizing cache persistence and adding lag telemetry

This commit is contained in:
dirtydishes 2026-05-04 13:10:05 -04:00
parent 48b0d980a6
commit eea2d74a3d
3 changed files with 119 additions and 7 deletions

View file

@ -1,3 +1,4 @@
{"_type":"issue","id":"islandflow-sh1","title":"Fix live websocket stale lag and reconnect loop","description":"Investigate and fix API live consumer lag causing stale timestamps, feed-behind status, and reconnect loops. Optimize live cache persistence path, add lag telemetry/alerts, and validate in runtime.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-04T17:04:34Z","created_by":"dirtydishes","updated_at":"2026-05-04T17:09:44Z","started_at":"2026-05-04T17:04:38Z","closed_at":"2026-05-04T17:09:44Z","close_reason":"Completed: optimized live cache persistence path, added lag telemetry, deployed api via docker compose on di, verified ws freshness and low hotFeedLagMs","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-b3o","title":"Implement options tape table with execution spot","description":"Redesign OptionsPane into a dense classifier-colored table and preserve execution-time underlying spot on option prints from equity quote mid.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-04T04:41:59Z","created_by":"dirtydishes","updated_at":"2026-05-04T05:14:26Z","started_at":"2026-05-04T04:42:08Z","closed_at":"2026-05-04T05:14:26Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-ug1","title":"Fix false NBBO-missing badges in live Options tape","description":"Investigate and fix client-side cases where Options rows show NBBO missing/stale even when a fresh NBBO quote exists in the live nbbo map. Update rendering logic to prefer fresh quote-derived status and add regression tests.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-29T15:58:31Z","created_by":"dirtydishes","updated_at":"2026-04-29T16:01:28Z","started_at":"2026-04-29T15:58:35Z","closed_at":"2026-04-29T16:01:28Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-ayo","title":"Drop stale backlog events from live fanout","description":"Follow-up to live freshness rollout: /ws/live was still fanning out stale backlog events for freshness-gated channels, which kept tape panes in Live feed behind despite active synthetic ingest. Gate fanout and cache ingest by freshness for options/nbbo/equities/flow.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:26:39Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:26:44Z","started_at":"2026-04-28T21:26:44Z","closed_at":"2026-04-28T21:26:44Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0}

View file

@ -116,7 +116,8 @@ const envSchema = z.object({
REDIS_URL: z.string().default("redis://127.0.0.1:6379"),
REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200),
API_DELIVER_POLICY: DeliverPolicySchema.default("new"),
API_CONSUMER_RESET: z.coerce.boolean().default(false)
API_CONSUMER_RESET: z.coerce.boolean().default(false),
LIVE_LAG_WARN_MS: z.coerce.number().int().positive().default(120_000)
});
const env = readEnv(envSchema);
@ -126,6 +127,13 @@ const state = {
shutdownPromise: null as Promise<void> | null
};
const HOT_LIVE_REDIS_KEYS = {
options: "live:options",
equities: "live:equities",
flow: "live:flow",
nbbo: "live:nbbo"
} as const;
const getErrorMessage = (error: unknown): string => {
return error instanceof Error ? error.message : String(error);
};
@ -835,9 +843,38 @@ const run = async () => {
const liveState = new LiveStateManager(clickhouse, redis);
await liveState.hydrate();
const warnLiveLag = (
channel: keyof typeof HOT_LIVE_REDIS_KEYS,
ageMs: number | null | undefined
) => {
if (typeof ageMs !== "number" || !Number.isFinite(ageMs)) {
return;
}
if (ageMs < env.LIVE_LAG_WARN_MS) {
return;
}
logger.warn("live feed lag exceeded threshold", {
channel,
age_ms: ageMs,
threshold_ms: env.LIVE_LAG_WARN_MS
});
};
const liveStateMetricsTimer = setInterval(() => {
const snapshot = liveState.getStatsSnapshot();
logger.info("live cache metrics", snapshot);
const hotFeedLagMs = {
options: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.options] ?? null,
equities: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.equities] ?? null,
flow: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.flow] ?? null,
nbbo: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.nbbo] ?? null
};
logger.info("live cache metrics", {
...snapshot,
hotFeedLagMs
});
warnLiveLag("options", hotFeedLagMs.options);
warnLiveLag("equities", hotFeedLagMs.equities);
warnLiveLag("flow", hotFeedLagMs.flow);
warnLiveLag("nbbo", hotFeedLagMs.nbbo);
}, 60000);
const consumerBindings = [

View file

@ -338,7 +338,8 @@ export class LiveStateManager {
genericHydrateFromRedis: 0,
genericHydrateFromClickHouse: 0,
trimOperations: 0,
cacheDepthByKey: new Map<string, number>()
cacheDepthByKey: new Map<string, number>(),
freshnessAgeMsByKey: new Map<string, number>()
};
constructor(
@ -354,15 +355,30 @@ export class LiveStateManager {
genericHydrateFromClickHouse: number;
trimOperations: number;
cacheDepthByKey: Record<string, number>;
freshnessAgeMsByKey: Record<string, number>;
} {
return {
genericHydrateFromRedis: this.stats.genericHydrateFromRedis,
genericHydrateFromClickHouse: this.stats.genericHydrateFromClickHouse,
trimOperations: this.stats.trimOperations,
cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey)
cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey),
freshnessAgeMsByKey: Object.fromEntries(this.stats.freshnessAgeMsByKey)
};
}
private updateFreshnessMetric(listKey: string, channel: LiveChannel, item: unknown, now = Date.now()): void {
const ts =
channel === "equity-candles" || channel === "equity-overlay"
? typeof (item as { ts?: unknown })?.ts === "number"
? ((item as { ts: number }).ts as number)
: null
: extractFreshnessTs(channel, item);
if (typeof ts === "number" && Number.isFinite(ts)) {
this.stats.freshnessAgeMsByKey.set(listKey, Math.max(0, now - ts));
}
}
async hydrate(): Promise<void> {
const channels = Object.keys(this.generic) as LiveGenericChannel[];
await Promise.all(channels.map((channel) => this.hydrateGeneric(channel)));
@ -383,6 +399,7 @@ export class LiveStateManager {
this.genericItems.set(channel, cached);
this.stats.genericHydrateFromRedis += 1;
this.stats.cacheDepthByKey.set(config.redisKey, cached.length);
this.updateFreshnessMetric(config.redisKey, channel, cached[0]);
this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField)));
await this.persistList(
config.redisKey,
@ -405,6 +422,9 @@ export class LiveStateManager {
this.stats.genericHydrateFromClickHouse += 1;
this.stats.cacheDepthByKey.set(config.redisKey, fresh.length);
this.genericItems.set(channel, fresh);
if (fresh.length > 0) {
this.updateFreshnessMetric(config.redisKey, channel, fresh[0]);
}
const watermark = fresh[0] ? config.cursor(fresh[0]) : null;
this.genericCursors.set(config.cursorField, watermark);
await this.persistList(config.redisKey, config.cursorField, fresh, config.limit, watermark);
@ -542,6 +562,7 @@ export class LiveStateManager {
const candle = EquityCandleSchema.parse(item);
const key = candleRedisKey(candle.underlying_id, candle.interval_ms);
const cursorField = candleCursorField(candle.underlying_id, candle.interval_ms);
const previousCursor = this.candleCursors.get(cursorField) ?? null;
const items = this.candleItems.get(key) ?? [];
const next = [candle, ...items]
.sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
@ -550,13 +571,22 @@ export class LiveStateManager {
this.stats.cacheDepthByKey.set(key, next.length);
const cursor = { ts: candle.ts, seq: candle.seq };
this.candleCursors.set(cursorField, cursor);
if (next.length > 0) {
this.updateFreshnessMetric(key, "equity-candles", next[0]);
}
const outOfOrder = previousCursor ? compareCursors(cursor, previousCursor) > 0 : false;
if (outOfOrder) {
await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor);
} else {
await this.persistItem(key, cursorField, candle, CHART_LIMITS.candles, cursor, next.length);
}
return cursor;
}
case "equity-overlay": {
const print = EquityPrintSchema.parse(item);
const key = overlayRedisKey(print.underlying_id);
const cursorField = overlayCursorField(print.underlying_id);
const previousCursor = this.overlayCursors.get(cursorField) ?? null;
const items = this.overlayItems.get(key) ?? [];
const next = [print, ...items]
.sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
@ -565,7 +595,15 @@ export class LiveStateManager {
this.stats.cacheDepthByKey.set(key, next.length);
const cursor = { ts: print.ts, seq: print.seq };
this.overlayCursors.set(cursorField, cursor);
if (next.length > 0) {
this.updateFreshnessMetric(key, "equity-overlay", next[0]);
}
const outOfOrder = previousCursor ? compareCursors(cursor, previousCursor) > 0 : false;
if (outOfOrder) {
await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor);
} else {
await this.persistItem(key, cursorField, print, CHART_LIMITS.overlay, cursor, next.length);
}
return cursor;
}
default: {
@ -574,13 +612,22 @@ export class LiveStateManager {
if (!isWithinLiveFeedLookback(channel, parsed)) {
return null;
}
const previousCursor = this.genericCursors.get(config.cursorField) ?? null;
const items = this.genericItems.get(channel) ?? [];
const next = normalizeGenericItems(channel, [parsed, ...items], config);
this.genericItems.set(channel, next);
this.stats.cacheDepthByKey.set(config.redisKey, next.length);
const cursor = config.cursor(parsed);
this.genericCursors.set(config.cursorField, cursor);
if (next.length > 0) {
this.updateFreshnessMetric(config.redisKey, channel, next[0]);
}
const outOfOrder = previousCursor ? compareCursors(cursor, previousCursor) > 0 : false;
if (channel === "nbbo" || outOfOrder) {
await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor);
} else {
await this.persistItem(config.redisKey, config.cursorField, parsed, config.limit, cursor, next.length);
}
return cursor;
}
}
@ -595,6 +642,7 @@ export class LiveStateManager {
if (cached.length > 0) {
this.candleItems.set(key, cached);
this.stats.cacheDepthByKey.set(key, cached.length);
this.updateFreshnessMetric(key, "equity-candles", cached[0]);
this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
return;
}
@ -603,6 +651,9 @@ export class LiveStateManager {
const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles);
this.candleItems.set(key, fresh);
this.stats.cacheDepthByKey.set(key, fresh.length);
if (fresh.length > 0) {
this.updateFreshnessMetric(key, "equity-candles", fresh[0]);
}
const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
this.candleCursors.set(cursorField, watermark);
await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark);
@ -617,6 +668,7 @@ export class LiveStateManager {
if (cached.length > 0) {
this.overlayItems.set(key, cached);
this.stats.cacheDepthByKey.set(key, cached.length);
this.updateFreshnessMetric(key, "equity-overlay", cached[0]);
this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
return;
}
@ -627,11 +679,33 @@ export class LiveStateManager {
);
this.overlayItems.set(key, fresh);
this.stats.cacheDepthByKey.set(key, fresh.length);
if (fresh.length > 0) {
this.updateFreshnessMetric(key, "equity-overlay", fresh[0]);
}
const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
this.overlayCursors.set(cursorField, watermark);
await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark);
}
private async persistItem<T>(
listKey: string,
cursorField: string,
item: T,
limit: number,
cursor: Cursor | null,
depth: number
): Promise<void> {
if (!this.redis?.isOpen) {
return;
}
await this.redis.lPush(listKey, JSON.stringify(item));
await this.redis.lTrim(listKey, 0, limit - 1);
this.stats.trimOperations += 1;
this.stats.cacheDepthByKey.set(listKey, Math.min(depth, limit));
await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor));
}
private async persistList<T>(
listKey: string,
cursorField: string,