From 9edf8fcbc5dfafe182c1bb413c52980590a19cd0 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sat, 10 Jan 2026 16:08:23 -0500 Subject: [PATCH] Emit equity candles on schedule to avoid stalled charts --- services/candles/src/aggregator.ts | 12 ++++++++++++ services/candles/src/index.ts | 14 ++++++++++++++ services/candles/tests/aggregator.test.ts | 15 +++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/services/candles/src/aggregator.ts b/services/candles/src/aggregator.ts index e00d9b9..37a8921 100644 --- a/services/candles/src/aggregator.ts +++ b/services/candles/src/aggregator.ts @@ -233,6 +233,18 @@ export class CandleAggregator { return { emitted, droppedLate }; } + flushExpired(now: number): EquityCandle[] { + const watermark = Math.max(0, Math.floor(now) - this.maxLateMs); + const emitted: EquityCandle[] = []; + + for (const state of this.stateByKey.values()) { + state.lastTsSeen = Math.max(state.lastTsSeen, watermark); + emitted.push(...flushState(state, watermark)); + } + + return emitted; + } + drain(): EquityCandle[] { const emitted: EquityCandle[] = []; diff --git a/services/candles/src/index.ts b/services/candles/src/index.ts index a625509..a02ab70 100644 --- a/services/candles/src/index.ts +++ b/services/candles/src/index.ts @@ -329,6 +329,18 @@ const run = async () => { let droppedLate = 0; let lastLateLog = Date.now(); + const flushExpired = async () => { + const expired = aggregator.flushExpired(Date.now()); + for (const candle of expired) { + const validated = EquityCandleSchema.parse(candle); + await emitCandle(clickhouse, js, redis, validated, env.CANDLE_CACHE_LIMIT); + } + }; + + const flushTimer = setInterval(() => { + void flushExpired(); + }, 1000); + const loop = async () => { for await (const msg of subscription.messages) { try { @@ -365,6 +377,8 @@ const run = async () => { const shutdown = async (signal: string) => { logger.info("service stopping", { signal }); + clearInterval(flushTimer); + await flushExpired(); const remaining = aggregator.drain(); for (const candle of remaining) { const validated = EquityCandleSchema.parse(candle); diff --git a/services/candles/tests/aggregator.test.ts b/services/candles/tests/aggregator.test.ts index 79a39b2..bbfe57b 100644 --- a/services/candles/tests/aggregator.test.ts +++ b/services/candles/tests/aggregator.test.ts @@ -78,4 +78,19 @@ describe("CandleAggregator", () => { expect(lateResult.emitted).toHaveLength(0); expect(lateResult.droppedLate).toBe(1); }); + + test("flushes expired windows without new prints", () => { + const aggregator = new CandleAggregator({ intervalsMs: [1000], maxLateMs: 0 }); + + const first = buildPrint({ ts: 1000, price: 10, size: 100, seq: 1 }); + const second = buildPrint({ ts: 1500, price: 12, size: 50, seq: 2 }); + + aggregator.ingest(first); + aggregator.ingest(second); + + const emitted = aggregator.flushExpired(2500); + expect(emitted).toHaveLength(1); + expect(emitted[0].ts).toBe(1000); + expect(emitted[0].close).toBe(12); + }); });