Emit equity candles on schedule to avoid stalled charts

This commit is contained in:
dirtydishes 2026-01-10 16:08:23 -05:00
parent 980bb4f1b1
commit 9edf8fcbc5
3 changed files with 41 additions and 0 deletions

View file

@ -233,6 +233,18 @@ export class CandleAggregator {
return { emitted, droppedLate };
}
flushExpired(now: number): EquityCandle[] {
const watermark = Math.max(0, Math.floor(now) - this.maxLateMs);
const emitted: EquityCandle[] = [];
for (const state of this.stateByKey.values()) {
state.lastTsSeen = Math.max(state.lastTsSeen, watermark);
emitted.push(...flushState(state, watermark));
}
return emitted;
}
drain(): EquityCandle[] {
const emitted: EquityCandle[] = [];

View file

@ -329,6 +329,18 @@ const run = async () => {
let droppedLate = 0;
let lastLateLog = Date.now();
const flushExpired = async () => {
const expired = aggregator.flushExpired(Date.now());
for (const candle of expired) {
const validated = EquityCandleSchema.parse(candle);
await emitCandle(clickhouse, js, redis, validated, env.CANDLE_CACHE_LIMIT);
}
};
const flushTimer = setInterval(() => {
void flushExpired();
}, 1000);
const loop = async () => {
for await (const msg of subscription.messages) {
try {
@ -365,6 +377,8 @@ const run = async () => {
const shutdown = async (signal: string) => {
logger.info("service stopping", { signal });
clearInterval(flushTimer);
await flushExpired();
const remaining = aggregator.drain();
for (const candle of remaining) {
const validated = EquityCandleSchema.parse(candle);

View file

@ -78,4 +78,19 @@ describe("CandleAggregator", () => {
expect(lateResult.emitted).toHaveLength(0);
expect(lateResult.droppedLate).toBe(1);
});
test("flushes expired windows without new prints", () => {
const aggregator = new CandleAggregator({ intervalsMs: [1000], maxLateMs: 0 });
const first = buildPrint({ ts: 1000, price: 10, size: 100, seq: 1 });
const second = buildPrint({ ts: 1500, price: 12, size: 50, seq: 2 });
aggregator.ingest(first);
aggregator.ingest(second);
const emitted = aggregator.flushExpired(2500);
expect(emitted).toHaveLength(1);
expect(emitted[0].ts).toBe(1000);
expect(emitted[0].close).toBe(12);
});
});