From b88ef2b371f34e6b595ca10b3b0552c370125468 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Mon, 4 May 2026 04:59:09 -0400 Subject: [PATCH] Stream delayed live feed events --- services/api/src/index.ts | 11 ++--------- services/api/src/live.ts | 2 ++ services/api/tests/live.test.ts | 15 +++++++++++++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 911c4bf..e898e45 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -100,7 +100,7 @@ import { } from "@islandflow/types"; import { createClient } from "redis"; import { z } from "zod"; -import { LiveStateManager, isLiveItemFresh } from "./live"; +import { LiveStateManager, shouldFanoutLiveEvent } from "./live"; const service = "api"; const logger = createLogger({ service }); @@ -982,14 +982,7 @@ const run = async () => { ) => { const watermark = await liveState.ingest(ingestChannel, item); - if ( - (ingestChannel === "options" || - ingestChannel === "nbbo" || - ingestChannel === "equities" || - ingestChannel === "equity-quotes" || - ingestChannel === "flow") && - !isLiveItemFresh(ingestChannel, item) - ) { + if (!shouldFanoutLiveEvent(ingestChannel, item)) { return; } diff --git a/services/api/src/live.ts b/services/api/src/live.ts index f10cb33..c15774f 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -289,6 +289,8 @@ export const isLiveItemFresh = ( return now - ts <= thresholdMs; }; +export const shouldFanoutLiveEvent = (_channel: LiveChannel, _item: unknown): boolean => true; + const nextBeforeForItems = (items: T[], cursorOf: (item: T) => Cursor): Cursor | null => { const last = items.at(-1); return last ? cursorOf(last) : null; diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index 41ad732..784fafd 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -1,6 +1,11 @@ import { describe, expect, it } from "bun:test"; import type { ClickHouseClient } from "@islandflow/storage"; -import { LiveStateManager, isLiveItemFresh, resolveGenericLiveLimits } from "../src/live"; +import { + LiveStateManager, + isLiveItemFresh, + resolveGenericLiveLimits, + shouldFanoutLiveEvent +} from "../src/live"; const makeClickHouse = (): ClickHouseClient => ({ @@ -567,9 +572,15 @@ describe("LiveStateManager", () => { expect(persisted).toHaveLength(1); }); - it("exposes freshness helper for event fanout gating", () => { + it("exposes freshness helper for feed status", () => { expect(isLiveItemFresh("options", { ts: 1000 }, 1010)).toBe(true); expect(isLiveItemFresh("options", { ts: 1000 }, 20_001)).toBe(false); expect(isLiveItemFresh("equity-joins", { source_ts: 1 }, 1_000_000)).toBe(true); }); + + it("fans out stale live events so delayed data remains visible without refresh", () => { + expect(shouldFanoutLiveEvent("options", { ts: 1000 })).toBe(true); + expect(shouldFanoutLiveEvent("equities", { ts: 1000 })).toBe(true); + expect(shouldFanoutLiveEvent("flow", { source_ts: 1000 })).toBe(true); + }); });