From 20397fdef37e03fb170aa48373b9df8f29897536 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Fri, 22 May 2026 21:42:55 -0400 Subject: [PATCH] serialize redis flushes during api shutdown --- services/api/src/live.ts | 50 +++++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 9687eec..2df3969 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -614,6 +614,9 @@ export type LiveStateStatsSnapshot = { const isLiveStateConfig = (value: GenericLiveLimits | LiveStateConfig): value is LiveStateConfig => "limits" in value; +const isRedisClientClosedError = (error: unknown): boolean => + error instanceof Error && error.message.toLowerCase().includes("client is closed"); + const clampConfiguredLimit = (channel: LiveGenericChannel, value: number): number => Math.max(MIN_GENERIC_LIMIT, Math.min(LIVE_GENERIC_LIMIT_CAPS[channel], Math.floor(value))); @@ -656,6 +659,7 @@ export class LiveStateManager { private readonly overlayAccess = new Map(); private readonly pendingRedisWrites = new Map(); private readonly redisFlushTimer: ReturnType | null; + private redisFlushInFlight: Promise | null = null; private readonly stats = { genericHydrateFromRedis: 0, genericHydrateFromClickHouse: 0, @@ -723,6 +727,22 @@ export class LiveStateManager { } async flushRedisWrites(): Promise { + if (this.redisFlushInFlight) { + return this.redisFlushInFlight; + } + + this.redisFlushInFlight = this.flushRedisWritesInternal(); + try { + await this.redisFlushInFlight; + } finally { + this.redisFlushInFlight = null; + if (this.pendingRedisWrites.size > 0 && this.redis?.isOpen) { + void this.flushRedisWrites(); + } + } + } + + private async flushRedisWritesInternal(): Promise { if (!this.redis?.isOpen) { return; } @@ -732,20 +752,34 @@ export class LiveStateManager { for (const write of writes) { if (write.mode === "rewrite") { - await this.persistList(write.listKey, write.cursorField, write.items, write.limit, write.cursor); + try { + await this.persistList(write.listKey, write.cursorField, write.items, write.limit, write.cursor); + } catch (error) { + if (isRedisClientClosedError(error)) { + return; + } + throw error; + } this.stats.redisFlushItems += write.items.length; this.stats.redisFlushPayloadBytes += write.items.reduce( (total, item) => total + JSON.stringify(item).length, 0 ); } else { - await this.persistListAppend( - write.listKey, - write.cursorField, - write.payloads, - write.limit, - write.cursor - ); + try { + await this.persistListAppend( + write.listKey, + write.cursorField, + write.payloads, + write.limit, + write.cursor + ); + } catch (error) { + if (isRedisClientClosedError(error)) { + return; + } + throw error; + } this.stats.redisFlushItems += write.payloads.length; this.stats.redisFlushPayloadBytes += write.payloads.reduce((total, payload) => total + payload.length, 0); }