Merge pull request #30 from dirtydishes/live-clickhouse-scrollgate

Implement live tape scroll-gated history
This commit is contained in:
dirtydishes 2026-05-06 23:09:30 -04:00 committed by GitHub
commit 62244ce048
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 535 additions and 53 deletions

View file

@ -62,13 +62,25 @@ const parseBoundedInt = (
return Math.max(min, Math.min(max, Math.floor(parsed)));
};
const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, 2000, 100, 100000);
const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, 100, 1, 100000);
const LIVE_HOT_WINDOW_OPTIONS = parseBoundedInt(
process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW_OPTIONS,
25000,
100,
1,
100000
);
const LIVE_HISTORY_SOFT_CAP = parseBoundedInt(
process.env.NEXT_PUBLIC_LIVE_HISTORY_SOFT_CAP,
5000,
100,
50000
);
const LIVE_HISTORY_BATCH = parseBoundedInt(
process.env.NEXT_PUBLIC_LIVE_HISTORY_BATCH,
500,
1,
1000
);
const LIVE_OPTIONS_STALE_MS = 15_000;
const LIVE_NBBO_STALE_MS = 15_000;
const LIVE_EQUITIES_STALE_MS = 15_000;
@ -409,6 +421,16 @@ type PausableTapeData<T> = {
dropped: number;
};
type LiveHistoryBuffer<T> = {
liveHead: T[];
queuedLive: T[];
historyTail: T[];
nextBefore: Cursor | null;
historyLoading: boolean;
historyExhausted: boolean;
autoHydrating: boolean;
};
export const reducePausableTapeData = <T extends SortableItem>(
current: PausableTapeData<T>,
incoming: T[],
@ -488,6 +510,37 @@ const EMPTY_PAUSABLE_TAPE = {
dropped: 0
};
const appendHistoryTail = <T extends SortableItem>(
current: T[],
incoming: T[],
liveHead: T[],
cap = LIVE_HISTORY_SOFT_CAP
): T[] => {
if (incoming.length === 0) {
return current;
}
const seen = new Set<string>();
for (const item of liveHead) {
seen.add(getTapeItemKey(item));
}
for (const item of current) {
seen.add(getTapeItemKey(item));
}
const appended = [...current];
for (const item of incoming) {
const key = getTapeItemKey(item);
if (seen.has(key)) {
continue;
}
seen.add(key);
appended.push(item);
}
return cap > 0 ? appended.slice(0, cap) : appended;
};
export const getLiveFeedStatus = (
sourceStatus: WsStatus,
freshestTs: number | null,
@ -1315,6 +1368,40 @@ const useScrollAnchor = (
return { capture, apply };
};
const useBottomHistoryGate = (
listRef: React.RefObject<HTMLDivElement>,
enabled: boolean,
onLoadOlder: () => void
): void => {
const loadRef = useRef(onLoadOlder);
useEffect(() => {
loadRef.current = onLoadOlder;
}, [onLoadOlder]);
useEffect(() => {
if (!enabled) {
return;
}
const element = listRef.current;
if (!element) {
return;
}
const maybeLoad = () => {
const threshold = Math.max(240, element.clientHeight * 0.5);
if (element.scrollTop + element.clientHeight >= element.scrollHeight - threshold) {
loadRef.current();
}
};
maybeLoad();
element.addEventListener("scroll", maybeLoad);
return () => {
element.removeEventListener("scroll", maybeLoad);
};
}, [enabled, listRef]);
};
type VirtualListResult<T> = {
visibleItems: T[];
topSpacerHeight: number;
@ -1886,6 +1973,7 @@ type PausableTapeViewConfig<T extends SortableItem & { seq: number }> = {
enabled: boolean;
sourceStatus: WsStatus;
sourceItems: T[];
historyTail?: T[];
lastUpdate: number | null;
freshnessMs: number;
onNewItems?: (count: number) => void;
@ -2011,10 +2099,14 @@ const usePausableTapeView = <T extends SortableItem & { seq: number }>(
)
: "disconnected";
const projected = projectPausableTapeState(data.visible, status, config.lastUpdate);
const items = useMemo(
() => [...projected.items, ...(config.historyTail ?? [])],
[projected.items, config.historyTail]
);
return {
status,
items: projected.items,
items,
lastUpdate: projected.lastUpdate,
replayTime: null,
replayComplete: false,
@ -2269,6 +2361,15 @@ type LiveSessionState = {
historyLoading: Partial<Record<string, boolean>>;
historyErrors: Partial<Record<string, string | null>>;
loadOlder: (channel: LiveSubscription["channel"]) => Promise<void>;
optionsHistory: OptionPrint[];
nbboHistory: OptionNBBO[];
equitiesHistory: EquityPrint[];
equityJoinsHistory: EquityPrintJoin[];
flowHistory: FlowPacket[];
smartMoneyHistory: SmartMoneyEvent[];
classifierHitsHistory: ClassifierHitEvent[];
alertsHistory: AlertEvent[];
inferredDarkHistory: InferredDarkEvent[];
options: OptionPrint[];
nbbo: OptionNBBO[];
equities: EquityPrint[];
@ -2360,14 +2461,23 @@ export const getLiveManifest = (
];
if (pathname === "/tape") {
return dedupeLiveSubscriptions([
...baselineSubs,
{ channel: "nbbo" },
{ channel: "equities", ...equityScope },
{ channel: "flow", filters: flowFilters },
{ channel: "smart-money" },
{ channel: "classifier-hits" }
]);
const optionsSub: Extract<LiveSubscription, { channel: "options" }> = {
channel: "options",
filters: flowFilters,
...optionScope,
snapshot_limit: LIVE_HOT_WINDOW_OPTIONS
};
const tapeSubs: LiveSubscription[] = [
optionsSub,
{ channel: "nbbo", snapshot_limit: LIVE_HOT_WINDOW },
{ channel: "equities", ...equityScope, snapshot_limit: LIVE_HOT_WINDOW },
{ channel: "flow", filters: flowFilters, snapshot_limit: LIVE_HOT_WINDOW },
{ channel: "smart-money", snapshot_limit: LIVE_HOT_WINDOW },
{ channel: "classifier-hits", snapshot_limit: LIVE_HOT_WINDOW },
{ channel: "alerts", snapshot_limit: LIVE_HOT_WINDOW },
{ channel: "inferred-dark", snapshot_limit: LIVE_HOT_WINDOW }
];
return dedupeLiveSubscriptions(tapeSubs);
}
return dedupeLiveSubscriptions([
@ -2410,6 +2520,15 @@ const useLiveSession = (
const [classifierHits, setClassifierHits] = useState<ClassifierHitEvent[]>([]);
const [alerts, setAlerts] = useState<AlertEvent[]>([]);
const [inferredDark, setInferredDark] = useState<InferredDarkEvent[]>([]);
const [optionsHistory, setOptionsHistory] = useState<OptionPrint[]>([]);
const [nbboHistory, setNbboHistory] = useState<OptionNBBO[]>([]);
const [equitiesHistory, setEquitiesHistory] = useState<EquityPrint[]>([]);
const [equityJoinsHistory, setEquityJoinsHistory] = useState<EquityPrintJoin[]>([]);
const [flowHistory, setFlowHistory] = useState<FlowPacket[]>([]);
const [smartMoneyHistory, setSmartMoneyHistory] = useState<SmartMoneyEvent[]>([]);
const [classifierHitsHistory, setClassifierHitsHistory] = useState<ClassifierHitEvent[]>([]);
const [alertsHistory, setAlertsHistory] = useState<AlertEvent[]>([]);
const [inferredDarkHistory, setInferredDarkHistory] = useState<InferredDarkEvent[]>([]);
const [chartCandles, setChartCandles] = useState<EquityCandle[]>([]);
const [chartOverlay, setChartOverlay] = useState<EquityPrint[]>([]);
const socketRef = useRef<WebSocket | null>(null);
@ -2443,6 +2562,15 @@ const useLiveSession = (
setClassifierHits([]);
setAlerts([]);
setInferredDark([]);
setOptionsHistory([]);
setNbboHistory([]);
setEquitiesHistory([]);
setEquityJoinsHistory([]);
setFlowHistory([]);
setSmartMoneyHistory([]);
setClassifierHitsHistory([]);
setAlertsHistory([]);
setInferredDarkHistory([]);
setChartCandles([]);
setChartOverlay([]);
subscribedKeysRef.current = new Set();
@ -2699,9 +2827,11 @@ const useLiveSession = (
);
if (resetScopedChannels.has("options")) {
setOptions([]);
setOptionsHistory([]);
}
if (resetScopedChannels.has("equities")) {
setEquities([]);
setEquitiesHistory([]);
}
if (resetScopedChannels.size > 0) {
setHistoryCursors((current) => {
@ -2765,7 +2895,7 @@ const useLiveSession = (
const params = new URLSearchParams({
before_ts: String(cursor.ts),
before_seq: String(cursor.seq),
limit: String(subscription.channel === "options" ? 500 : 200)
limit: String(subscription.channel === "options" ? LIVE_HISTORY_BATCH : 200)
});
if (subscription.channel === "options" || subscription.channel === "flow") {
appendOptionFlowFilters(params, subscription.filters);
@ -2783,45 +2913,49 @@ const useLiveSession = (
const mergeOlder = <T extends SortableItem>(
setter: Dispatch<SetStateAction<T[]>>,
limit: number
liveHead: T[],
cap = LIVE_HISTORY_SOFT_CAP
) => {
setter((prev) =>
mergeNewest(older as T[], prev, limit, (evicted) =>
incrementRetentionMetric("hotWindowEvictions", evicted)
)
);
setter((prev) => appendHistoryTail(prev, older as T[], liveHead, cap));
};
switch (subscription.channel) {
case "options":
mergeOlder(setOptions, LIVE_HOT_WINDOW_OPTIONS);
mergeOlder(
setOptionsHistory,
options,
subscription.underlying_ids?.length || subscription.option_contract_id ? 0 : LIVE_HISTORY_SOFT_CAP
);
break;
case "nbbo":
mergeOlder(setNbbo, LIVE_HOT_WINDOW);
mergeOlder(setNbboHistory, nbbo);
break;
case "equities":
mergeOlder(setEquities, LIVE_HOT_WINDOW);
mergeOlder(
setEquitiesHistory,
equities,
subscription.underlying_ids?.length ? 0 : LIVE_HISTORY_SOFT_CAP
);
break;
case "equity-quotes":
mergeOlder(setEquityQuotes, LIVE_HOT_WINDOW);
break;
case "equity-joins":
mergeOlder(setEquityJoins, LIVE_HOT_WINDOW);
mergeOlder(setEquityJoinsHistory, equityJoins);
break;
case "flow":
mergeOlder(setFlow, LIVE_HOT_WINDOW);
mergeOlder(setFlowHistory, flow);
break;
case "smart-money":
mergeOlder(setSmartMoney, LIVE_HOT_WINDOW);
mergeOlder(setSmartMoneyHistory, smartMoney);
break;
case "classifier-hits":
mergeOlder(setClassifierHits, LIVE_HOT_WINDOW);
mergeOlder(setClassifierHitsHistory, classifierHits);
break;
case "alerts":
mergeOlder(setAlerts, LIVE_HOT_WINDOW);
mergeOlder(setAlertsHistory, alerts);
break;
case "inferred-dark":
mergeOlder(setInferredDark, LIVE_HOT_WINDOW);
mergeOlder(setInferredDarkHistory, inferredDark);
break;
}
@ -2839,9 +2973,44 @@ const useLiveSession = (
setHistoryLoading((current) => ({ ...current, [key]: false }));
}
},
[enabled, manifest, historyCursors, historyLoading]
[
enabled,
manifest,
historyCursors,
historyLoading,
options,
nbbo,
equities,
equityJoins,
flow,
smartMoney,
classifierHits,
alerts,
inferredDark
]
);
useEffect(() => {
if (!enabled || pathname !== "/tape") {
return;
}
const scoped = manifest.filter(
(subscription) =>
(subscription.channel === "options" &&
(subscription.underlying_ids?.length || subscription.option_contract_id)) ||
(subscription.channel === "equities" && subscription.underlying_ids?.length)
);
if (scoped.length === 0) {
return;
}
for (const subscription of scoped) {
const key = getLiveSubscriptionKey(subscription);
if (historyCursors[key] && !historyLoading[key]) {
void loadOlder(subscription.channel);
}
}
}, [enabled, pathname, manifest, historyCursors, historyLoading, loadOlder]);
return {
status,
connectedAt,
@ -2852,6 +3021,15 @@ const useLiveSession = (
historyLoading,
historyErrors,
loadOlder,
optionsHistory,
nbboHistory,
equitiesHistory,
equityJoinsHistory,
flowHistory,
smartMoneyHistory,
classifierHitsHistory,
alertsHistory,
inferredDarkHistory,
options,
nbbo,
equities,
@ -4435,6 +4613,7 @@ const useTerminalState = () => {
enabled: mode === "live",
sourceStatus: liveSession.status,
sourceItems: liveSession.options,
historyTail: liveSession.optionsHistory,
lastUpdate: liveSession.lastUpdate,
freshnessMs: LIVE_OPTIONS_STALE_MS,
retentionLimit: LIVE_HOT_WINDOW_OPTIONS,
@ -4447,6 +4626,7 @@ const useTerminalState = () => {
enabled: mode === "live",
sourceStatus: liveSession.status,
sourceItems: liveSession.equities,
historyTail: liveSession.equitiesHistory,
lastUpdate: liveSession.lastUpdate,
freshnessMs: LIVE_EQUITIES_STALE_MS,
captureScroll: equitiesAnchor.capture,
@ -4458,6 +4638,7 @@ const useTerminalState = () => {
enabled: mode === "live",
sourceStatus: liveSession.status,
sourceItems: liveSession.flow,
historyTail: liveSession.flowHistory,
lastUpdate: liveSession.lastUpdate,
freshnessMs: LIVE_FLOW_STALE_MS,
captureScroll: flowAnchor.capture,
@ -4469,26 +4650,26 @@ const useTerminalState = () => {
const optionsFeed = mode === "live" ? liveOptions : options;
const nbboFeed =
mode === "live" ? toStaticTapeState(liveSession.status, liveSession.nbbo, liveSession.lastUpdate) : nbbo;
mode === "live" ? toStaticTapeState(liveSession.status, [...liveSession.nbbo, ...liveSession.nbboHistory], liveSession.lastUpdate) : nbbo;
const equitiesFeed = mode === "live" ? liveEquities : equities;
const equityJoinsFeed =
mode === "live"
? toStaticTapeState(liveSession.status, liveSession.equityJoins, liveSession.lastUpdate)
? toStaticTapeState(liveSession.status, [...liveSession.equityJoins, ...liveSession.equityJoinsHistory], liveSession.lastUpdate)
: equityJoins;
const flowFeed = mode === "live" ? liveFlow : flow;
const alertsFeed =
mode === "live" ? toStaticTapeState(liveSession.status, liveSession.alerts, liveSession.lastUpdate) : alerts;
mode === "live" ? toStaticTapeState(liveSession.status, [...liveSession.alerts, ...liveSession.alertsHistory], liveSession.lastUpdate) : alerts;
const classifierHitsFeed =
mode === "live"
? toStaticTapeState(liveSession.status, liveSession.classifierHits, liveSession.lastUpdate)
? toStaticTapeState(liveSession.status, [...liveSession.classifierHits, ...liveSession.classifierHitsHistory], liveSession.lastUpdate)
: classifierHits;
const smartMoneyFeed =
mode === "live"
? toStaticTapeState(liveSession.status, liveSession.smartMoney, liveSession.lastUpdate)
? toStaticTapeState(liveSession.status, [...liveSession.smartMoney, ...liveSession.smartMoneyHistory], liveSession.lastUpdate)
: smartMoney;
const inferredDarkFeed =
mode === "live"
? toStaticTapeState(liveSession.status, liveSession.inferredDark, liveSession.lastUpdate)
? toStaticTapeState(liveSession.status, [...liveSession.inferredDark, ...liveSession.inferredDarkHistory], liveSession.lastUpdate)
: inferredDark;
useLayoutEffect(() => {
@ -4575,6 +4756,11 @@ const useTerminalState = () => {
const [pinnedEquityJoinMap, setPinnedEquityJoinMap] = useState<
Map<string, PinnedEntry<EquityPrintJoin>>
>(() => new Map());
const [optionSupportSmartMoney, setOptionSupportSmartMoney] = useState<SmartMoneyEvent[]>([]);
const [optionSupportClassifierHits, setOptionSupportClassifierHits] = useState<ClassifierHitEvent[]>([]);
const [historicalNbboByTraceId, setHistoricalNbboByTraceId] = useState<Map<string, OptionNBBO | null>>(
() => new Map()
);
const resolvedOptionPrintMap = useMemo(() => {
const merged = new Map<string, OptionPrint>();
@ -4809,7 +4995,7 @@ const useTerminalState = () => {
const classifierHitsByPacketId = useMemo(() => {
const map = new Map<string, ClassifierHitEvent[]>();
for (const hit of classifierHitsFeed.items) {
for (const hit of [...classifierHitsFeed.items, ...optionSupportClassifierHits]) {
const packetId = extractPacketIdFromClassifierHitTrace(hit.trace_id);
if (!packetId) {
continue;
@ -4817,11 +5003,11 @@ const useTerminalState = () => {
map.set(packetId, [...(map.get(packetId) ?? []), hit]);
}
return map;
}, [classifierHitsFeed.items, extractPacketIdFromClassifierHitTrace]);
}, [classifierHitsFeed.items, optionSupportClassifierHits, extractPacketIdFromClassifierHitTrace]);
const smartMoneyByPacketId = useMemo(() => {
const map = new Map<string, SmartMoneyEvent>();
for (const event of smartMoneyFeed.items) {
for (const event of [...smartMoneyFeed.items, ...optionSupportSmartMoney]) {
for (const packetId of event.packet_ids) {
const existing = map.get(packetId);
if (!existing || event.source_ts > existing.source_ts || event.seq > existing.seq) {
@ -4830,17 +5016,17 @@ const useTerminalState = () => {
}
}
return map;
}, [smartMoneyFeed.items]);
}, [smartMoneyFeed.items, optionSupportSmartMoney]);
const packetIdByOptionTraceId = useMemo(() => {
const map = new Map<string, string>();
for (const packet of flowFeed.items) {
for (const packet of resolvedFlowPacketMap.values()) {
for (const member of packet.members) {
map.set(member, packet.id);
}
}
return map;
}, [flowFeed.items]);
}, [resolvedFlowPacketMap]);
const classifierDecorByOptionTraceId = useMemo(() => {
const map = new Map<string, ClassifierDecor>();
@ -4858,6 +5044,111 @@ const useTerminalState = () => {
return map;
}, [classifierHitsByPacketId, packetIdByOptionTraceId, smartMoneyByPacketId]);
useEffect(() => {
if (mode !== "live" || optionsFeed.items.length === 0) {
return;
}
const traceIds: string[] = [];
const nbboContext: Array<{ trace_id: string; option_contract_id: string; ts: number }> = [];
for (const print of optionsFeed.items.slice(0, 1000)) {
if (!print.trace_id || classifierDecorByOptionTraceId.has(print.trace_id)) {
continue;
}
if (!packetIdByOptionTraceId.has(print.trace_id)) {
traceIds.push(print.trace_id);
}
const missingPreservedNbbo =
typeof print.execution_nbbo_side !== "string" &&
typeof print.nbbo_side !== "string" &&
!historicalNbboByTraceId.has(print.trace_id);
if (missingPreservedNbbo) {
nbboContext.push({
trace_id: print.trace_id,
option_contract_id: print.option_contract_id,
ts: print.ts
});
}
if (traceIds.length >= 250 && nbboContext.length >= 250) {
break;
}
}
const uniqueTraceIds = Array.from(new Set(traceIds)).slice(0, 250);
const uniqueNbboContext = Array.from(
new Map(nbboContext.map((item) => [item.trace_id, item])).values()
).slice(0, 250);
if (uniqueTraceIds.length === 0 && uniqueNbboContext.length === 0) {
return;
}
let cancelled = false;
void fetch(buildApiUrl("/lookup/options-support"), {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
trace_ids: uniqueTraceIds,
nbbo_context: uniqueNbboContext
})
})
.then(async (response) => {
if (!response.ok) {
throw new Error(await readErrorDetail(response));
}
return response.json() as Promise<{
packets?: FlowPacket[];
smart_money?: SmartMoneyEvent[];
classifier_hits?: ClassifierHitEvent[];
nbbo_by_trace_id?: Record<string, OptionNBBO | null>;
}>;
})
.then((payload) => {
if (cancelled) {
return;
}
const now = Date.now();
const packetMap = new Map<string, FlowPacket>();
for (const packet of payload.packets ?? []) {
packetMap.set(packet.id, packet);
}
if (packetMap.size > 0) {
setPinnedFlowPacketMap((prev) => upsertPinnedEntries(prev, packetMap, now));
}
if (payload.smart_money?.length) {
setOptionSupportSmartMoney((prev) =>
mergeNewest(payload.smart_money ?? [], prev, PINNED_EVIDENCE_MAX_ITEMS)
);
}
if (payload.classifier_hits?.length) {
setOptionSupportClassifierHits((prev) =>
mergeNewest(payload.classifier_hits ?? [], prev, PINNED_EVIDENCE_MAX_ITEMS)
);
}
if (payload.nbbo_by_trace_id) {
setHistoricalNbboByTraceId((prev) => {
const next = new Map(prev);
for (const [traceId, quote] of Object.entries(payload.nbbo_by_trace_id ?? {})) {
next.set(traceId, quote);
}
return next;
});
}
})
.catch((error) => {
console.warn("Failed to hydrate option row support", error);
});
return () => {
cancelled = true;
};
}, [
mode,
optionsFeed.items,
classifierDecorByOptionTraceId,
packetIdByOptionTraceId,
historicalNbboByTraceId
]);
const selectedClassifierPacketId = useMemo(() => {
if (!selectedClassifierHit) {
return null;
@ -5456,6 +5747,7 @@ const useTerminalState = () => {
tickerSet,
chartTicker,
nbboMap,
historicalNbboByTraceId,
optionPrintMap: resolvedOptionPrintMap,
equityPrintMap,
equityJoinMap: resolvedEquityJoinMap,
@ -5808,6 +6100,9 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => {
const state = useTerminal();
const items = limit ? state.filteredOptions.slice(0, limit) : state.filteredOptions;
const virtual = useVirtualList(items, state.optionsScroll.listRef, !limit, 36);
useBottomHistoryGate(state.optionsScroll.listRef, state.mode === "live" && !limit, () =>
void state.liveSession.loadOlder("options")
);
return (
<Pane
@ -5868,7 +6163,7 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => {
const contractId = normalizeContractId(print.option_contract_id);
const parsed = parseOptionContractId(contractId);
const contractDisplay = formatOptionContractLabel(contractId);
const quote = state.nbboMap.get(contractId);
const quote = state.historicalNbboByTraceId.get(print.trace_id) ?? state.nbboMap.get(contractId);
const hasPreservedNbbo = typeof print.execution_nbbo_side === "string";
const nbboSide =
print.execution_nbbo_side ??
@ -5982,6 +6277,9 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => {
const state = useTerminal();
const items = limit ? state.filteredEquities.slice(0, limit) : state.filteredEquities;
const virtual = useVirtualList(items, state.equitiesScroll.listRef, !limit, 36);
useBottomHistoryGate(state.equitiesScroll.listRef, state.mode === "live" && !limit, () =>
void state.liveSession.loadOlder("equities")
);
return (
<Pane
@ -6077,6 +6375,9 @@ const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => {
const state = useTerminal();
const items = limit ? state.filteredFlow.slice(0, limit) : state.filteredFlow;
const virtual = useVirtualList(items, state.flowScroll.listRef, !limit, 44);
useBottomHistoryGate(state.flowScroll.listRef, state.mode === "live" && !limit, () =>
void state.liveSession.loadOlder("flow")
);
return (
<Pane
@ -6216,6 +6517,9 @@ const AlertsPane = ({ limit, withStrip = false, className }: AlertsPaneProps) =>
const state = useTerminal();
const items = limit ? state.filteredAlerts.slice(0, limit) : state.filteredAlerts;
const virtual = useVirtualList(items, state.alertsScroll.listRef, !limit, 46);
useBottomHistoryGate(state.alertsScroll.listRef, state.mode === "live" && !limit, () =>
void state.liveSession.loadOlder("alerts")
);
return (
<Pane
@ -6312,6 +6616,10 @@ type ClassifierPaneProps = {
const ClassifierPane = ({ limit, className }: ClassifierPaneProps) => {
const state = useTerminal();
useBottomHistoryGate(state.classifierScroll.listRef, state.mode === "live" && !limit, () => {
void state.liveSession.loadOlder("smart-money");
void state.liveSession.loadOlder("classifier-hits");
});
const smartMoneyItems = limit ? state.filteredSmartMoneyEvents.slice(0, limit) : state.filteredSmartMoneyEvents;
const legacyItems =
smartMoneyItems.length === 0
@ -6438,6 +6746,9 @@ const DarkPane = ({ limit, className }: DarkPaneProps) => {
const state = useTerminal();
const items = limit ? state.filteredInferredDark.slice(0, limit) : state.filteredInferredDark;
const virtual = useVirtualList(items, state.darkScroll.listRef, !limit, 44);
useBottomHistoryGate(state.darkScroll.listRef, state.mode === "live" && !limit, () =>
void state.liveSession.loadOlder("inferred-dark")
);
return (
<Pane

View file

@ -1,3 +1,5 @@
## Docker-internal service URLs (do not use 127.0.0.1/localhost here).
## Containers must reach each other via Compose service names.
NATS_URL=nats://nats:4222
CLICKHOUSE_URL=http://clickhouse:8123
CLICKHOUSE_DATABASE=default

View file

@ -1560,6 +1560,104 @@ export const fetchFlowPacketById = async (
return record ? FlowPacketSchema.parse(fromFlowPacketRecord(record)) : null;
};
export const fetchFlowPacketsByMemberTraceIds = async (
client: ClickHouseClient,
traceIds: string[]
): Promise<FlowPacket[]> => {
const ids = Array.from(new Set(traceIds.map((id) => id.trim()).filter(Boolean)));
if (ids.length === 0) {
return [];
}
const memberPredicates = ids.map((id) => `has(members, ${quoteString(id)})`);
const result = await client.query({
query: `SELECT * FROM ${FLOW_PACKETS_TABLE} WHERE ${memberPredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeFlowPacketRow)
.filter((record): record is FlowPacketRecord => record !== null);
return FlowPacketSchema.array().parse(records.map(fromFlowPacketRecord));
};
export const fetchSmartMoneyEventsByPacketIds = async (
client: ClickHouseClient,
packetIds: string[]
): Promise<SmartMoneyEvent[]> => {
const ids = Array.from(new Set(packetIds.map((id) => id.trim()).filter(Boolean)));
if (ids.length === 0) {
return [];
}
const packetPredicates = ids.map((id) => `has(packet_ids, ${quoteString(id)})`);
const result = await client.query({
query: `SELECT * FROM ${SMART_MONEY_EVENTS_TABLE} WHERE ${packetPredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeSmartMoneyEventRow)
.filter((record): record is SmartMoneyEventRecord => record !== null);
return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord));
};
export const fetchClassifierHitsByPacketIds = async (
client: ClickHouseClient,
packetIds: string[]
): Promise<ClassifierHitEvent[]> => {
const ids = Array.from(new Set(packetIds.map((id) => id.trim()).filter(Boolean)));
if (ids.length === 0) {
return [];
}
const tracePredicates = ids.map((id) => `position(trace_id, ${quoteString(id)}) > 0`);
const result = await client.query({
query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} WHERE ${tracePredicates.join(" OR ")} ORDER BY source_ts DESC, seq DESC LIMIT ${clampLookupLimit(ids.length * 4)}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeClassifierHitRow)
.filter((record): record is ClassifierHitRecord => record !== null);
return ClassifierHitEventSchema.array().parse(records.map(fromClassifierHitRecord));
};
export const fetchNearestOptionNBBOForPrints = async (
client: ClickHouseClient,
inputs: Array<{ trace_id: string; option_contract_id: string; ts: number }>
): Promise<Record<string, OptionNBBO | null>> => {
const normalized = inputs
.map((item) => ({
trace_id: item.trace_id.trim(),
option_contract_id: item.option_contract_id.trim(),
ts: clampCursor(item.ts)
}))
.filter((item) => item.trace_id && item.option_contract_id);
if (normalized.length === 0) {
return {};
}
const byTraceId: Record<string, OptionNBBO | null> = Object.fromEntries(
normalized.map((item) => [item.trace_id, null])
);
await Promise.all(
normalized.map(async (item) => {
const result = await client.query({
query: `SELECT * FROM ${OPTION_NBBO_TABLE} WHERE option_contract_id = ${quoteString(item.option_contract_id)} AND ts <= ${item.ts} ORDER BY ts DESC, seq DESC LIMIT 1`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const quote = OptionNBBOSchema.array().parse(rows.map(normalizeOptionNbboRow))[0] ?? null;
byTraceId[item.trace_id] = quote;
})
);
return byTraceId;
};
export const fetchOptionPrintsByTraceIds = async (
client: ClickHouseClient,
traceIds: string[]

View file

@ -60,21 +60,26 @@ export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [
channel: z.literal("options"),
filters: OptionFlowFiltersSchema.optional(),
underlying_ids: z.array(z.string().min(1)).optional(),
option_contract_id: z.string().min(1).optional()
option_contract_id: z.string().min(1).optional(),
snapshot_limit: z.number().int().positive().optional()
}),
z.object({
channel: z.literal("flow"),
filters: OptionFlowFiltersSchema.optional()
filters: OptionFlowFiltersSchema.optional(),
snapshot_limit: z.number().int().positive().optional()
}),
z.object({
channel: z.literal("smart-money")
channel: z.literal("smart-money"),
snapshot_limit: z.number().int().positive().optional()
}),
z.object({
channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"])
channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]),
snapshot_limit: z.number().int().positive().optional()
}),
z.object({
channel: z.literal("equities"),
underlying_ids: z.array(z.string().min(1)).optional()
underlying_ids: z.array(z.string().min(1)).optional(),
snapshot_limit: z.number().int().positive().optional()
}),
z.object({
channel: z.literal("equity-candles"),

View file

@ -49,6 +49,7 @@ import {
fetchSmartMoneyEventsBefore,
fetchFlowPacketsAfter,
fetchFlowPacketById,
fetchFlowPacketsByMemberTraceIds,
fetchFlowPacketsBefore,
fetchRecentAlerts,
fetchRecentClassifierHits,
@ -76,6 +77,9 @@ import {
fetchOptionPrintsBefore,
fetchOptionPrintsAfter,
fetchOptionPrintsByTraceIds,
fetchNearestOptionNBBOForPrints,
fetchSmartMoneyEventsByPacketIds,
fetchClassifierHitsByPacketIds,
fetchRecentOptionPrints
} from "@islandflow/storage";
import type { EquityPrintQueryFilters, OptionPrintQueryFilters } from "@islandflow/storage";
@ -303,6 +307,28 @@ const jsonResponse = (body: unknown, status = 200): Response => {
});
};
const readJsonBody = async (req: Request): Promise<unknown> => {
const text = await req.text();
if (!text.trim()) {
return {};
}
return JSON.parse(text);
};
const optionsSupportLookupSchema = z.object({
trace_ids: z.array(z.string().min(1)).default([]),
nbbo_context: z
.array(
z.object({
trace_id: z.string().min(1),
option_contract_id: z.string().min(1),
ts: z.number().int().nonnegative()
})
)
.optional()
.default([])
});
const parseLimit = (value: string | null): number => {
if (value === null) {
return env.REST_DEFAULT_LIMIT;
@ -1608,6 +1634,33 @@ const run = async () => {
return jsonResponse({ data });
}
if (req.method === "POST" && url.pathname === "/lookup/options-support") {
try {
const body = optionsSupportLookupSchema.parse(await readJsonBody(req));
const packets = await fetchFlowPacketsByMemberTraceIds(clickhouse, body.trace_ids);
const packetIds = packets.map((packet) => packet.id);
const [smartMoney, classifierHits, nbboByTraceId] = await Promise.all([
fetchSmartMoneyEventsByPacketIds(clickhouse, packetIds),
fetchClassifierHitsByPacketIds(clickhouse, packetIds),
fetchNearestOptionNBBOForPrints(clickhouse, body.nbbo_context)
]);
return jsonResponse({
packets,
smart_money: smartMoney,
classifier_hits: classifierHits,
nbbo_by_trace_id: nbboByTraceId
});
} catch (error) {
return jsonResponse(
{
error: "invalid options support lookup",
detail: error instanceof Error ? error.message : String(error)
},
400
);
}
}
if (req.method === "GET" && url.pathname === "/equity-joins/by-id") {
const ids = url.searchParams.getAll("id");
const data = await fetchEquityPrintJoinsByIds(clickhouse, ids);

View file

@ -327,6 +327,14 @@ const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Curso
return last ? cursorOf(last) : null;
};
const snapshotLimitFor = (subscription: LiveSubscription, configuredLimit: number): number => {
const requested = "snapshot_limit" in subscription ? subscription.snapshot_limit : undefined;
if (!requested) {
return configuredLimit;
}
return Math.max(1, Math.min(configuredLimit, Math.floor(requested)));
};
const candleRedisKey = (underlyingId: string, intervalMs: number): string =>
`live:equity-candles:${underlyingId}:${intervalMs}`;
@ -448,6 +456,7 @@ export class LiveStateManager {
const scoped =
Boolean(subscription.underlying_ids?.length) || Boolean(subscription.option_contract_id);
if (subscription.filters?.view === "raw" || scoped) {
const limit = snapshotLimitFor(subscription, this.generic.options.limit);
const storageFilters: OptionPrintQueryFilters = {
view: subscription.filters?.view ?? "signal",
security:
@ -463,7 +472,7 @@ export class LiveStateManager {
};
const items = await fetchRecentOptionPrints(
this.clickhouse,
this.generic.options.limit,
limit,
undefined,
storageFilters
);
@ -476,10 +485,11 @@ export class LiveStateManager {
}
const config = this.generic.options;
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get("options") ?? []).filter((item) =>
isWithinLiveFeedLookback("options", item) &&
matchesOptionPrintFilters(item, subscription.filters)
);
).slice(0, limit);
return {
subscription,
items,
@ -489,10 +499,11 @@ export class LiveStateManager {
}
case "flow": {
const config = this.generic.flow;
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get("flow") ?? []).filter((item) =>
isWithinLiveFeedLookback("flow", item) &&
matchesFlowPacketFilters(item, subscription.filters)
);
).slice(0, limit);
return {
subscription,
items,
@ -502,12 +513,13 @@ export class LiveStateManager {
}
case "equities": {
const config = this.generic.equities;
const limit = snapshotLimitFor(subscription, config.limit);
if (subscription.underlying_ids?.length) {
const filters: EquityPrintQueryFilters = {
underlyingIds: subscription.underlying_ids,
sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS
};
const items = await fetchRecentEquityPrints(this.clickhouse, config.limit, filters);
const items = await fetchRecentEquityPrints(this.clickhouse, limit, filters);
return {
subscription,
items,
@ -517,7 +529,7 @@ export class LiveStateManager {
}
const items = (this.genericItems.get("equities") ?? []).filter((item) =>
isWithinLiveFeedLookback("equities", item)
);
).slice(0, limit);
return {
subscription,
items,
@ -555,9 +567,10 @@ export class LiveStateManager {
}
default: {
const config = this.generic[subscription.channel];
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get(subscription.channel) ?? []).filter((item) =>
isWithinLiveFeedLookback(subscription.channel, item)
);
).slice(0, limit);
return {
subscription,
items,