diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 287cf8e..eb80781 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -1,3 +1,4 @@ +{"_type":"issue","id":"islandflow-sh1","title":"Fix live websocket stale lag and reconnect loop","description":"Investigate and fix API live consumer lag causing stale timestamps, feed-behind status, and reconnect loops. Optimize live cache persistence path, add lag telemetry/alerts, and validate in runtime.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-04T17:04:34Z","created_by":"dirtydishes","updated_at":"2026-05-04T17:09:44Z","started_at":"2026-05-04T17:04:38Z","closed_at":"2026-05-04T17:09:44Z","close_reason":"Completed: optimized live cache persistence path, added lag telemetry, deployed api via docker compose on di, verified ws freshness and low hotFeedLagMs","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-b3o","title":"Implement options tape table with execution spot","description":"Redesign OptionsPane into a dense classifier-colored table and preserve execution-time underlying spot on option prints from equity quote mid.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-04T04:41:59Z","created_by":"dirtydishes","updated_at":"2026-05-04T05:14:26Z","started_at":"2026-05-04T04:42:08Z","closed_at":"2026-05-04T05:14:26Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-ug1","title":"Fix false NBBO-missing badges in live Options tape","description":"Investigate and fix client-side cases where Options rows show NBBO missing/stale even when a fresh NBBO quote exists in the live nbbo map. Update rendering logic to prefer fresh quote-derived status and add regression tests.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-29T15:58:31Z","created_by":"dirtydishes","updated_at":"2026-04-29T16:01:28Z","started_at":"2026-04-29T15:58:35Z","closed_at":"2026-04-29T16:01:28Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-ayo","title":"Drop stale backlog events from live fanout","description":"Follow-up to live freshness rollout: /ws/live was still fanning out stale backlog events for freshness-gated channels, which kept tape panes in Live feed behind despite active synthetic ingest. Gate fanout and cache ingest by freshness for options/nbbo/equities/flow.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:26:39Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:26:44Z","started_at":"2026-04-28T21:26:44Z","closed_at":"2026-04-28T21:26:44Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 251f8ac..37830c3 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -116,7 +116,8 @@ const envSchema = z.object({ REDIS_URL: z.string().default("redis://127.0.0.1:6379"), REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200), API_DELIVER_POLICY: DeliverPolicySchema.default("new"), - API_CONSUMER_RESET: z.coerce.boolean().default(false) + API_CONSUMER_RESET: z.coerce.boolean().default(false), + LIVE_LAG_WARN_MS: z.coerce.number().int().positive().default(120_000) }); const env = readEnv(envSchema); @@ -126,6 +127,13 @@ const state = { shutdownPromise: null as Promise | null }; +const HOT_LIVE_REDIS_KEYS = { + options: "live:options", + equities: "live:equities", + flow: "live:flow", + nbbo: "live:nbbo" +} as const; + const getErrorMessage = (error: unknown): string => { return error instanceof Error ? error.message : String(error); }; @@ -835,9 +843,38 @@ const run = async () => { const liveState = new LiveStateManager(clickhouse, redis); await liveState.hydrate(); + const warnLiveLag = ( + channel: keyof typeof HOT_LIVE_REDIS_KEYS, + ageMs: number | null | undefined + ) => { + if (typeof ageMs !== "number" || !Number.isFinite(ageMs)) { + return; + } + if (ageMs < env.LIVE_LAG_WARN_MS) { + return; + } + logger.warn("live feed lag exceeded threshold", { + channel, + age_ms: ageMs, + threshold_ms: env.LIVE_LAG_WARN_MS + }); + }; const liveStateMetricsTimer = setInterval(() => { const snapshot = liveState.getStatsSnapshot(); - logger.info("live cache metrics", snapshot); + const hotFeedLagMs = { + options: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.options] ?? null, + equities: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.equities] ?? null, + flow: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.flow] ?? null, + nbbo: snapshot.freshnessAgeMsByKey[HOT_LIVE_REDIS_KEYS.nbbo] ?? null + }; + logger.info("live cache metrics", { + ...snapshot, + hotFeedLagMs + }); + warnLiveLag("options", hotFeedLagMs.options); + warnLiveLag("equities", hotFeedLagMs.equities); + warnLiveLag("flow", hotFeedLagMs.flow); + warnLiveLag("nbbo", hotFeedLagMs.nbbo); }, 60000); const consumerBindings = [ diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 89d7d26..36b7aee 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -338,7 +338,8 @@ export class LiveStateManager { genericHydrateFromRedis: 0, genericHydrateFromClickHouse: 0, trimOperations: 0, - cacheDepthByKey: new Map() + cacheDepthByKey: new Map(), + freshnessAgeMsByKey: new Map() }; constructor( @@ -354,15 +355,30 @@ export class LiveStateManager { genericHydrateFromClickHouse: number; trimOperations: number; cacheDepthByKey: Record; + freshnessAgeMsByKey: Record; } { return { genericHydrateFromRedis: this.stats.genericHydrateFromRedis, genericHydrateFromClickHouse: this.stats.genericHydrateFromClickHouse, trimOperations: this.stats.trimOperations, - cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey) + cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey), + freshnessAgeMsByKey: Object.fromEntries(this.stats.freshnessAgeMsByKey) }; } + private updateFreshnessMetric(listKey: string, channel: LiveChannel, item: unknown, now = Date.now()): void { + const ts = + channel === "equity-candles" || channel === "equity-overlay" + ? typeof (item as { ts?: unknown })?.ts === "number" + ? ((item as { ts: number }).ts as number) + : null + : extractFreshnessTs(channel, item); + + if (typeof ts === "number" && Number.isFinite(ts)) { + this.stats.freshnessAgeMsByKey.set(listKey, Math.max(0, now - ts)); + } + } + async hydrate(): Promise { const channels = Object.keys(this.generic) as LiveGenericChannel[]; await Promise.all(channels.map((channel) => this.hydrateGeneric(channel))); @@ -383,6 +399,7 @@ export class LiveStateManager { this.genericItems.set(channel, cached); this.stats.genericHydrateFromRedis += 1; this.stats.cacheDepthByKey.set(config.redisKey, cached.length); + this.updateFreshnessMetric(config.redisKey, channel, cached[0]); this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField))); await this.persistList( config.redisKey, @@ -405,6 +422,9 @@ export class LiveStateManager { this.stats.genericHydrateFromClickHouse += 1; this.stats.cacheDepthByKey.set(config.redisKey, fresh.length); this.genericItems.set(channel, fresh); + if (fresh.length > 0) { + this.updateFreshnessMetric(config.redisKey, channel, fresh[0]); + } const watermark = fresh[0] ? config.cursor(fresh[0]) : null; this.genericCursors.set(config.cursorField, watermark); await this.persistList(config.redisKey, config.cursorField, fresh, config.limit, watermark); @@ -542,6 +562,7 @@ export class LiveStateManager { const candle = EquityCandleSchema.parse(item); const key = candleRedisKey(candle.underlying_id, candle.interval_ms); const cursorField = candleCursorField(candle.underlying_id, candle.interval_ms); + const previousCursor = this.candleCursors.get(cursorField) ?? null; const items = this.candleItems.get(key) ?? []; const next = [candle, ...items] .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq)) @@ -550,13 +571,22 @@ export class LiveStateManager { this.stats.cacheDepthByKey.set(key, next.length); const cursor = { ts: candle.ts, seq: candle.seq }; this.candleCursors.set(cursorField, cursor); - await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor); + if (next.length > 0) { + this.updateFreshnessMetric(key, "equity-candles", next[0]); + } + const outOfOrder = previousCursor ? compareCursors(cursor, previousCursor) > 0 : false; + if (outOfOrder) { + await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor); + } else { + await this.persistItem(key, cursorField, candle, CHART_LIMITS.candles, cursor, next.length); + } return cursor; } case "equity-overlay": { const print = EquityPrintSchema.parse(item); const key = overlayRedisKey(print.underlying_id); const cursorField = overlayCursorField(print.underlying_id); + const previousCursor = this.overlayCursors.get(cursorField) ?? null; const items = this.overlayItems.get(key) ?? []; const next = [print, ...items] .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq)) @@ -565,7 +595,15 @@ export class LiveStateManager { this.stats.cacheDepthByKey.set(key, next.length); const cursor = { ts: print.ts, seq: print.seq }; this.overlayCursors.set(cursorField, cursor); - await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor); + if (next.length > 0) { + this.updateFreshnessMetric(key, "equity-overlay", next[0]); + } + const outOfOrder = previousCursor ? compareCursors(cursor, previousCursor) > 0 : false; + if (outOfOrder) { + await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor); + } else { + await this.persistItem(key, cursorField, print, CHART_LIMITS.overlay, cursor, next.length); + } return cursor; } default: { @@ -574,13 +612,22 @@ export class LiveStateManager { if (!isWithinLiveFeedLookback(channel, parsed)) { return null; } + const previousCursor = this.genericCursors.get(config.cursorField) ?? null; const items = this.genericItems.get(channel) ?? []; const next = normalizeGenericItems(channel, [parsed, ...items], config); this.genericItems.set(channel, next); this.stats.cacheDepthByKey.set(config.redisKey, next.length); const cursor = config.cursor(parsed); this.genericCursors.set(config.cursorField, cursor); - await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor); + if (next.length > 0) { + this.updateFreshnessMetric(config.redisKey, channel, next[0]); + } + const outOfOrder = previousCursor ? compareCursors(cursor, previousCursor) > 0 : false; + if (channel === "nbbo" || outOfOrder) { + await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor); + } else { + await this.persistItem(config.redisKey, config.cursorField, parsed, config.limit, cursor, next.length); + } return cursor; } } @@ -595,6 +642,7 @@ export class LiveStateManager { if (cached.length > 0) { this.candleItems.set(key, cached); this.stats.cacheDepthByKey.set(key, cached.length); + this.updateFreshnessMetric(key, "equity-candles", cached[0]); this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); return; } @@ -603,6 +651,9 @@ export class LiveStateManager { const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles); this.candleItems.set(key, fresh); this.stats.cacheDepthByKey.set(key, fresh.length); + if (fresh.length > 0) { + this.updateFreshnessMetric(key, "equity-candles", fresh[0]); + } const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null; this.candleCursors.set(cursorField, watermark); await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark); @@ -617,6 +668,7 @@ export class LiveStateManager { if (cached.length > 0) { this.overlayItems.set(key, cached); this.stats.cacheDepthByKey.set(key, cached.length); + this.updateFreshnessMetric(key, "equity-overlay", cached[0]); this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); return; } @@ -627,11 +679,33 @@ export class LiveStateManager { ); this.overlayItems.set(key, fresh); this.stats.cacheDepthByKey.set(key, fresh.length); + if (fresh.length > 0) { + this.updateFreshnessMetric(key, "equity-overlay", fresh[0]); + } const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null; this.overlayCursors.set(cursorField, watermark); await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark); } + private async persistItem( + listKey: string, + cursorField: string, + item: T, + limit: number, + cursor: Cursor | null, + depth: number + ): Promise { + if (!this.redis?.isOpen) { + return; + } + + await this.redis.lPush(listKey, JSON.stringify(item)); + await this.redis.lTrim(listKey, 0, limit - 1); + this.stats.trimOperations += 1; + this.stats.cacheDepthByKey.set(listKey, Math.min(depth, limit)); + await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor)); + } + private async persistList( listKey: string, cursorField: string,