From ad58c62c371aceb149ae1a8f04e8047262bd3098 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Mon, 29 Dec 2025 15:45:33 -0500 Subject: [PATCH] Improve live tape UX and compute consumer behavior - raise UI tape cap to 500 and keep newest-first ordering - add jump-to-top with missed counter + revised card layout styling - add compute deliver policy/reset envs to prevent JetStream backlog replay - update .env.example and README for new defaults/config --- .env.example | 4 + README.md | 1 + apps/web/app/globals.css | 80 ++++++++++++- apps/web/app/page.tsx | 215 ++++++++++++++++++++++++++++++---- services/compute/src/index.ts | 78 +++++++++++- 5 files changed, 349 insertions(+), 29 deletions(-) diff --git a/.env.example b/.env.example index 02c301e..081c73d 100644 --- a/.env.example +++ b/.env.example @@ -44,3 +44,7 @@ IBKR_PYTHON_BIN=python3 # Equities ingest EQUITIES_INGEST_ADAPTER=synthetic EMIT_INTERVAL_MS=1000 + +# Compute consumer behavior +COMPUTE_DELIVER_POLICY=new +COMPUTE_CONSUMER_RESET=false diff --git a/README.md b/README.md index 5bac783..94e34ce 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,7 @@ Run just the API: Adapter selection (env): - Options: `OPTIONS_INGEST_ADAPTER` (defaults to `alpaca`) - Equities: `EQUITIES_INGEST_ADAPTER` (defaults to `synthetic`) +- Compute: `COMPUTE_DELIVER_POLICY` (`new` default), `COMPUTE_CONSUMER_RESET` (force skip backlog) IBKR adapter (options, via Python `ib_insync`): - Install Python deps: `python3 -m pip install -r services/ingest-options/py/requirements.txt` diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index aeeb5c0..0f0ccc7 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -185,6 +185,79 @@ h1 { outline-offset: 2px; } +.card-controls { + display: flex; + align-items: flex-end; + justify-content: space-between; + gap: 12px; + width: 100%; + margin-bottom: 20px; +} + +.tape-controls { + display: flex; + flex-direction: column; + align-items: flex-end; + gap: 6px; + min-width: 120px; +} + +.jump-button { + border: 1px solid rgba(111, 91, 57, 0.35); + border-radius: 999px; + padding: 6px 12px; + background: rgba(111, 91, 57, 0.08); + color: #6f5b39; + font-size: 0.75rem; + letter-spacing: 0.12em; + text-transform: uppercase; + cursor: pointer; +} + +.jump-button:disabled { + opacity: 0.5; + cursor: default; +} + +.jump-button:not(:disabled) { + border-color: rgba(47, 109, 79, 0.6); + background: rgba(47, 109, 79, 0.1); + color: #2f6d4f; + box-shadow: 0 0 0 2px rgba(47, 109, 79, 0.15); +} + +.jump-button:focus-visible { + outline: 2px solid rgba(111, 91, 57, 0.3); + outline-offset: 2px; +} + +.missed-count { + padding: 4px 10px; + border-radius: 999px; + border: 1px solid rgba(31, 74, 123, 0.25); + background: rgba(31, 74, 123, 0.12); + color: #1f4a7b; + font-size: 0.7rem; + letter-spacing: 0.12em; + text-transform: uppercase; + max-height: 0; + opacity: 0; + transform: translateY(-6px); + overflow: hidden; + transition: max-height 0.2s ease, opacity 0.2s ease, transform 0.2s ease; +} + +.tape-controls-active .jump-button { + transform: translateY(-6px); + transition: transform 0.2s ease; +} + +.tape-controls-active .missed-count { + max-height: 24px; + opacity: 1; + transform: translateY(0); +} + .card { border: 1px solid var(--panel-border); border-radius: 24px; @@ -195,10 +268,9 @@ h1 { .card-header { display: flex; - justify-content: space-between; - gap: 16px; - flex-wrap: wrap; - align-items: center; + flex-direction: column; + gap: 6px; + align-items: flex-start; margin-bottom: 24px; } diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index c96a75c..0ed0b09 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -3,7 +3,7 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import type { EquityPrint, FlowPacket, OptionPrint } from "@islandflow/types"; -const MAX_ITEMS = 60; +const MAX_ITEMS = 500; const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); type WsStatus = "connecting" | "connected" | "disconnected"; @@ -40,6 +40,63 @@ const extractTracePrefix = (item: T): string | null => { return inferTracePrefix(traceId); }; +type SortableItem = { + ts?: number; + source_ts?: number; + ingest_ts?: number; + seq?: number; + trace_id?: string; + id?: string; +}; + +const extractSortTs = (item: SortableItem): number => + item.ts ?? item.source_ts ?? item.ingest_ts ?? 0; + +const extractSortSeq = (item: SortableItem): number => item.seq ?? 0; + +const buildItemKey = (item: SortableItem): string | null => { + if (item.trace_id) { + return `${item.trace_id}:${item.seq ?? ""}`; + } + + if (item.id) { + return `id:${item.id}`; + } + + return null; +}; + +const mergeNewest = (incoming: T[], existing: T[]): T[] => { + const combined = [...incoming, ...existing]; + if (combined.length === 0) { + return combined; + } + + const seen = new Set(); + const deduped: T[] = []; + + for (const item of combined) { + const key = buildItemKey(item); + if (key) { + if (seen.has(key)) { + continue; + } + seen.add(key); + } + deduped.push(item); + } + + deduped.sort((a, b) => { + const delta = extractSortTs(b) - extractSortTs(a); + if (delta !== 0) { + return delta; + } + return extractSortSeq(b) - extractSortSeq(a); + }); + + return deduped.slice(0, MAX_ITEMS); +}; + type TapeState = { status: WsStatus; items: T[]; @@ -119,6 +176,72 @@ const parseNumber = (value: unknown, fallback: number): number => { return fallback; }; +type ListScrollState = { + listRef: React.RefObject; + isAtTop: boolean; + missed: number; + onNewItems: (count: number) => void; + jumpToTop: () => void; +}; + +const useListScroll = (): ListScrollState => { + const listRef = useRef(null); + const [isAtTop, setIsAtTop] = useState(true); + const [missed, setMissed] = useState(0); + const isAtTopRef = useRef(true); + + useEffect(() => { + isAtTopRef.current = isAtTop; + }, [isAtTop]); + + useEffect(() => { + const el = listRef.current; + if (!el) { + return; + } + + const onScroll = () => { + const atTop = el.scrollTop <= 2; + setIsAtTop(atTop); + if (atTop) { + setMissed(0); + } + }; + + onScroll(); + el.addEventListener("scroll", onScroll); + + return () => { + el.removeEventListener("scroll", onScroll); + }; + }, []); + + const onNewItems = useCallback((count: number) => { + if (count <= 0) { + return; + } + + if (isAtTopRef.current) { + setMissed(0); + return; + } + + setMissed((prev) => prev + count); + }, []); + + const jumpToTop = useCallback(() => { + const el = listRef.current; + if (!el) { + return; + } + + el.scrollTo({ top: 0, behavior: "smooth" }); + setMissed(0); + }, []); + + return { listRef, isAtTop, missed, onNewItems, jumpToTop }; +}; + const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string => { if (paused) { return "Paused"; @@ -147,12 +270,13 @@ type TapeConfig = { expectedType: MessageType; batchSize?: number; pollMs?: number; + onNewItems?: (count: number) => void; }; const useTape = ( config: TapeConfig ): TapeState => { - const { mode, wsPath, replayPath, expectedType, latestPath } = config; + const { mode, wsPath, replayPath, expectedType, latestPath, onNewItems } = config; const batchSize = config.batchSize ?? 40; const pollMs = config.pollMs ?? 1000; const [status, setStatus] = useState("connecting"); @@ -276,10 +400,11 @@ const useTape = ( return; } - setItems((prev) => { - const next = [message.payload, ...prev]; - return next.slice(0, MAX_ITEMS); - }); + if (onNewItems) { + onNewItems(1); + } + + setItems((prev) => mergeNewest([message.payload], prev)); setLastUpdate(Date.now()); } catch (error) { console.warn("Failed to parse websocket payload", error); @@ -384,10 +509,10 @@ const useTape = ( if (filtered.length > 0) { const nextItems = [...filtered].reverse(); - setItems((prev) => { - const next = [...nextItems, ...prev]; - return next.slice(0, MAX_ITEMS); - }); + if (onNewItems) { + onNewItems(nextItems.length); + } + setItems((prev) => mergeNewest(nextItems, prev)); setLastUpdate(Date.now()); const last = filtered.at(-1); if (last) { @@ -457,7 +582,10 @@ const useTape = ( }; }; -const useFlowStream = (enabled: boolean): TapeState => { +const useFlowStream = ( + enabled: boolean, + onNewItems?: (count: number) => void +): TapeState => { const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); const [items, setItems] = useState([]); const [lastUpdate, setLastUpdate] = useState(null); @@ -525,10 +653,11 @@ const useFlowStream = (enabled: boolean): TapeState => { return; } - setItems((prev) => { - const next = [message.payload, ...prev]; - return next.slice(0, MAX_ITEMS); - }); + if (onNewItems) { + onNewItems(1); + } + + setItems((prev) => mergeNewest([message.payload], prev)); setLastUpdate(Date.now()); } catch (error) { console.warn("Failed to parse flow packet", error); @@ -630,6 +759,24 @@ const TapeStatus = ({ ); }; +type TapeControlsProps = { + isAtTop: boolean; + missed: number; + onJump: () => void; +}; + +const TapeControls = ({ isAtTop, missed, onJump }: TapeControlsProps) => { + const active = !isAtTop && missed > 0; + return ( +
+ + {active ? `+${missed} new` : ""} +
+ ); +}; + const formatFlowMetric = (value: number, suffix?: string): string => { if (suffix) { return `${value}${suffix}`; @@ -640,6 +787,9 @@ const formatFlowMetric = (value: number, suffix?: string): string => { export default function HomePage() { const [mode, setMode] = useState("live"); + const optionsScroll = useListScroll(); + const equitiesScroll = useListScroll(); + const flowScroll = useListScroll(); const options = useTape({ mode, @@ -648,7 +798,8 @@ export default function HomePage() { latestPath: "/prints/options", expectedType: "option-print", batchSize: mode === "replay" ? 120 : undefined, - pollMs: mode === "replay" ? 200 : undefined + pollMs: mode === "replay" ? 200 : undefined, + onNewItems: optionsScroll.onNewItems }); const equities = useTape({ @@ -658,10 +809,11 @@ export default function HomePage() { latestPath: "/prints/equities", expectedType: "equity-print", batchSize: mode === "replay" ? 120 : undefined, - pollMs: mode === "replay" ? 200 : undefined + pollMs: mode === "replay" ? 200 : undefined, + onNewItems: equitiesScroll.onNewItems }); - const flow = useFlowStream(mode === "live"); + const flow = useFlowStream(mode === "live", flowScroll.onNewItems); const lastSeen = useMemo(() => { return [options.lastUpdate, equities.lastUpdate, flow.lastUpdate] @@ -701,6 +853,8 @@ export default function HomePage() {

Options Tape

Newest prints first (max {MAX_ITEMS}).

+ +
+
-
+
{options.items.length === 0 ? (
{mode === "live" @@ -747,6 +906,8 @@ export default function HomePage() {

Equities Tape

Off-exchange flag highlighted.

+
+
+
-
+
{equities.items.length === 0 ? (
{mode === "live" @@ -795,6 +961,8 @@ export default function HomePage() {

Flow Packets

Deterministic clusters (live only).

+
+
+
-
+
{mode !== "live" ? (
Flow packets are live-only in this build.
) : flow.items.length === 0 ? ( diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 6717c5d..cbbd91f 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -26,7 +26,22 @@ const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), - CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500) + CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500), + COMPUTE_DELIVER_POLICY: z.enum(["new", "all", "last", "last_per_subject"]).default("new"), + COMPUTE_CONSUMER_RESET: z + .preprocess((value) => { + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (["1", "true", "yes", "on"].includes(normalized)) { + return true; + } + if (["0", "false", "no", "off"].includes(normalized)) { + return false; + } + } + return value; + }, z.boolean()) + .default(false) }); const env = readEnv(envSchema); @@ -74,6 +89,27 @@ type ClusterState = { const clusters = new Map(); +const applyDeliverPolicy = ( + opts: ReturnType, + policy: typeof env.COMPUTE_DELIVER_POLICY +) => { + switch (policy) { + case "all": + opts.deliverAll(); + break; + case "last": + opts.deliverLast(); + break; + case "last_per_subject": + opts.deliverLastPerSubject(); + break; + case "new": + default: + opts.deliverNew(); + break; + } +}; + const buildCluster = (print: OptionPrint): ClusterState => { return { contractId: print.option_contract_id, @@ -206,9 +242,41 @@ const run = async () => { }); const durableName = "compute-option-prints"; - const subscription = await (async () => { + + if (env.COMPUTE_CONSUMER_RESET) { try { - return await subscribeJson(js, SUBJECT_OPTION_PRINTS, buildDurableConsumer(durableName)); + await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); + logger.warn("reset jetstream consumer", { durable: durableName }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to reset jetstream consumer", { durable: durableName, error: message }); + } + } + } else { + try { + const info = await jsm.consumers.info(STREAM_OPTION_PRINTS, durableName); + if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) { + logger.warn("resetting consumer due to deliver policy change", { + durable: durableName, + current: info.config.deliver_policy, + desired: env.COMPUTE_DELIVER_POLICY + }); + await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to inspect jetstream consumer", { durable: durableName, error: message }); + } + } + } + + const subscription = await (async () => { + const opts = buildDurableConsumer(durableName); + applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY); + try { + return await subscribeJson(js, SUBJECT_OPTION_PRINTS, opts); } catch (error) { const message = error instanceof Error ? error.message : String(error); const shouldReset = @@ -234,7 +302,9 @@ const run = async () => { } } - return await subscribeJson(js, SUBJECT_OPTION_PRINTS, buildDurableConsumer(durableName)); + const resetOpts = buildDurableConsumer(durableName); + applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY); + return await subscribeJson(js, SUBJECT_OPTION_PRINTS, resetOpts); } })();