Retain live history and warn on silent equities feeds

- Keep pausable live snapshots visible while stale
- Surface a connected-but-silent equities warning
- Add coverage for history retention and warning timing
This commit is contained in:
dirtydishes 2026-04-28 23:19:14 -04:00
parent 89aaf63d34
commit da942079f3
2 changed files with 161 additions and 15 deletions

View file

@ -5,7 +5,10 @@ import {
flushPausableTapeData,
getLiveFeedStatus,
nextFlowFilterPopoverState,
projectPausableTapeState,
reducePausableTapeData,
shouldRetainLiveSnapshotHistory,
shouldShowEquitiesSilentFeedWarning,
toggleFilterValue
} from "./terminal";
@ -53,6 +56,55 @@ describe("live tape pausable helpers", () => {
expect(getLiveFeedStatus("connected", 1000, 500, 1601)).toBe("stale");
expect(getLiveFeedStatus("disconnected", 1000, 500, 1601)).toBe("disconnected");
});
it("keeps visible history even when live status is stale", () => {
const projected = projectPausableTapeState([makeItem("stale", 7, 1000)], "stale", 2000);
expect(projected.items.map((item) => item.trace_id)).toEqual(["stale"]);
expect(projected.lastUpdate).toBeNull();
});
it("flags connected equities feeds that stay silent past threshold", () => {
expect(
shouldShowEquitiesSilentFeedWarning({
wsStatus: "connected",
equitiesSubscribed: true,
connectedAt: 1_000,
lastEquitiesEventAt: null,
now: 20_000,
thresholdMs: 25_000
})
).toBe(false);
expect(
shouldShowEquitiesSilentFeedWarning({
wsStatus: "connected",
equitiesSubscribed: true,
connectedAt: 1_000,
lastEquitiesEventAt: null,
now: 27_000,
thresholdMs: 25_000
})
).toBe(true);
expect(
shouldShowEquitiesSilentFeedWarning({
wsStatus: "connected",
equitiesSubscribed: true,
connectedAt: 1_000,
lastEquitiesEventAt: 20_000,
now: 40_000,
thresholdMs: 25_000
})
).toBe(false);
});
it("retains live history when freshness-gated snapshots are empty", () => {
expect(shouldRetainLiveSnapshotHistory("options", true, 0, 3)).toBe(true);
expect(shouldRetainLiveSnapshotHistory("equities", true, 0, 2)).toBe(true);
expect(shouldRetainLiveSnapshotHistory("alerts", true, 0, 3)).toBe(false);
expect(shouldRetainLiveSnapshotHistory("options", true, 1, 3)).toBe(false);
expect(shouldRetainLiveSnapshotHistory("options", false, 0, 3)).toBe(false);
});
});
describe("flow filter popup helpers", () => {

View file

@ -60,6 +60,12 @@ const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW,
const LIVE_OPTIONS_STALE_MS = 15_000;
const LIVE_NBBO_STALE_MS = 15_000;
const LIVE_EQUITIES_STALE_MS = 15_000;
const LIVE_EQUITIES_SILENT_WARNING_MS = parseBoundedInt(
process.env.NEXT_PUBLIC_LIVE_EQUITIES_SILENT_WARNING_MS,
25_000,
5_000,
5 * 60 * 1000
);
const LIVE_FLOW_STALE_MS = 30_000;
const PINNED_EVIDENCE_TTL_MS = parseBoundedInt(
process.env.NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS,
@ -775,13 +781,6 @@ export const countActiveFlowFilterGroups = (filters: OptionFlowFilters): number
const isFreshLiveItem = (ts: number, thresholdMs: number, now = Date.now()): boolean => now - ts <= thresholdMs;
const filterFreshLiveItems = <T extends SortableItem>(
items: T[],
thresholdMs: number,
getItemTs: (item: T) => number = extractSortTs,
now = Date.now()
): T[] => items.filter((item) => isFreshLiveItem(getItemTs(item), thresholdMs, now));
export const toggleFilterValue = <T extends string>(
values: T[] | undefined,
value: T,
@ -803,6 +802,60 @@ export const nextFlowFilterPopoverState = (
return action === "toggle" ? !current : false;
};
export const projectPausableTapeState = <T extends SortableItem>(
visible: T[],
status: WsStatus,
lastUpdate: number | null
): { items: T[]; lastUpdate: number | null } => ({
items: visible,
lastUpdate: status === "stale" ? null : lastUpdate
});
type EquitiesSilentFeedWarningInput = {
wsStatus: WsStatus;
equitiesSubscribed: boolean;
connectedAt: number | null;
lastEquitiesEventAt: number | null;
now?: number;
thresholdMs?: number;
};
export const shouldShowEquitiesSilentFeedWarning = ({
wsStatus,
equitiesSubscribed,
connectedAt,
lastEquitiesEventAt,
now = Date.now(),
thresholdMs = LIVE_EQUITIES_SILENT_WARNING_MS
}: EquitiesSilentFeedWarningInput): boolean => {
if (wsStatus !== "connected" || !equitiesSubscribed) {
return false;
}
const baselineTs = lastEquitiesEventAt ?? connectedAt;
if (baselineTs === null) {
return false;
}
return now - baselineTs >= thresholdMs;
};
const LIVE_SNAPSHOT_HISTORY_CHANNELS = new Set<LiveSubscription["channel"]>([
"options",
"nbbo",
"equities",
"flow"
]);
export const shouldRetainLiveSnapshotHistory = (
channel: LiveSubscription["channel"],
isSnapshot: boolean,
snapshotItemCount: number,
currentItemCount: number
): boolean =>
isSnapshot &&
snapshotItemCount === 0 &&
currentItemCount > 0 &&
LIVE_SNAPSHOT_HISTORY_CHANNELS.has(channel);
const classifyNbboSide = (price: number, quote: OptionNBBO | null | undefined): NbboSide | null => {
if (!quote || !Number.isFinite(price)) {
return null;
@ -1635,15 +1688,12 @@ const usePausableTapeView = <T extends SortableItem & { seq: number }>(
const status = config.enabled
? getLiveFeedStatus(config.sourceStatus, freshestTs, config.freshnessMs, clock)
: "disconnected";
const items =
status === "stale"
? []
: filterFreshLiveItems(data.visible, config.freshnessMs, getItemTs, clock);
const projected = projectPausableTapeState(data.visible, status, config.lastUpdate);
return {
status,
items,
lastUpdate: status === "stale" ? null : config.lastUpdate,
items: projected.items,
lastUpdate: projected.lastUpdate,
replayTime: null,
replayComplete: false,
paused,
@ -1889,7 +1939,9 @@ const useFlowStream = (
type LiveSessionState = {
status: WsStatus;
connectedAt: number | null;
lastUpdate: number | null;
lastEventByChannel: Partial<Record<LiveSubscription["channel"], number>>;
options: OptionPrint[];
nbbo: OptionNBBO[];
equities: EquityPrint[];
@ -1952,7 +2004,11 @@ const useLiveSession = (
flowFilters: OptionFlowFilters
): LiveSessionState => {
const [status, setStatus] = useState<WsStatus>(enabled ? "connecting" : "disconnected");
const [connectedAt, setConnectedAt] = useState<number | null>(null);
const [lastUpdate, setLastUpdate] = useState<number | null>(null);
const [lastEventByChannel, setLastEventByChannel] = useState<
Partial<Record<LiveSubscription["channel"], number>>
>({});
const [options, setOptions] = useState<OptionPrint[]>([]);
const [nbbo, setNbbo] = useState<OptionNBBO[]>([]);
const [equities, setEquities] = useState<EquityPrint[]>([]);
@ -1975,7 +2031,9 @@ const useLiveSession = (
useEffect(() => {
if (!enabled) {
setStatus("disconnected");
setConnectedAt(null);
setLastUpdate(null);
setLastEventByChannel({});
setOptions([]);
setNbbo([]);
setEquities([]);
@ -2040,7 +2098,14 @@ const useLiveSession = (
) => {
setter((prev) =>
message.op === "snapshot"
? (nextItems as T[])
? shouldRetainLiveSnapshotHistory(
subscription.channel,
true,
nextItems.length,
prev.length
)
? prev
: (nextItems as T[])
: mergeNewest(nextItems as T[], prev, LIVE_HOT_WINDOW, (evicted) =>
incrementRetentionMetric("hotWindowEvictions", evicted)
)
@ -2080,6 +2145,13 @@ const useLiveSession = (
break;
}
if (items.length > 0) {
setLastEventByChannel((current) => ({
...current,
[subscription.channel]: updateAt
}));
}
setLastUpdate(updateAt);
};
@ -2096,6 +2168,7 @@ const useLiveSession = (
return;
}
setStatus("connected");
setConnectedAt(Date.now());
syncSubscriptions(socket);
};
@ -2116,6 +2189,7 @@ const useLiveSession = (
return;
}
setStatus("disconnected");
setConnectedAt(null);
subscribedKeysRef.current = new Set();
subscribedMapRef.current = new Map();
reconnectRef.current = window.setTimeout(connect, 1000);
@ -2126,6 +2200,7 @@ const useLiveSession = (
return;
}
setStatus("disconnected");
setConnectedAt(null);
socket.close();
};
};
@ -2172,7 +2247,9 @@ const useLiveSession = (
return {
status,
connectedAt,
lastUpdate,
lastEventByChannel,
options,
nbbo,
equities,
@ -3401,6 +3478,13 @@ const useTerminalState = () => {
chartIntervalMs,
flowFilters
);
const equitiesLiveSubscriptionActive = useMemo(
() =>
getLiveManifest(pathname, chartTicker.toUpperCase(), chartIntervalMs, flowFilters).some(
(sub) => sub.channel === "equities"
),
[pathname, chartTicker, chartIntervalMs, flowFilters]
);
const handleReplaySource = useCallback((value: string | null) => {
setReplaySource(value);
@ -4038,6 +4122,13 @@ const useTerminalState = () => {
return equitiesFeed.items.filter((print) => matchesTicker(print.underlying_id));
}, [equitiesFeed.items, matchesTicker, tickerSet]);
const equitiesSilentWarning = shouldShowEquitiesSilentFeedWarning({
wsStatus: liveSession.status,
equitiesSubscribed: mode === "live" && equitiesLiveSubscriptionActive,
connectedAt: liveSession.connectedAt,
lastEquitiesEventAt: liveSession.lastEventByChannel.equities ?? null
});
const filteredInferredDark = useMemo(() => {
if (tickerSet.size === 0) {
return inferredDarkFeed.items;
@ -4390,6 +4481,7 @@ const useTerminalState = () => {
selectedClassifierEvidence,
filteredOptions,
filteredEquities,
equitiesSilentWarning,
filteredInferredDark,
filteredFlow,
filteredAlerts,
@ -4906,7 +4998,9 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => {
{state.tickerSet.size > 0
? "No equity prints match the current filter."
: state.mode === "live"
? state.equities.status === "stale"
? state.equitiesSilentWarning
? "Connected but no equity prints received. Check ingest-equities."
: state.equities.status === "stale"
? "Live feed behind. Waiting for fresh equity prints."
: "No equity prints yet. Start ingest-equities."
: "Replay queue empty. Ensure ClickHouse has data."}