From bd1a67a7fc96402d27873970d8641b7ab04f2b5c Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Mon, 29 Dec 2025 22:38:51 -0500 Subject: [PATCH] Add testing-mode throttles and UI batching Throttle ingest pipelines in TESTING_MODE, document settings in README, and batch live UI updates per frame to reduce scroll lag. --- .env.example | 4 + README.md | 24 ++- apps/web/app/page.tsx | 269 +++++++++++++++++++++----- services/ingest-equities/src/index.ts | 51 ++++- services/ingest-options/src/index.ts | 51 ++++- 5 files changed, 341 insertions(+), 58 deletions(-) diff --git a/.env.example b/.env.example index 5ebe0a7..900fd08 100644 --- a/.env.example +++ b/.env.example @@ -45,6 +45,10 @@ IBKR_PYTHON_BIN=python3 EQUITIES_INGEST_ADAPTER=synthetic EMIT_INTERVAL_MS=1000 +# Testing mode +TESTING_MODE=false +TESTING_THROTTLE_MS=200 + # Compute consumer behavior COMPUTE_DELIVER_POLICY=new COMPUTE_CONSUMER_RESET=false diff --git a/README.md b/README.md index 94e34ce..1deaa03 100644 --- a/README.md +++ b/README.md @@ -6,24 +6,26 @@ The system ingests real-time options trades/quotes and equity prints, clusters r ## CURRENT STATE (Plan Progress) -Plan progress (rough): [####------] +Plan progress (rough): [#####-----] Done now (in repo): - Bun monorepo + infra docker compose (ClickHouse, Redis, NATS JetStream) - Shared event schemas + logging + config helpers -- Synthetic options/equity prints published to NATS and persisted to ClickHouse +- Synthetic options/equity prints (full S&P 500) published to NATS and persisted to ClickHouse - Deterministic option FlowPacket clustering (time window) + persistence -- API: REST for prints/flow packets, WS for live options/equities/flow, replay endpoints +- Rule-first classifiers + alert scoring with ClickHouse persistence + WS/REST endpoints +- API: REST for prints/flow packets/classifier hits/alerts, WS for live options/equities/flow/alerts/hits, replay endpoints - UI: live tapes for options/equities/flow + replay toggle + pause controls + replay time/completion +- UI: alerts + classifier hits panels, ticker filter, evidence drawer, severity strip - Databento historical replay adapter (options) with symbol mapping - Alpaca options adapter (dev-only, bounded contract list) +- Testing-mode throttling for ingest to reduce CPU during local dev In progress / blocked: - Live data adapters beyond dev-only feeds (requires licensed data source) - Rolling stats and advanced clustering Not started: -- Classifiers + alert scoring - Dark pool inference - Candle service and chart overlays - Auth / secure deployment @@ -37,19 +39,19 @@ Not started: ## Current Capabilities -- Synthetic options/equity prints with deterministic sequencing +- Synthetic options/equity prints with deterministic sequencing across the S&P 500 - Ingest adapter seam (env-selected; options default `alpaca`, equities default `synthetic`) - Raw event persistence in ClickHouse + streaming via NATS JetStream - Deterministic option FlowPacket clustering (time-window) +- Classifiers + alert scoring (rule-first) with WS/REST endpoints - API gateway with REST, WS, and replay endpoints -- UI tapes for options/equities/flow packets with live/replay toggle and pause controls +- UI tapes for options/equities/flow packets + alerts/hits with live/replay toggle and pause controls - Alpaca options adapter (dev-only) with bounded contract selection - Databento historical replay adapter (options, Python sidecar) ## Planned Capabilities (from PLAN.md) - Real-time licensed market data ingestors (options + equities) -- Rule-first classifiers and alert scoring - Dark pool inference and evidence linking - Candle aggregation + chart overlays - Replay/backtesting metrics and calibration @@ -92,8 +94,8 @@ Create env file: Start everything (infra + services + web): - `bun run dev` -Run just the web app (auto-picks a free port in 3001-3005): -- `bun --cwd apps/web run dev` +Run just the web app (fixed to port 3000): +- `bun run dev:web` Run just the API: - `bun --cwd services/api run dev` @@ -103,6 +105,10 @@ Adapter selection (env): - Equities: `EQUITIES_INGEST_ADAPTER` (defaults to `synthetic`) - Compute: `COMPUTE_DELIVER_POLICY` (`new` default), `COMPUTE_CONSUMER_RESET` (force skip backlog) +Testing mode (throttles ingest to reduce CPU): +- `TESTING_MODE=true` enables throttling +- `TESTING_THROTTLE_MS=200` minimum spacing between emitted prints (per ingest service) + IBKR adapter (options, via Python `ib_insync`): - Install Python deps: `python3 -m pip install -r services/ingest-options/py/requirements.txt` - Set `OPTIONS_INGEST_ADAPTER=ibkr` and configure: diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 41acf38..433d954 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -1,6 +1,6 @@ "use client"; -import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState } from "react"; import type { AlertEvent, ClassifierHitEvent, EquityPrint, FlowPacket, OptionPrint } from "@islandflow/types"; const MAX_ITEMS = 500; @@ -213,6 +213,7 @@ const parseNumber = (value: unknown, fallback: number): number => { type ListScrollState = { listRef: React.RefObject; isAtTop: boolean; + isAtTopRef: React.MutableRefObject; missed: number; onNewItems: (count: number) => void; jumpToTop: () => void; @@ -236,6 +237,7 @@ const useListScroll = (): ListScrollState => { const onScroll = () => { const atTop = el.scrollTop <= 2; + isAtTopRef.current = atTop; setIsAtTop(atTop); if (atTop) { setMissed(0); @@ -273,7 +275,55 @@ const useListScroll = (): ListScrollState => { setMissed(0); }, []); - return { listRef, isAtTop, missed, onNewItems, jumpToTop }; + return { + listRef, + isAtTop, + isAtTopRef, + missed, + onNewItems, + jumpToTop + }; +}; + +const useScrollAnchor = ( + listRef: React.RefObject, + isAtTopRef: React.MutableRefObject +) => { + const pendingRef = useRef<{ top: number; height: number } | null>(null); + + const capture = useCallback(() => { + if (isAtTopRef.current) { + return; + } + + const el = listRef.current; + if (!el) { + return; + } + + pendingRef.current = { + top: el.scrollTop, + height: el.scrollHeight + }; + }, [isAtTopRef, listRef]); + + const apply = useCallback(() => { + const pending = pendingRef.current; + if (!pending) { + return; + } + + const el = listRef.current; + if (!el) { + return; + } + + const delta = el.scrollHeight - pending.height; + el.scrollTop = pending.top + delta; + pendingRef.current = null; + }, [listRef]); + + return { capture, apply }; }; const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string => { @@ -304,13 +354,14 @@ type TapeConfig = { expectedType: MessageType; batchSize?: number; pollMs?: number; + captureScroll?: () => void; onNewItems?: (count: number) => void; }; const useTape = ( config: TapeConfig ): TapeState => { - const { mode, wsPath, replayPath, expectedType, latestPath, onNewItems } = config; + const { mode, wsPath, replayPath, expectedType, latestPath, onNewItems, captureScroll } = config; const batchSize = config.batchSize ?? 40; const pollMs = config.pollMs ?? 1000; const [status, setStatus] = useState("connecting"); @@ -328,11 +379,50 @@ const useTape = ( const replaySourceRef = useRef(null); const emptyPollsRef = useRef(0); const pausedRef = useRef(paused); + const pendingRef = useRef([]); + const pendingCountRef = useRef(0); + const flushHandleRef = useRef(null); useEffect(() => { pausedRef.current = paused; }, [paused]); + const cancelFlush = useCallback(() => { + if (flushHandleRef.current !== null) { + cancelAnimationFrame(flushHandleRef.current); + flushHandleRef.current = null; + } + }, []); + + const scheduleFlush = useCallback(() => { + if (flushHandleRef.current !== null) { + return; + } + + flushHandleRef.current = requestAnimationFrame(() => { + flushHandleRef.current = null; + const buffered = pendingRef.current; + if (buffered.length === 0) { + return; + } + pendingRef.current = []; + + const pendingCount = pendingCountRef.current; + pendingCountRef.current = 0; + + if (onNewItems && pendingCount > 0) { + onNewItems(pendingCount); + } + + if (captureScroll) { + captureScroll(); + } + + setItems((prev) => mergeNewest(buffered, prev)); + setLastUpdate(Date.now()); + }); + }, [captureScroll, onNewItems]); + const togglePause = useCallback(() => { setPaused((prev) => { const next = !prev; @@ -354,7 +444,10 @@ const useTape = ( setDropped(0); setStatus("connecting"); cursorRef.current = { ts: 0, seq: 0 }; - }, [mode]); + pendingRef.current = []; + pendingCountRef.current = 0; + cancelFlush(); + }, [mode, cancelFlush]); useEffect(() => { if (mode !== "replay" || !latestPath) { @@ -434,12 +527,9 @@ const useTape = ( return; } - if (onNewItems) { - onNewItems(1); - } - - setItems((prev) => mergeNewest([message.payload], prev)); - setLastUpdate(Date.now()); + pendingRef.current.push(message.payload); + pendingCountRef.current += 1; + scheduleFlush(); } catch (error) { console.warn("Failed to parse websocket payload", error); } @@ -470,6 +560,7 @@ const useTape = ( return () => { active = false; + cancelFlush(); if (reconnectRef.current !== null) { window.clearTimeout(reconnectRef.current); } @@ -477,7 +568,7 @@ const useTape = ( socketRef.current.close(); } }; - }, [mode, wsPath, expectedType]); + }, [mode, wsPath, expectedType, scheduleFlush, cancelFlush]); useEffect(() => { if (mode !== "replay") { @@ -543,11 +634,9 @@ const useTape = ( if (filtered.length > 0) { const nextItems = [...filtered].reverse(); - if (onNewItems) { - onNewItems(nextItems.length); - } - setItems((prev) => mergeNewest(nextItems, prev)); - setLastUpdate(Date.now()); + pendingRef.current.push(...nextItems); + pendingCountRef.current += nextItems.length; + scheduleFlush(); const last = filtered.at(-1); if (last) { setReplayTime(last.ts); @@ -601,8 +690,9 @@ const useTape = ( return () => { active = false; window.clearInterval(interval); + cancelFlush(); }; - }, [mode, replayPath, batchSize, pollMs]); + }, [mode, replayPath, batchSize, pollMs, scheduleFlush, cancelFlush]); return { status, @@ -617,12 +707,17 @@ const useTape = ( }; const useLiveStream = ( - enabled: boolean, - wsPath: string, - expectedType: MessageType, - onNewItems?: (count: number) => void + config: { + enabled: boolean; + wsPath: string; + expectedType: MessageType; + onNewItems?: (count: number) => void; + captureScroll?: () => void; + } ): TapeState => { - const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); + const [status, setStatus] = useState( + config.enabled ? "connecting" : "disconnected" + ); const [items, setItems] = useState([]); const [lastUpdate, setLastUpdate] = useState(null); const [replayTime] = useState(null); @@ -632,11 +727,50 @@ const useLiveStream = ( const reconnectRef = useRef(null); const socketRef = useRef(null); const pausedRef = useRef(paused); + const pendingRef = useRef([]); + const pendingCountRef = useRef(0); + const flushHandleRef = useRef(null); useEffect(() => { pausedRef.current = paused; }, [paused]); + const cancelFlush = useCallback(() => { + if (flushHandleRef.current !== null) { + cancelAnimationFrame(flushHandleRef.current); + flushHandleRef.current = null; + } + }, []); + + const scheduleFlush = useCallback(() => { + if (flushHandleRef.current !== null) { + return; + } + + flushHandleRef.current = requestAnimationFrame(() => { + flushHandleRef.current = null; + const buffered = pendingRef.current; + if (buffered.length === 0) { + return; + } + pendingRef.current = []; + + const pendingCount = pendingCountRef.current; + pendingCountRef.current = 0; + + if (config.onNewItems && pendingCount > 0) { + config.onNewItems(pendingCount); + } + + if (config.captureScroll) { + config.captureScroll(); + } + + setItems((prev) => mergeNewest(buffered, prev)); + setLastUpdate(Date.now()); + }); + }, [config.captureScroll, config.onNewItems]); + const togglePause = useCallback(() => { setPaused((prev) => { const next = !prev; @@ -648,10 +782,13 @@ const useLiveStream = ( }, []); useEffect(() => { - if (!enabled) { + if (!config.enabled) { setStatus("disconnected"); setItems([]); setLastUpdate(null); + pendingRef.current = []; + pendingCountRef.current = 0; + cancelFlush(); return; } @@ -664,7 +801,7 @@ const useLiveStream = ( setStatus("connecting"); - const socket = new WebSocket(buildWsUrl(wsPath)); + const socket = new WebSocket(buildWsUrl(config.wsPath)); socketRef.current = socket; socket.onopen = () => { @@ -681,7 +818,7 @@ const useLiveStream = ( try { const message = JSON.parse(event.data) as StreamMessage; - if (!message || message.type !== expectedType) { + if (!message || message.type !== config.expectedType) { return; } @@ -691,12 +828,9 @@ const useLiveStream = ( return; } - if (onNewItems) { - onNewItems(1); - } - - setItems((prev) => mergeNewest([message.payload], prev)); - setLastUpdate(Date.now()); + pendingRef.current.push(message.payload); + pendingCountRef.current += 1; + scheduleFlush(); } catch (error) { console.warn("Failed to parse live stream payload", error); } @@ -727,6 +861,7 @@ const useLiveStream = ( return () => { active = false; + cancelFlush(); if (reconnectRef.current !== null) { window.clearTimeout(reconnectRef.current); } @@ -734,7 +869,7 @@ const useLiveStream = ( socketRef.current.close(); } }; - }, [enabled, expectedType, wsPath, onNewItems]); + }, [config.enabled, config.expectedType, config.wsPath, scheduleFlush, cancelFlush]); return { status, @@ -750,9 +885,16 @@ const useLiveStream = ( const useFlowStream = ( enabled: boolean, - onNewItems?: (count: number) => void + onNewItems?: (count: number) => void, + captureScroll?: () => void ): TapeState => { - return useLiveStream(enabled, "/ws/flow", "flow-packet", onNewItems); + return useLiveStream({ + enabled, + wsPath: "/ws/flow", + expectedType: "flow-packet", + onNewItems, + captureScroll + }); }; type TapeStatusProps = { @@ -1001,6 +1143,15 @@ export default function HomePage() { const alertsScroll = useListScroll(); const classifierScroll = useListScroll(); + const optionsAnchor = useScrollAnchor(optionsScroll.listRef, optionsScroll.isAtTopRef); + const equitiesAnchor = useScrollAnchor(equitiesScroll.listRef, equitiesScroll.isAtTopRef); + const flowAnchor = useScrollAnchor(flowScroll.listRef, flowScroll.isAtTopRef); + const alertsAnchor = useScrollAnchor(alertsScroll.listRef, alertsScroll.isAtTopRef); + const classifierAnchor = useScrollAnchor( + classifierScroll.listRef, + classifierScroll.isAtTopRef + ); + const options = useTape({ mode, wsPath: "/ws/options", @@ -1009,6 +1160,7 @@ export default function HomePage() { expectedType: "option-print", batchSize: mode === "replay" ? 120 : undefined, pollMs: mode === "replay" ? 200 : undefined, + captureScroll: optionsAnchor.capture, onNewItems: optionsScroll.onNewItems }); @@ -1020,22 +1172,45 @@ export default function HomePage() { expectedType: "equity-print", batchSize: mode === "replay" ? 120 : undefined, pollMs: mode === "replay" ? 200 : undefined, + captureScroll: equitiesAnchor.capture, onNewItems: equitiesScroll.onNewItems }); - const flow = useFlowStream(mode === "live", flowScroll.onNewItems); - const alerts = useLiveStream( - mode === "live", - "/ws/alerts", - "alert", - alertsScroll.onNewItems - ); - const classifierHits = useLiveStream( - mode === "live", - "/ws/classifier-hits", - "classifier-hit", - classifierScroll.onNewItems - ); + const flow = useFlowStream(mode === "live", flowScroll.onNewItems, flowAnchor.capture); + const alerts = useLiveStream({ + enabled: mode === "live", + wsPath: "/ws/alerts", + expectedType: "alert", + onNewItems: alertsScroll.onNewItems, + captureScroll: alertsAnchor.capture + }); + const classifierHits = useLiveStream({ + enabled: mode === "live", + wsPath: "/ws/classifier-hits", + expectedType: "classifier-hit", + onNewItems: classifierScroll.onNewItems, + captureScroll: classifierAnchor.capture + }); + + useLayoutEffect(() => { + optionsAnchor.apply(); + }, [options.items, optionsAnchor.apply]); + + useLayoutEffect(() => { + equitiesAnchor.apply(); + }, [equities.items, equitiesAnchor.apply]); + + useLayoutEffect(() => { + flowAnchor.apply(); + }, [flow.items, flowAnchor.apply]); + + useLayoutEffect(() => { + alertsAnchor.apply(); + }, [alerts.items, alertsAnchor.apply]); + + useLayoutEffect(() => { + classifierAnchor.apply(); + }, [classifierHits.items, classifierAnchor.apply]); const activeTickers = useMemo(() => { const parts = filterInput diff --git a/services/ingest-equities/src/index.ts b/services/ingest-equities/src/index.ts index c02bebc..5001cec 100644 --- a/services/ingest-equities/src/index.ts +++ b/services/ingest-equities/src/index.ts @@ -25,7 +25,22 @@ const envSchema = z.object({ CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"), - EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) + EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000), + TESTING_MODE: z + .preprocess((value) => { + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (["1", "true", "yes", "on"].includes(normalized)) { + return true; + } + if (["0", "false", "no", "off"].includes(normalized)) { + return false; + } + } + return value; + }, z.boolean()) + .default(false), + TESTING_THROTTLE_MS: z.coerce.number().int().nonnegative().default(200) }); const env = readEnv(envSchema); @@ -34,6 +49,34 @@ const state = { shuttingDown: false }; +const buildThrottle = (enabled: boolean, throttleMs: number) => { + if (!enabled || throttleMs <= 0) { + return () => true; + } + + let lastEmit = 0; + let dropped = 0; + let lastLog = Date.now(); + + return (now: number) => { + if (now - lastEmit < throttleMs) { + dropped += 1; + if (now - lastLog > 5000) { + logger.warn("testing mode throttling equity prints", { + dropped, + throttle_ms: throttleMs + }); + dropped = 0; + lastLog = now; + } + return false; + } + + lastEmit = now; + return true; + }; +}; + const retry = async ( label: string, attempts: number, @@ -104,6 +147,7 @@ const run = async () => { const adapter = selectAdapter(env.EQUITIES_INGEST_ADAPTER); logger.info("ingest adapter selected", { adapter: adapter.name }); + const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS); const stopAdapter: StopHandler = await adapter.start({ onTrade: async (candidate: EquityPrint) => { @@ -111,6 +155,11 @@ const run = async () => { return; } + const now = Date.now(); + if (!allowPublish(now)) { + return; + } + const print = EquityPrintSchema.parse(candidate); try { diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index 483f4b3..a16b051 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -62,7 +62,22 @@ const envSchema = z.object({ IBKR_EXCHANGE: z.string().min(1).default("SMART"), IBKR_CURRENCY: z.string().min(1).default("USD"), IBKR_PYTHON_BIN: z.string().min(1).default("python3"), - EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) + EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000), + TESTING_MODE: z + .preprocess((value) => { + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (["1", "true", "yes", "on"].includes(normalized)) { + return true; + } + if (["0", "false", "no", "off"].includes(normalized)) { + return false; + } + } + return value; + }, z.boolean()) + .default(false), + TESTING_THROTTLE_MS: z.coerce.number().int().nonnegative().default(200) }); const env = readEnv(envSchema); @@ -71,6 +86,34 @@ const state = { shuttingDown: false }; +const buildThrottle = (enabled: boolean, throttleMs: number) => { + if (!enabled || throttleMs <= 0) { + return () => true; + } + + let lastEmit = 0; + let dropped = 0; + let lastLog = Date.now(); + + return (now: number) => { + if (now - lastEmit < throttleMs) { + dropped += 1; + if (now - lastLog > 5000) { + logger.warn("testing mode throttling option prints", { + dropped, + throttle_ms: throttleMs + }); + dropped = 0; + lastLog = now; + } + return false; + } + + lastEmit = now; + return true; + }; +}; + const retry = async ( label: string, attempts: number, @@ -205,6 +248,7 @@ const run = async () => { const adapter = selectAdapter(env.OPTIONS_INGEST_ADAPTER); logger.info("ingest adapter selected", { adapter: adapter.name }); + const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS); const stopAdapter: StopHandler = await adapter.start({ onTrade: async (candidate: OptionPrint) => { @@ -212,6 +256,11 @@ const run = async () => { return; } + const now = Date.now(); + if (!allowPublish(now)) { + return; + } + const print = OptionPrintSchema.parse(candidate); try {