Add testing-mode throttles and UI batching

Throttle ingest pipelines in TESTING_MODE, document settings in README, and batch live UI updates per frame to reduce scroll lag.
This commit is contained in:
dirtydishes 2025-12-29 22:38:51 -05:00
parent 82861408e4
commit bd1a67a7fc
5 changed files with 341 additions and 58 deletions

View file

@ -45,6 +45,10 @@ IBKR_PYTHON_BIN=python3
EQUITIES_INGEST_ADAPTER=synthetic
EMIT_INTERVAL_MS=1000
# Testing mode
TESTING_MODE=false
TESTING_THROTTLE_MS=200
# Compute consumer behavior
COMPUTE_DELIVER_POLICY=new
COMPUTE_CONSUMER_RESET=false

View file

@ -6,24 +6,26 @@ The system ingests real-time options trades/quotes and equity prints, clusters r
## CURRENT STATE (Plan Progress)
Plan progress (rough): [####------]
Plan progress (rough): [#####-----]
Done now (in repo):
- Bun monorepo + infra docker compose (ClickHouse, Redis, NATS JetStream)
- Shared event schemas + logging + config helpers
- Synthetic options/equity prints published to NATS and persisted to ClickHouse
- Synthetic options/equity prints (full S&P 500) published to NATS and persisted to ClickHouse
- Deterministic option FlowPacket clustering (time window) + persistence
- API: REST for prints/flow packets, WS for live options/equities/flow, replay endpoints
- Rule-first classifiers + alert scoring with ClickHouse persistence + WS/REST endpoints
- API: REST for prints/flow packets/classifier hits/alerts, WS for live options/equities/flow/alerts/hits, replay endpoints
- UI: live tapes for options/equities/flow + replay toggle + pause controls + replay time/completion
- UI: alerts + classifier hits panels, ticker filter, evidence drawer, severity strip
- Databento historical replay adapter (options) with symbol mapping
- Alpaca options adapter (dev-only, bounded contract list)
- Testing-mode throttling for ingest to reduce CPU during local dev
In progress / blocked:
- Live data adapters beyond dev-only feeds (requires licensed data source)
- Rolling stats and advanced clustering
Not started:
- Classifiers + alert scoring
- Dark pool inference
- Candle service and chart overlays
- Auth / secure deployment
@ -37,19 +39,19 @@ Not started:
## Current Capabilities
- Synthetic options/equity prints with deterministic sequencing
- Synthetic options/equity prints with deterministic sequencing across the S&P 500
- Ingest adapter seam (env-selected; options default `alpaca`, equities default `synthetic`)
- Raw event persistence in ClickHouse + streaming via NATS JetStream
- Deterministic option FlowPacket clustering (time-window)
- Classifiers + alert scoring (rule-first) with WS/REST endpoints
- API gateway with REST, WS, and replay endpoints
- UI tapes for options/equities/flow packets with live/replay toggle and pause controls
- UI tapes for options/equities/flow packets + alerts/hits with live/replay toggle and pause controls
- Alpaca options adapter (dev-only) with bounded contract selection
- Databento historical replay adapter (options, Python sidecar)
## Planned Capabilities (from PLAN.md)
- Real-time licensed market data ingestors (options + equities)
- Rule-first classifiers and alert scoring
- Dark pool inference and evidence linking
- Candle aggregation + chart overlays
- Replay/backtesting metrics and calibration
@ -92,8 +94,8 @@ Create env file:
Start everything (infra + services + web):
- `bun run dev`
Run just the web app (auto-picks a free port in 3001-3005):
- `bun --cwd apps/web run dev`
Run just the web app (fixed to port 3000):
- `bun run dev:web`
Run just the API:
- `bun --cwd services/api run dev`
@ -103,6 +105,10 @@ Adapter selection (env):
- Equities: `EQUITIES_INGEST_ADAPTER` (defaults to `synthetic`)
- Compute: `COMPUTE_DELIVER_POLICY` (`new` default), `COMPUTE_CONSUMER_RESET` (force skip backlog)
Testing mode (throttles ingest to reduce CPU):
- `TESTING_MODE=true` enables throttling
- `TESTING_THROTTLE_MS=200` minimum spacing between emitted prints (per ingest service)
IBKR adapter (options, via Python `ib_insync`):
- Install Python deps: `python3 -m pip install -r services/ingest-options/py/requirements.txt`
- Set `OPTIONS_INGEST_ADAPTER=ibkr` and configure:

View file

@ -1,6 +1,6 @@
"use client";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState } from "react";
import type { AlertEvent, ClassifierHitEvent, EquityPrint, FlowPacket, OptionPrint } from "@islandflow/types";
const MAX_ITEMS = 500;
@ -213,6 +213,7 @@ const parseNumber = (value: unknown, fallback: number): number => {
type ListScrollState = {
listRef: React.RefObject<HTMLDivElement>;
isAtTop: boolean;
isAtTopRef: React.MutableRefObject<boolean>;
missed: number;
onNewItems: (count: number) => void;
jumpToTop: () => void;
@ -236,6 +237,7 @@ const useListScroll = (): ListScrollState => {
const onScroll = () => {
const atTop = el.scrollTop <= 2;
isAtTopRef.current = atTop;
setIsAtTop(atTop);
if (atTop) {
setMissed(0);
@ -273,7 +275,55 @@ const useListScroll = (): ListScrollState => {
setMissed(0);
}, []);
return { listRef, isAtTop, missed, onNewItems, jumpToTop };
return {
listRef,
isAtTop,
isAtTopRef,
missed,
onNewItems,
jumpToTop
};
};
const useScrollAnchor = (
listRef: React.RefObject<HTMLDivElement>,
isAtTopRef: React.MutableRefObject<boolean>
) => {
const pendingRef = useRef<{ top: number; height: number } | null>(null);
const capture = useCallback(() => {
if (isAtTopRef.current) {
return;
}
const el = listRef.current;
if (!el) {
return;
}
pendingRef.current = {
top: el.scrollTop,
height: el.scrollHeight
};
}, [isAtTopRef, listRef]);
const apply = useCallback(() => {
const pending = pendingRef.current;
if (!pending) {
return;
}
const el = listRef.current;
if (!el) {
return;
}
const delta = el.scrollHeight - pending.height;
el.scrollTop = pending.top + delta;
pendingRef.current = null;
}, [listRef]);
return { capture, apply };
};
const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string => {
@ -304,13 +354,14 @@ type TapeConfig<T> = {
expectedType: MessageType;
batchSize?: number;
pollMs?: number;
captureScroll?: () => void;
onNewItems?: (count: number) => void;
};
const useTape = <T extends { ts: number; seq: number }>(
config: TapeConfig<T>
): TapeState<T> => {
const { mode, wsPath, replayPath, expectedType, latestPath, onNewItems } = config;
const { mode, wsPath, replayPath, expectedType, latestPath, onNewItems, captureScroll } = config;
const batchSize = config.batchSize ?? 40;
const pollMs = config.pollMs ?? 1000;
const [status, setStatus] = useState<WsStatus>("connecting");
@ -328,11 +379,50 @@ const useTape = <T extends { ts: number; seq: number }>(
const replaySourceRef = useRef<string | null>(null);
const emptyPollsRef = useRef<number>(0);
const pausedRef = useRef(paused);
const pendingRef = useRef<T[]>([]);
const pendingCountRef = useRef(0);
const flushHandleRef = useRef<number | null>(null);
useEffect(() => {
pausedRef.current = paused;
}, [paused]);
const cancelFlush = useCallback(() => {
if (flushHandleRef.current !== null) {
cancelAnimationFrame(flushHandleRef.current);
flushHandleRef.current = null;
}
}, []);
const scheduleFlush = useCallback(() => {
if (flushHandleRef.current !== null) {
return;
}
flushHandleRef.current = requestAnimationFrame(() => {
flushHandleRef.current = null;
const buffered = pendingRef.current;
if (buffered.length === 0) {
return;
}
pendingRef.current = [];
const pendingCount = pendingCountRef.current;
pendingCountRef.current = 0;
if (onNewItems && pendingCount > 0) {
onNewItems(pendingCount);
}
if (captureScroll) {
captureScroll();
}
setItems((prev) => mergeNewest(buffered, prev));
setLastUpdate(Date.now());
});
}, [captureScroll, onNewItems]);
const togglePause = useCallback(() => {
setPaused((prev) => {
const next = !prev;
@ -354,7 +444,10 @@ const useTape = <T extends { ts: number; seq: number }>(
setDropped(0);
setStatus("connecting");
cursorRef.current = { ts: 0, seq: 0 };
}, [mode]);
pendingRef.current = [];
pendingCountRef.current = 0;
cancelFlush();
}, [mode, cancelFlush]);
useEffect(() => {
if (mode !== "replay" || !latestPath) {
@ -434,12 +527,9 @@ const useTape = <T extends { ts: number; seq: number }>(
return;
}
if (onNewItems) {
onNewItems(1);
}
setItems((prev) => mergeNewest([message.payload], prev));
setLastUpdate(Date.now());
pendingRef.current.push(message.payload);
pendingCountRef.current += 1;
scheduleFlush();
} catch (error) {
console.warn("Failed to parse websocket payload", error);
}
@ -470,6 +560,7 @@ const useTape = <T extends { ts: number; seq: number }>(
return () => {
active = false;
cancelFlush();
if (reconnectRef.current !== null) {
window.clearTimeout(reconnectRef.current);
}
@ -477,7 +568,7 @@ const useTape = <T extends { ts: number; seq: number }>(
socketRef.current.close();
}
};
}, [mode, wsPath, expectedType]);
}, [mode, wsPath, expectedType, scheduleFlush, cancelFlush]);
useEffect(() => {
if (mode !== "replay") {
@ -543,11 +634,9 @@ const useTape = <T extends { ts: number; seq: number }>(
if (filtered.length > 0) {
const nextItems = [...filtered].reverse();
if (onNewItems) {
onNewItems(nextItems.length);
}
setItems((prev) => mergeNewest(nextItems, prev));
setLastUpdate(Date.now());
pendingRef.current.push(...nextItems);
pendingCountRef.current += nextItems.length;
scheduleFlush();
const last = filtered.at(-1);
if (last) {
setReplayTime(last.ts);
@ -601,8 +690,9 @@ const useTape = <T extends { ts: number; seq: number }>(
return () => {
active = false;
window.clearInterval(interval);
cancelFlush();
};
}, [mode, replayPath, batchSize, pollMs]);
}, [mode, replayPath, batchSize, pollMs, scheduleFlush, cancelFlush]);
return {
status,
@ -617,12 +707,17 @@ const useTape = <T extends { ts: number; seq: number }>(
};
const useLiveStream = <T extends SortableItem>(
enabled: boolean,
wsPath: string,
expectedType: MessageType,
onNewItems?: (count: number) => void
config: {
enabled: boolean;
wsPath: string;
expectedType: MessageType;
onNewItems?: (count: number) => void;
captureScroll?: () => void;
}
): TapeState<T> => {
const [status, setStatus] = useState<WsStatus>(enabled ? "connecting" : "disconnected");
const [status, setStatus] = useState<WsStatus>(
config.enabled ? "connecting" : "disconnected"
);
const [items, setItems] = useState<T[]>([]);
const [lastUpdate, setLastUpdate] = useState<number | null>(null);
const [replayTime] = useState<number | null>(null);
@ -632,11 +727,50 @@ const useLiveStream = <T extends SortableItem>(
const reconnectRef = useRef<number | null>(null);
const socketRef = useRef<WebSocket | null>(null);
const pausedRef = useRef(paused);
const pendingRef = useRef<T[]>([]);
const pendingCountRef = useRef(0);
const flushHandleRef = useRef<number | null>(null);
useEffect(() => {
pausedRef.current = paused;
}, [paused]);
const cancelFlush = useCallback(() => {
if (flushHandleRef.current !== null) {
cancelAnimationFrame(flushHandleRef.current);
flushHandleRef.current = null;
}
}, []);
const scheduleFlush = useCallback(() => {
if (flushHandleRef.current !== null) {
return;
}
flushHandleRef.current = requestAnimationFrame(() => {
flushHandleRef.current = null;
const buffered = pendingRef.current;
if (buffered.length === 0) {
return;
}
pendingRef.current = [];
const pendingCount = pendingCountRef.current;
pendingCountRef.current = 0;
if (config.onNewItems && pendingCount > 0) {
config.onNewItems(pendingCount);
}
if (config.captureScroll) {
config.captureScroll();
}
setItems((prev) => mergeNewest(buffered, prev));
setLastUpdate(Date.now());
});
}, [config.captureScroll, config.onNewItems]);
const togglePause = useCallback(() => {
setPaused((prev) => {
const next = !prev;
@ -648,10 +782,13 @@ const useLiveStream = <T extends SortableItem>(
}, []);
useEffect(() => {
if (!enabled) {
if (!config.enabled) {
setStatus("disconnected");
setItems([]);
setLastUpdate(null);
pendingRef.current = [];
pendingCountRef.current = 0;
cancelFlush();
return;
}
@ -664,7 +801,7 @@ const useLiveStream = <T extends SortableItem>(
setStatus("connecting");
const socket = new WebSocket(buildWsUrl(wsPath));
const socket = new WebSocket(buildWsUrl(config.wsPath));
socketRef.current = socket;
socket.onopen = () => {
@ -681,7 +818,7 @@ const useLiveStream = <T extends SortableItem>(
try {
const message = JSON.parse(event.data) as StreamMessage<T>;
if (!message || message.type !== expectedType) {
if (!message || message.type !== config.expectedType) {
return;
}
@ -691,12 +828,9 @@ const useLiveStream = <T extends SortableItem>(
return;
}
if (onNewItems) {
onNewItems(1);
}
setItems((prev) => mergeNewest([message.payload], prev));
setLastUpdate(Date.now());
pendingRef.current.push(message.payload);
pendingCountRef.current += 1;
scheduleFlush();
} catch (error) {
console.warn("Failed to parse live stream payload", error);
}
@ -727,6 +861,7 @@ const useLiveStream = <T extends SortableItem>(
return () => {
active = false;
cancelFlush();
if (reconnectRef.current !== null) {
window.clearTimeout(reconnectRef.current);
}
@ -734,7 +869,7 @@ const useLiveStream = <T extends SortableItem>(
socketRef.current.close();
}
};
}, [enabled, expectedType, wsPath, onNewItems]);
}, [config.enabled, config.expectedType, config.wsPath, scheduleFlush, cancelFlush]);
return {
status,
@ -750,9 +885,16 @@ const useLiveStream = <T extends SortableItem>(
const useFlowStream = (
enabled: boolean,
onNewItems?: (count: number) => void
onNewItems?: (count: number) => void,
captureScroll?: () => void
): TapeState<FlowPacket> => {
return useLiveStream<FlowPacket>(enabled, "/ws/flow", "flow-packet", onNewItems);
return useLiveStream<FlowPacket>({
enabled,
wsPath: "/ws/flow",
expectedType: "flow-packet",
onNewItems,
captureScroll
});
};
type TapeStatusProps = {
@ -1001,6 +1143,15 @@ export default function HomePage() {
const alertsScroll = useListScroll();
const classifierScroll = useListScroll();
const optionsAnchor = useScrollAnchor(optionsScroll.listRef, optionsScroll.isAtTopRef);
const equitiesAnchor = useScrollAnchor(equitiesScroll.listRef, equitiesScroll.isAtTopRef);
const flowAnchor = useScrollAnchor(flowScroll.listRef, flowScroll.isAtTopRef);
const alertsAnchor = useScrollAnchor(alertsScroll.listRef, alertsScroll.isAtTopRef);
const classifierAnchor = useScrollAnchor(
classifierScroll.listRef,
classifierScroll.isAtTopRef
);
const options = useTape<OptionPrint>({
mode,
wsPath: "/ws/options",
@ -1009,6 +1160,7 @@ export default function HomePage() {
expectedType: "option-print",
batchSize: mode === "replay" ? 120 : undefined,
pollMs: mode === "replay" ? 200 : undefined,
captureScroll: optionsAnchor.capture,
onNewItems: optionsScroll.onNewItems
});
@ -1020,22 +1172,45 @@ export default function HomePage() {
expectedType: "equity-print",
batchSize: mode === "replay" ? 120 : undefined,
pollMs: mode === "replay" ? 200 : undefined,
captureScroll: equitiesAnchor.capture,
onNewItems: equitiesScroll.onNewItems
});
const flow = useFlowStream(mode === "live", flowScroll.onNewItems);
const alerts = useLiveStream<AlertEvent>(
mode === "live",
"/ws/alerts",
"alert",
alertsScroll.onNewItems
);
const classifierHits = useLiveStream<ClassifierHitEvent>(
mode === "live",
"/ws/classifier-hits",
"classifier-hit",
classifierScroll.onNewItems
);
const flow = useFlowStream(mode === "live", flowScroll.onNewItems, flowAnchor.capture);
const alerts = useLiveStream<AlertEvent>({
enabled: mode === "live",
wsPath: "/ws/alerts",
expectedType: "alert",
onNewItems: alertsScroll.onNewItems,
captureScroll: alertsAnchor.capture
});
const classifierHits = useLiveStream<ClassifierHitEvent>({
enabled: mode === "live",
wsPath: "/ws/classifier-hits",
expectedType: "classifier-hit",
onNewItems: classifierScroll.onNewItems,
captureScroll: classifierAnchor.capture
});
useLayoutEffect(() => {
optionsAnchor.apply();
}, [options.items, optionsAnchor.apply]);
useLayoutEffect(() => {
equitiesAnchor.apply();
}, [equities.items, equitiesAnchor.apply]);
useLayoutEffect(() => {
flowAnchor.apply();
}, [flow.items, flowAnchor.apply]);
useLayoutEffect(() => {
alertsAnchor.apply();
}, [alerts.items, alertsAnchor.apply]);
useLayoutEffect(() => {
classifierAnchor.apply();
}, [classifierHits.items, classifierAnchor.apply]);
const activeTickers = useMemo(() => {
const parts = filterInput

View file

@ -25,7 +25,22 @@ const envSchema = z.object({
CLICKHOUSE_URL: z.string().default("http://localhost:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"),
EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"),
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000)
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000),
TESTING_MODE: z
.preprocess((value) => {
if (typeof value === "string") {
const normalized = value.trim().toLowerCase();
if (["1", "true", "yes", "on"].includes(normalized)) {
return true;
}
if (["0", "false", "no", "off"].includes(normalized)) {
return false;
}
}
return value;
}, z.boolean())
.default(false),
TESTING_THROTTLE_MS: z.coerce.number().int().nonnegative().default(200)
});
const env = readEnv(envSchema);
@ -34,6 +49,34 @@ const state = {
shuttingDown: false
};
const buildThrottle = (enabled: boolean, throttleMs: number) => {
if (!enabled || throttleMs <= 0) {
return () => true;
}
let lastEmit = 0;
let dropped = 0;
let lastLog = Date.now();
return (now: number) => {
if (now - lastEmit < throttleMs) {
dropped += 1;
if (now - lastLog > 5000) {
logger.warn("testing mode throttling equity prints", {
dropped,
throttle_ms: throttleMs
});
dropped = 0;
lastLog = now;
}
return false;
}
lastEmit = now;
return true;
};
};
const retry = async <T>(
label: string,
attempts: number,
@ -104,6 +147,7 @@ const run = async () => {
const adapter = selectAdapter(env.EQUITIES_INGEST_ADAPTER);
logger.info("ingest adapter selected", { adapter: adapter.name });
const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS);
const stopAdapter: StopHandler = await adapter.start({
onTrade: async (candidate: EquityPrint) => {
@ -111,6 +155,11 @@ const run = async () => {
return;
}
const now = Date.now();
if (!allowPublish(now)) {
return;
}
const print = EquityPrintSchema.parse(candidate);
try {

View file

@ -62,7 +62,22 @@ const envSchema = z.object({
IBKR_EXCHANGE: z.string().min(1).default("SMART"),
IBKR_CURRENCY: z.string().min(1).default("USD"),
IBKR_PYTHON_BIN: z.string().min(1).default("python3"),
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000)
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000),
TESTING_MODE: z
.preprocess((value) => {
if (typeof value === "string") {
const normalized = value.trim().toLowerCase();
if (["1", "true", "yes", "on"].includes(normalized)) {
return true;
}
if (["0", "false", "no", "off"].includes(normalized)) {
return false;
}
}
return value;
}, z.boolean())
.default(false),
TESTING_THROTTLE_MS: z.coerce.number().int().nonnegative().default(200)
});
const env = readEnv(envSchema);
@ -71,6 +86,34 @@ const state = {
shuttingDown: false
};
const buildThrottle = (enabled: boolean, throttleMs: number) => {
if (!enabled || throttleMs <= 0) {
return () => true;
}
let lastEmit = 0;
let dropped = 0;
let lastLog = Date.now();
return (now: number) => {
if (now - lastEmit < throttleMs) {
dropped += 1;
if (now - lastLog > 5000) {
logger.warn("testing mode throttling option prints", {
dropped,
throttle_ms: throttleMs
});
dropped = 0;
lastLog = now;
}
return false;
}
lastEmit = now;
return true;
};
};
const retry = async <T>(
label: string,
attempts: number,
@ -205,6 +248,7 @@ const run = async () => {
const adapter = selectAdapter(env.OPTIONS_INGEST_ADAPTER);
logger.info("ingest adapter selected", { adapter: adapter.name });
const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS);
const stopAdapter: StopHandler = await adapter.start({
onTrade: async (candidate: OptionPrint) => {
@ -212,6 +256,11 @@ const run = async () => {
return;
}
const now = Date.now();
if (!allowPublish(now)) {
return;
}
const print = OptionPrintSchema.parse(candidate);
try {