From 89aaf63d34629732a1f3b6f49a62e2cd34498610 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Tue, 28 Apr 2026 17:26:48 -0400 Subject: [PATCH] Drop stale backlog events from /ws/live fanout --- .beads/issues.jsonl | 1 + services/api/src/index.ts | 12 +++++++++++- services/api/src/live.ts | 29 +++++++++++++++++++++-------- services/api/tests/live.test.ts | 29 ++++++++++++++++++++++++++++- 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index d5a4458..051dbad 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -1,3 +1,4 @@ +{"_type":"issue","id":"islandflow-ayo","title":"Drop stale backlog events from live fanout","description":"Follow-up to live freshness rollout: /ws/live was still fanning out stale backlog events for freshness-gated channels, which kept tape panes in Live feed behind despite active synthetic ingest. Gate fanout and cache ingest by freshness for options/nbbo/equities/flow.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:26:39Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:26:44Z","started_at":"2026-04-28T21:26:44Z","closed_at":"2026-04-28T21:26:44Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-0v6","title":"Fix tape freshness, NBBO coverage, pause controls, and filter popup","description":"Implement the tape fixes requested for synthetic options notional sizing, strict live freshness, live-mode pause/resume behavior, stronger NBBO snapshot coverage, and moving flow filters behind a popup. Includes server-side live cache changes, web terminal state/UI changes, and tests for synthetic pricing, live snapshot freshness/NBBO retention, and live pause/filter interactions.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:02:52Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:13:38Z","started_at":"2026-04-28T21:02:57Z","closed_at":"2026-04-28T21:13:38Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-e4r","title":"Implement smart-money flow filtering and synthetic firehose modes","description":"Implement the approved multi-surface plan for named synthetic market profiles, options raw-vs-signal filtering, live/API filter contracts, Tape page client-side flow filters, firehose-readiness improvements, tests, and README updates.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:10:49Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:29:29Z","started_at":"2026-04-28T20:10:53Z","closed_at":"2026-04-28T20:29:29Z","close_reason":"Implemented synthetic market profiles, options signal-path filtering, signal-aware API/replay contracts, Tape page filters, tests, and README updates. Follow-up tracked in islandflow-biq.","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-biq","title":"Finish raw live options delivery and filter/backpressure observability","description":"The smart-money signal path and Tape filters are in place, but the next firehose pass should finish server-side selective raw live delivery for options subscriptions and add explicit filtered-out/backpressure observability for API/web counters. This was discovered while landing islandflow-e4r.\n","status":"open","priority":2,"issue_type":"task","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:28:58Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:28:58Z","dependencies":[{"issue_id":"islandflow-biq","depends_on_id":"islandflow-e4r","type":"discovered-from","created_at":"2026-04-28T16:28:58Z","created_by":"dirtydishes","metadata":"{}"}],"dependency_count":0,"dependent_count":0,"comment_count":0} diff --git a/services/api/src/index.ts b/services/api/src/index.ts index c0bb2b5..9aedabc 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -99,7 +99,7 @@ import { } from "@islandflow/types"; import { createClient } from "redis"; import { z } from "zod"; -import { LiveStateManager } from "./live"; +import { LiveStateManager, isLiveItemFresh } from "./live"; const service = "api"; const logger = createLogger({ service }); @@ -860,6 +860,16 @@ const run = async () => { item: unknown, ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" ) => { + if ( + (ingestChannel === "options" || + ingestChannel === "nbbo" || + ingestChannel === "equities" || + ingestChannel === "flow") && + !isLiveItemFresh(ingestChannel, item) + ) { + return; + } + const watermark = await liveState.ingest(ingestChannel, item); const matchingSubscriptions = subscription.channel === "options" || subscription.channel === "flow" diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 81234c0..df916fb 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -65,7 +65,7 @@ type GenericFeedConfig = { fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise; }; -const LIVE_FRESHNESS_THRESHOLDS: Partial> = { +export const LIVE_FRESHNESS_THRESHOLDS: Partial> = { options: 15_000, nbbo: 15_000, equities: 15_000, @@ -259,6 +259,22 @@ const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | nu } }; +export const isLiveItemFresh = ( + channel: LiveGenericChannel, + item: unknown, + now = Date.now() +): boolean => { + const thresholdMs = LIVE_FRESHNESS_THRESHOLDS[channel]; + if (!thresholdMs) { + return true; + } + const ts = extractFreshnessTs(channel, item); + if (ts === null) { + return false; + } + return now - ts <= thresholdMs; +}; + const filterFreshGenericItems = ( channel: LiveGenericChannel, items: T[], @@ -269,13 +285,7 @@ const filterFreshGenericItems = ( return items; } - return items.filter((item) => { - const ts = extractFreshnessTs(channel, item); - if (ts === null) { - return false; - } - return now - ts <= thresholdMs; - }); + return items.filter((item) => isLiveItemFresh(channel, item, now)); }; const nextBeforeForItems = (items: T[], cursorOf: (item: T) => Cursor): Cursor | null => { @@ -503,6 +513,9 @@ export class LiveStateManager { default: { const config = this.generic[channel]; const parsed = config.parse(item); + if (!isLiveItemFresh(channel, parsed)) { + return this.genericCursors.get(config.cursorField) ?? null; + } const items = this.genericItems.get(channel) ?? []; const next = normalizeGenericItems(channel, [parsed, ...items], config); this.genericItems.set(channel, next); diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index f40eb1f..21bcd28 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "bun:test"; import type { ClickHouseClient } from "@islandflow/storage"; -import { LiveStateManager, resolveGenericLiveLimits } from "../src/live"; +import { LiveStateManager, isLiveItemFresh, resolveGenericLiveLimits } from "../src/live"; const makeClickHouse = (): ClickHouseClient => ({ @@ -475,4 +475,31 @@ describe("LiveStateManager", () => { ["MSFT-2025-01-17-300-C", "nbbo-other"] ]); }); + + it("rejects stale ingest for freshness-gated channels", async () => { + const manager = new LiveStateManager(makeClickHouse(), null); + const now = Date.now(); + + await manager.ingest("equities", { + source_ts: now - 60_000, + ingest_ts: now - 59_999, + seq: 1, + trace_id: "eq-stale", + ts: now - 60_000, + underlying_id: "AAPL", + price: 100, + size: 10, + exchange: "X", + offExchangeFlag: false + }); + + const snapshot = await manager.getSnapshot({ channel: "equities" }); + expect(snapshot.items).toHaveLength(0); + }); + + it("exposes freshness helper for event fanout gating", () => { + expect(isLiveItemFresh("options", { ts: 1000 }, 1010)).toBe(true); + expect(isLiveItemFresh("options", { ts: 1000 }, 20_001)).toBe(false); + expect(isLiveItemFresh("equity-joins", { source_ts: 1 }, 1_000_000)).toBe(true); + }); });