stabilize live api memory and add an options pipeline explainer #8

Open
dirtydishes wants to merge 6 commits from stabilize-live-api-memory into main
Showing only changes of commit 20397fdef3 - Show all commits

View file

@ -614,6 +614,9 @@ export type LiveStateStatsSnapshot = {
const isLiveStateConfig = (value: GenericLiveLimits | LiveStateConfig): value is LiveStateConfig =>
"limits" in value;
const isRedisClientClosedError = (error: unknown): boolean =>
error instanceof Error && error.message.toLowerCase().includes("client is closed");
const clampConfiguredLimit = (channel: LiveGenericChannel, value: number): number =>
Math.max(MIN_GENERIC_LIMIT, Math.min(LIVE_GENERIC_LIMIT_CAPS[channel], Math.floor(value)));
@ -656,6 +659,7 @@ export class LiveStateManager {
private readonly overlayAccess = new Map<string, number>();
private readonly pendingRedisWrites = new Map<string, BufferedRedisWrite>();
private readonly redisFlushTimer: ReturnType<typeof setInterval> | null;
private redisFlushInFlight: Promise<void> | null = null;
private readonly stats = {
genericHydrateFromRedis: 0,
genericHydrateFromClickHouse: 0,
@ -723,6 +727,22 @@ export class LiveStateManager {
}
async flushRedisWrites(): Promise<void> {
if (this.redisFlushInFlight) {
return this.redisFlushInFlight;
}
this.redisFlushInFlight = this.flushRedisWritesInternal();
try {
await this.redisFlushInFlight;
} finally {
this.redisFlushInFlight = null;
if (this.pendingRedisWrites.size > 0 && this.redis?.isOpen) {
void this.flushRedisWrites();
}
}
}
private async flushRedisWritesInternal(): Promise<void> {
if (!this.redis?.isOpen) {
return;
}
@ -732,20 +752,34 @@ export class LiveStateManager {
for (const write of writes) {
if (write.mode === "rewrite") {
await this.persistList(write.listKey, write.cursorField, write.items, write.limit, write.cursor);
try {
await this.persistList(write.listKey, write.cursorField, write.items, write.limit, write.cursor);
} catch (error) {
if (isRedisClientClosedError(error)) {
return;
}
throw error;
}
this.stats.redisFlushItems += write.items.length;
this.stats.redisFlushPayloadBytes += write.items.reduce(
(total, item) => total + JSON.stringify(item).length,
0
);
} else {
await this.persistListAppend(
write.listKey,
write.cursorField,
write.payloads,
write.limit,
write.cursor
);
try {
await this.persistListAppend(
write.listKey,
write.cursorField,
write.payloads,
write.limit,
write.cursor
);
} catch (error) {
if (isRedisClientClosedError(error)) {
return;
}
throw error;
}
this.stats.redisFlushItems += write.payloads.length;
this.stats.redisFlushPayloadBytes += write.payloads.reduce((total, payload) => total + payload.length, 0);
}