Drop stale backlog events from /ws/live fanout

This commit is contained in:
dirtydishes 2026-04-28 17:26:48 -04:00
parent 75fc6f9373
commit 89aaf63d34
4 changed files with 61 additions and 10 deletions

View file

@ -99,7 +99,7 @@ import {
} from "@islandflow/types";
import { createClient } from "redis";
import { z } from "zod";
import { LiveStateManager } from "./live";
import { LiveStateManager, isLiveItemFresh } from "./live";
const service = "api";
const logger = createLogger({ service });
@ -860,6 +860,16 @@ const run = async () => {
item: unknown,
ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark"
) => {
if (
(ingestChannel === "options" ||
ingestChannel === "nbbo" ||
ingestChannel === "equities" ||
ingestChannel === "flow") &&
!isLiveItemFresh(ingestChannel, item)
) {
return;
}
const watermark = await liveState.ingest(ingestChannel, item);
const matchingSubscriptions =
subscription.channel === "options" || subscription.channel === "flow"

View file

@ -65,7 +65,7 @@ type GenericFeedConfig = {
fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise<any[]>;
};
const LIVE_FRESHNESS_THRESHOLDS: Partial<Record<LiveGenericChannel, number>> = {
export const LIVE_FRESHNESS_THRESHOLDS: Partial<Record<LiveGenericChannel, number>> = {
options: 15_000,
nbbo: 15_000,
equities: 15_000,
@ -259,6 +259,22 @@ const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | nu
}
};
export const isLiveItemFresh = (
channel: LiveGenericChannel,
item: unknown,
now = Date.now()
): boolean => {
const thresholdMs = LIVE_FRESHNESS_THRESHOLDS[channel];
if (!thresholdMs) {
return true;
}
const ts = extractFreshnessTs(channel, item);
if (ts === null) {
return false;
}
return now - ts <= thresholdMs;
};
const filterFreshGenericItems = <T>(
channel: LiveGenericChannel,
items: T[],
@ -269,13 +285,7 @@ const filterFreshGenericItems = <T>(
return items;
}
return items.filter((item) => {
const ts = extractFreshnessTs(channel, item);
if (ts === null) {
return false;
}
return now - ts <= thresholdMs;
});
return items.filter((item) => isLiveItemFresh(channel, item, now));
};
const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Cursor | null => {
@ -503,6 +513,9 @@ export class LiveStateManager {
default: {
const config = this.generic[channel];
const parsed = config.parse(item);
if (!isLiveItemFresh(channel, parsed)) {
return this.genericCursors.get(config.cursorField) ?? null;
}
const items = this.genericItems.get(channel) ?? [];
const next = normalizeGenericItems(channel, [parsed, ...items], config);
this.genericItems.set(channel, next);

View file

@ -1,6 +1,6 @@
import { describe, expect, it } from "bun:test";
import type { ClickHouseClient } from "@islandflow/storage";
import { LiveStateManager, resolveGenericLiveLimits } from "../src/live";
import { LiveStateManager, isLiveItemFresh, resolveGenericLiveLimits } from "../src/live";
const makeClickHouse = (): ClickHouseClient =>
({
@ -475,4 +475,31 @@ describe("LiveStateManager", () => {
["MSFT-2025-01-17-300-C", "nbbo-other"]
]);
});
it("rejects stale ingest for freshness-gated channels", async () => {
const manager = new LiveStateManager(makeClickHouse(), null);
const now = Date.now();
await manager.ingest("equities", {
source_ts: now - 60_000,
ingest_ts: now - 59_999,
seq: 1,
trace_id: "eq-stale",
ts: now - 60_000,
underlying_id: "AAPL",
price: 100,
size: 10,
exchange: "X",
offExchangeFlag: false
});
const snapshot = await manager.getSnapshot({ channel: "equities" });
expect(snapshot.items).toHaveLength(0);
});
it("exposes freshness helper for event fanout gating", () => {
expect(isLiveItemFresh("options", { ts: 1000 }, 1010)).toBe(true);
expect(isLiveItemFresh("options", { ts: 1000 }, 20_001)).toBe(false);
expect(isLiveItemFresh("equity-joins", { source_ts: 1 }, 1_000_000)).toBe(true);
});
});