Stream delayed live feed events

This commit is contained in:
dirtydishes 2026-05-04 04:59:09 -04:00
parent 85dfebb8f0
commit b88ef2b371
3 changed files with 17 additions and 11 deletions

View file

@ -100,7 +100,7 @@ import {
} from "@islandflow/types"; } from "@islandflow/types";
import { createClient } from "redis"; import { createClient } from "redis";
import { z } from "zod"; import { z } from "zod";
import { LiveStateManager, isLiveItemFresh } from "./live"; import { LiveStateManager, shouldFanoutLiveEvent } from "./live";
const service = "api"; const service = "api";
const logger = createLogger({ service }); const logger = createLogger({ service });
@ -982,14 +982,7 @@ const run = async () => {
) => { ) => {
const watermark = await liveState.ingest(ingestChannel, item); const watermark = await liveState.ingest(ingestChannel, item);
if ( if (!shouldFanoutLiveEvent(ingestChannel, item)) {
(ingestChannel === "options" ||
ingestChannel === "nbbo" ||
ingestChannel === "equities" ||
ingestChannel === "equity-quotes" ||
ingestChannel === "flow") &&
!isLiveItemFresh(ingestChannel, item)
) {
return; return;
} }

View file

@ -289,6 +289,8 @@ export const isLiveItemFresh = (
return now - ts <= thresholdMs; return now - ts <= thresholdMs;
}; };
export const shouldFanoutLiveEvent = (_channel: LiveChannel, _item: unknown): boolean => true;
const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Cursor | null => { const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Cursor | null => {
const last = items.at(-1); const last = items.at(-1);
return last ? cursorOf(last) : null; return last ? cursorOf(last) : null;

View file

@ -1,6 +1,11 @@
import { describe, expect, it } from "bun:test"; import { describe, expect, it } from "bun:test";
import type { ClickHouseClient } from "@islandflow/storage"; import type { ClickHouseClient } from "@islandflow/storage";
import { LiveStateManager, isLiveItemFresh, resolveGenericLiveLimits } from "../src/live"; import {
LiveStateManager,
isLiveItemFresh,
resolveGenericLiveLimits,
shouldFanoutLiveEvent
} from "../src/live";
const makeClickHouse = (): ClickHouseClient => const makeClickHouse = (): ClickHouseClient =>
({ ({
@ -567,9 +572,15 @@ describe("LiveStateManager", () => {
expect(persisted).toHaveLength(1); expect(persisted).toHaveLength(1);
}); });
it("exposes freshness helper for event fanout gating", () => { it("exposes freshness helper for feed status", () => {
expect(isLiveItemFresh("options", { ts: 1000 }, 1010)).toBe(true); expect(isLiveItemFresh("options", { ts: 1000 }, 1010)).toBe(true);
expect(isLiveItemFresh("options", { ts: 1000 }, 20_001)).toBe(false); expect(isLiveItemFresh("options", { ts: 1000 }, 20_001)).toBe(false);
expect(isLiveItemFresh("equity-joins", { source_ts: 1 }, 1_000_000)).toBe(true); expect(isLiveItemFresh("equity-joins", { source_ts: 1 }, 1_000_000)).toBe(true);
}); });
it("fans out stale live events so delayed data remains visible without refresh", () => {
expect(shouldFanoutLiveEvent("options", { ts: 1000 })).toBe(true);
expect(shouldFanoutLiveEvent("equities", { ts: 1000 })).toBe(true);
expect(shouldFanoutLiveEvent("flow", { source_ts: 1000 })).toBe(true);
});
}); });