From a45d5c85f6350e99c8c5388ecba2d85525509eac Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Mon, 27 Apr 2026 14:37:52 -0400 Subject: [PATCH] Add bounded live retention for UI and API caches - Introduce configurable hot-window and pinned evidence retention - Keep live evidence available after eviction with fetch-on-miss hydration - Add tests and docs for the new retention settings --- .env.example | 13 + AGENT_INSTRUCTIONS(1).md | 428 --------------------- README.md | 11 + apps/web/app/terminal.tsx | 648 +++++++++++++++++++++++++++----- services/api/src/index.ts | 5 + services/api/src/live.ts | 123 ++++-- services/api/tests/live.test.ts | 78 +++- 7 files changed, 769 insertions(+), 537 deletions(-) delete mode 100644 AGENT_INSTRUCTIONS(1).md diff --git a/.env.example b/.env.example index 0c7385a..3b24669 100644 --- a/.env.example +++ b/.env.example @@ -57,6 +57,9 @@ COMPUTE_DELIVER_POLICY=new COMPUTE_CONSUMER_RESET=false NBBO_MAX_AGE_MS=1000 NEXT_PUBLIC_NBBO_MAX_AGE_MS=1000 +NEXT_PUBLIC_LIVE_HOT_WINDOW=2000 +NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS=1200000 +NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS=4000 ROLLING_WINDOW_SIZE=50 ROLLING_TTL_SEC=86400 CLASSIFIER_SWEEP_MIN_PREMIUM=40000 @@ -81,3 +84,13 @@ REPLAY_END_TS=0 REPLAY_SPEED=1 REPLAY_BATCH_SIZE=200 REPLAY_LOG_EVERY=1000 + +# API live retention (generic channels) +LIVE_LIMIT_OPTIONS=10000 +LIVE_LIMIT_NBBO=10000 +LIVE_LIMIT_EQUITIES=10000 +LIVE_LIMIT_EQUITY_JOINS=10000 +LIVE_LIMIT_FLOW=10000 +LIVE_LIMIT_CLASSIFIER_HITS=10000 +LIVE_LIMIT_ALERTS=10000 +LIVE_LIMIT_INFERRED_DARK=10000 diff --git a/AGENT_INSTRUCTIONS(1).md b/AGENT_INSTRUCTIONS(1).md deleted file mode 100644 index ff46d22..0000000 --- a/AGENT_INSTRUCTIONS(1).md +++ /dev/null @@ -1,428 +0,0 @@ -# Detailed Agent Instructions for Beads Development - -**For project overview and quick start, see [AGENTS.md](AGENTS.md)** - -This document contains detailed operational instructions for AI agents working on beads development, testing, and releases. - -## Development Guidelines - -### Code Standards - -- **Go version**: 1.24+ -- **Linting**: `golangci-lint run ./...` (baseline warnings documented in [docs/LINTING.md](docs/LINTING.md)) -- **Testing**: All new features need tests (`make test` for local baseline, `make test-full-cgo` when validating full CGO paths) -- **Documentation**: Update relevant .md files - -### File Organization - -``` -beads/ -├── cmd/bd/ # CLI commands -├── internal/ -│ ├── types/ # Core data types -│ └── storage/ # Storage layer -│ └── dolt/ # Dolt implementation -├── examples/ # Integration examples -└── *.md # Documentation -``` - -### Testing Workflow - -**IMPORTANT:** Never pollute the production database with test issues! - -**For manual testing**, use the `BEADS_DB` environment variable to point to a temporary database: - -```bash -# Create test issues in isolated database -BEADS_DB=/tmp/test.db bd init --quiet --prefix test -BEADS_DB=/tmp/test.db bd create "Test issue" -p 1 - -# Or for quick testing -BEADS_DB=/tmp/test.db bd create "Test feature" -p 1 -``` - -**For automated tests**, use `t.TempDir()` in Go tests: - -```go -func TestMyFeature(t *testing.T) { - tmpDir := t.TempDir() - testDB := filepath.Join(tmpDir, ".beads", "beads.db") - s := newTestStore(t, testDB) - // ... test code -} -``` - -**Git test isolation:** For tests that create temporary git repos, force repo-local hooks: - -```bash -git config core.hooksPath .git/hooks -``` - -Do not rely on the developer's global git config. Global `core.hooksPath` can leak -into temp repos and produce flaky test behavior. - -**Warning:** bd will warn you when creating issues with "Test" prefix in the production database. Always use `BEADS_DB` for manual testing. - -### Before Committing - -1. **Run tests**: `make test` (or `./scripts/test.sh`) - - For full CGO validation: `make test-full-cgo` -2. **Run linter**: `golangci-lint run ./...` (ignore baseline warnings) -3. **Update docs**: If you changed behavior, update README.md or other docs -4. **Commit**: With git hooks installed (`bd hooks install`), Dolt changes are auto-committed - -### Commit Message Convention - -When committing work for an issue, include the issue ID in parentheses at the end: - -```bash -git commit -m "Fix auth validation bug (bd-abc)" -git commit -m "Add retry logic for database locks (bd-xyz)" -``` - -This enables `bd doctor` to detect **orphaned issues** - work that was committed but the issue wasn't closed. The doctor check cross-references open issues against git history to find these orphans. - -### Git Workflow - -bd uses **Dolt** as its primary database. Changes are committed to Dolt history automatically (one Dolt commit per write command). - -**Install git hooks** for automatic sync: -```bash -bd hooks install -``` - -### Git Integration - -**Dolt sync**: Dolt handles sync natively via `bd dolt push` / `bd dolt pull`. No JSONL export/import needed. - -**Protected branches**: Dolt stores data under `refs/dolt/data`, separate from standard Git refs. See [docs/PROTECTED_BRANCHES.md](docs/PROTECTED_BRANCHES.md). - -**Git worktrees**: Work directly with Dolt — no special flags needed. See [docs/ADVANCED.md](docs/ADVANCED.md). - -**Merge conflicts**: Rare with hash IDs. Dolt uses cell-level 3-way merge for conflict resolution. - -## Git Workflow: Push to Main, Never PR - -Crew workers push directly to main. **Never create pull requests.** - -- `git push` to main is the only way to land work -- `gh pr create` is forbidden — PRs are for external contributors, not crew -- Do not create feature branches for your own work — commit and push to main -- When handling external PRs, use fix-merge: checkout the PR branch locally, - fix/rebase onto main, merge locally, `git push`, then close the PR - -This is enforced by pre-use hooks. If you try `gh pr create`, it will be blocked. - -## Landing the Plane - -**When the user says "let's land the plane"**, you MUST complete ALL steps below. The plane is NOT landed until `git push` succeeds. NEVER stop before pushing. NEVER say "ready to push when you are!" - that is a FAILURE. - -**MANDATORY WORKFLOW - COMPLETE ALL STEPS:** - -1. **File beads issues for any remaining work** that needs follow-up -2. **Ensure all quality gates pass** (only if code changes were made): - - Run `make lint` or `golangci-lint run ./...` (if pre-commit installed: `pre-commit run --all-files`) - - Run `make test` (and `make test-full-cgo` when CGO-relevant code changed) - - File P0 issues if quality gates are broken -3. **Update beads issues** - close finished work, update status -4. **PUSH TO REMOTE - NON-NEGOTIABLE** - This step is MANDATORY. Execute ALL commands below: - ```bash - # Pull first to catch any remote changes - git pull --rebase - - # MANDATORY: Push everything to remote - # DO NOT STOP BEFORE THIS COMMAND COMPLETES - git push - - # MANDATORY: Verify push succeeded - git status # MUST show "up to date with origin/main" - ``` - - **CRITICAL RULES:** - - The plane has NOT landed until `git push` completes successfully - - NEVER stop before `git push` - that leaves work stranded locally - - NEVER say "ready to push when you are!" - YOU must push, not the user - - If `git push` fails, resolve the issue and retry until it succeeds - - The user is managing multiple agents - unpushed work breaks their coordination workflow - -5. **Clean up git state** - Clear old stashes and prune dead remote branches: - ```bash - git stash clear # Remove old stashes - git remote prune origin # Clean up deleted remote branches - ``` -6. **Verify clean state** - Ensure all changes are committed AND PUSHED, no untracked files remain -7. **Choose a follow-up issue for next session** - - Provide a prompt for the user to give to you in the next session - - Format: "Continue work on bd-X: [issue title]. [Brief context about what's been done and what's next]" - -**REMEMBER: Landing the plane means EVERYTHING is pushed to remote. No exceptions. No "ready when you are". PUSH IT.** - -**Example "land the plane" session:** - -```bash -# 1. File remaining work -bd create "Add integration tests for sync" -t task -p 2 --json - -# 2. Run quality gates (only if code changes were made) -go test -short ./... -golangci-lint run ./... - -# 3. Close finished issues -bd close bd-42 bd-43 --reason "Completed" --json - -# 4. PUSH TO REMOTE - MANDATORY, NO STOPPING BEFORE THIS IS DONE -git pull --rebase -git push # MANDATORY - THE PLANE IS STILL IN THE AIR UNTIL THIS SUCCEEDS -git status # MUST verify "up to date with origin/main" - -# 5. Clean up git state -git stash clear -git remote prune origin - -# 6. Verify everything is clean and pushed -git status - -# 7. Choose next work -bd ready --json -bd show bd-44 --json -``` - -**Then provide the user with:** - -- Summary of what was completed this session -- What issues were filed for follow-up -- Status of quality gates (all passing / issues filed) -- Confirmation that ALL changes have been pushed to remote -- Recommended prompt for next session - -**CRITICAL: Never end a "land the plane" session without successfully pushing. The user is coordinating multiple agents and unpushed work causes severe rebase conflicts.** - -## Agent Session Workflow - -**WARNING: DO NOT use `bd edit`** - it opens an interactive editor ($EDITOR) which AI agents cannot use. Use `bd update` with flags instead: -```bash -bd update --description "new description" -bd update --title "new title" -bd update --design "design notes" -bd update --notes "additional notes" -bd update --acceptance "acceptance criteria" -``` - -**Use stdin for descriptions with special characters** (backticks, `!`, nested quotes): -```bash -# Pipe via stdin to avoid shell escaping issues -echo 'Description with `backticks` and "quotes"' | bd create "Title" --stdin -echo 'Updated description with $variables' | bd update --description=- - -# Or use --body-file for longer content -bd create "Title" --body-file=description.md -``` - -**Example agent session:** - -```bash -# Make changes (each write auto-commits to Dolt) -bd create "Fix bug" -p 1 -bd create "Add tests" -p 1 -bd update bd-42 --claim -bd close bd-40 --reason "Completed" - -# Push Dolt data to remote if configured -bd dolt push - -# Now safe to end session -``` - -This installs: - -- **pre-commit** — Commits pending Dolt changes -- **post-merge** — Pulls remote Dolt changes after git merge - -**Note:** Hooks are embedded in the bd binary and work for all bd users (not just source repo users). - -## Common Development Tasks - -### CLI Design Principles - -**Minimize cognitive overload.** Every new command, flag, or option adds cognitive burden for users. Before adding anything: - -1. **Recovery/fix operations → `bd doctor --fix`**: Don't create separate commands like `bd recover` or `bd repair`. Doctor already detects problems - let `--fix` handle remediation. This keeps all health-related operations in one discoverable place. - For git hook marker migration specifically: use `bd migrate hooks --dry-run` to preview operations, and `bd doctor --fix` for the standard apply path. - -2. **Prefer flags on existing commands**: Before creating a new command, ask: "Can this be a flag on an existing command?" Example: `bd list --stale` instead of `bd stale`. - -3. **Consolidate related operations**: Related operations should live together. Version control uses `bd vc {log,diff,commit}`, not separate top-level commands. - -4. **Count the commands**: Run `bd --help` and count. If we're approaching 30+ commands, we have a discoverability problem. Consider subcommand grouping. - -5. **New commands need strong justification**: A new command should represent a fundamentally different operation, not just a convenience wrapper. - -### Adding a New Command - -1. Create file in `cmd/bd/` -2. Add to root command in `cmd/bd/main.go` -3. Implement with Cobra framework -4. Add `--json` flag for agent use -5. Add tests in `cmd/bd/*_test.go` -6. Document in README.md - -### Adding Storage Features - -1. Add Dolt SQL schema changes in `internal/storage/dolt/` -2. Add migration if needed -3. Update `internal/types/types.go` if new types -4. Implement in `internal/storage/dolt/` (queries, issues, etc.) -5. Add tests -6. Update export/import in `cmd/bd/export.go` and `cmd/bd/import.go` - -### Adding Examples - -1. Create directory in `examples/` -2. Add README.md explaining the example -3. Include working code -4. Link from `examples/README.md` -5. Mention in main README.md - -## Building and Testing - -```bash -# Build and install bd to ~/.local/bin (the canonical location) -make install - -# Test (local baseline) -make test - -# Test with full CGO-enabled suite (local/CI parity) -make test-full-cgo - -# Coverage run -go test -coverprofile=coverage.out ./... -go tool cover -html=coverage.out - -# Verify installed binary -bd init --prefix test -bd create "Test issue" -p 1 -bd ready -``` - -> **WARNING**: Do NOT use `go build -o bd ./cmd/bd` or `go install ./cmd/bd`. -> These create stale binaries in the working directory or `~/go/bin/` that -> shadow the canonical install at `~/.local/bin/bd`. Always use `make install`. - -## Version Management - -**IMPORTANT**: When the user asks to "bump the version" or mentions a new version number (e.g., "bump to 0.9.3"), use the version bump script: - -```bash -# Preview changes (shows diff, doesn't commit) -./scripts/bump-version.sh 0.9.3 - -# Auto-commit the version bump -./scripts/bump-version.sh 0.9.3 --commit -git push origin main -``` - -**What it does:** - -- Updates ALL version files (CLI, plugin, MCP server, docs) in one command -- Validates semantic versioning format -- Shows diff preview -- Verifies all versions match after update -- Creates standardized commit message - -**User will typically say:** - -- "Bump to 0.9.3" -- "Update version to 1.0.0" -- "Rev the project to 0.9.4" -- "Increment the version" - -**You should:** - -1. Run `./scripts/bump-version.sh --commit` -2. Push to GitHub -3. Confirm all versions updated correctly - -**Files updated automatically:** - -- `cmd/bd/version.go` - CLI version -- `claude-plugin/.claude-plugin/plugin.json` - Plugin version -- `.claude-plugin/marketplace.json` - Marketplace version -- `integrations/beads-mcp/pyproject.toml` - MCP server version -- `README.md` - Documentation version -- `PLUGIN.md` - Version requirements - -**Why this matters:** We had version mismatches (bd-66) when only `version.go` was updated. This script prevents that by updating all components atomically. - -See `scripts/README.md` for more details. - -## Release Process (Maintainers) - -**Automated (Recommended):** - -```bash -# One command to do everything (version bump, tests, tag, Homebrew update, local install) -./scripts/release.sh 0.9.3 -``` - -This handles the entire release workflow automatically, including waiting ~5 minutes for GitHub Actions to build release artifacts. See [scripts/README.md](scripts/README.md) for details. - -**Manual (Step-by-Step):** - -1. Bump version: `./scripts/bump-version.sh --commit` -2. Update CHANGELOG.md with release notes -3. Run tests: `make test` (and `make test-full-cgo` for CGO-related changes) -4. Push version bump: `git push origin main` -5. Tag release: `git tag v && git push origin v` -6. Update Homebrew: `./scripts/update-homebrew.sh ` (waits for GitHub Actions) -7. Verify: `brew update && brew upgrade beads && bd version` - -See [docs/RELEASING.md](docs/RELEASING.md) for complete manual instructions. - -## Checking GitHub Issues and PRs - -**IMPORTANT**: When asked to check GitHub issues or PRs, use command-line tools like `gh` instead of browser/playwright tools. - -**Preferred approach:** - -```bash -# List open issues with details -gh issue list --limit 30 - -# List open PRs -gh pr list --limit 30 - -# View specific issue -gh issue view 201 -``` - -**Then provide an in-conversation summary** highlighting: - -- Urgent/critical issues (regressions, bugs, broken builds) -- Common themes or patterns -- Feature requests with high engagement -- Items that need immediate attention - -**Why this matters:** - -- Browser tools consume more tokens and are slower -- CLI summaries are easier to scan and discuss -- Keeps the conversation focused and efficient -- Better for quick triage and prioritization - -**Do NOT use:** `browser_navigate`, `browser_snapshot`, or other playwright tools for GitHub PR/issue reviews unless specifically requested by the user. - -## Questions? - -- Check existing issues: `bd list` -- Look at recent commits: `git log --oneline -20` -- Read the docs: README.md, ADVANCED.md, EXTENDING.md -- Create an issue if unsure: `bd create "Question: ..." -t task -p 2` - -## Important Files - -- **README.md** - Main documentation (keep this updated!) -- **EXTENDING.md** - Database extension guide -- **ADVANCED.md** - Advanced features (rename, merge, compaction) -- **CONTRIBUTING.md** - Contribution guidelines -- **SECURITY.md** - Security policy diff --git a/README.md b/README.md index 4551594..314b051 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,13 @@ All runtime configuration comes from `.env`. ### API - `API_PORT`, `REST_DEFAULT_LIMIT` +- `LIVE_LIMIT_OPTIONS`, `LIVE_LIMIT_NBBO`, `LIVE_LIMIT_EQUITIES`, `LIVE_LIMIT_EQUITY_JOINS`, `LIVE_LIMIT_FLOW`, `LIVE_LIMIT_CLASSIFIER_HITS`, `LIVE_LIMIT_ALERTS`, `LIVE_LIMIT_INFERRED_DARK` (bounded live generic cache depths; defaults `10000`, max `100000`) + +### Web live retention + +- `NEXT_PUBLIC_LIVE_HOT_WINDOW` (frontend hot live window cap; default `2000`) +- `NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS` (pinned evidence TTL; default `1200000`) +- `NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS` (pinned evidence cache guardrail; default `4000`) ### Replay service @@ -163,4 +170,8 @@ All runtime configuration comes from `.env`. - Python dependencies are required only for IBKR/Databento sidecars (`services/ingest-options/py/requirements.txt`). - Candle construction is server-side; the client consumes prebuilt OHLC events. +- Live retention uses a two-tier model: + - API/Redis maintain a bounded hot cache per live generic channel. + - UI keeps a bounded hot window for rendering performance. + - Alert/drawer evidence is pinned and hydrated by id/trace so details remain inspectable after hot-window eviction. - This repository is for personal, non-redistributed usage. diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index 7f3b242..24d951b 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -29,7 +29,35 @@ import type { } from "@islandflow/types"; import { createChart, type IChartApi, type SeriesMarker, type UTCTimestamp } from "lightweight-charts"; -const MAX_ITEMS = 500; +const parseBoundedInt = ( + value: string | undefined, + fallback: number, + min: number, + max: number +): number => { + if (!value || value.trim().length === 0) { + return fallback; + } + const parsed = Number(value); + if (!Number.isFinite(parsed)) { + return fallback; + } + return Math.max(min, Math.min(max, Math.floor(parsed))); +}; + +const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, 2000, 100, 100000); +const PINNED_EVIDENCE_TTL_MS = parseBoundedInt( + process.env.NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS, + 20 * 60 * 1000, + 60 * 1000, + 2 * 60 * 60 * 1000 +); +const PINNED_EVIDENCE_MAX_ITEMS = parseBoundedInt( + process.env.NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS, + 4000, + 100, + 50000 +); const NBBO_MAX_AGE_MS = Number(process.env.NEXT_PUBLIC_NBBO_MAX_AGE_MS); const NBBO_MAX_AGE_MS_SAFE = Number.isFinite(NBBO_MAX_AGE_MS) && NBBO_MAX_AGE_MS > 0 ? NBBO_MAX_AGE_MS : 1000; @@ -229,6 +257,32 @@ type SortableItem = { id?: string; }; +type PinnedEntry = { + value: T; + updatedAt: number; +}; + +type RetentionMetricKey = + | "hotWindowEvictions" + | "pinnedFetchMisses" + | "pinnedFetchFailures" + | "pinnedStoreSize"; + +const frontendRetentionMetrics: Record = { + hotWindowEvictions: 0, + pinnedFetchMisses: 0, + pinnedFetchFailures: 0, + pinnedStoreSize: 0 +}; + +const incrementRetentionMetric = (key: RetentionMetricKey, count = 1): void => { + frontendRetentionMetrics[key] += count; +}; + +const setRetentionMetric = (key: RetentionMetricKey, value: number): void => { + frontendRetentionMetrics[key] = value; +}; + const extractSortTs = (item: SortableItem): number => item.ts ?? item.source_ts ?? item.ingest_ts ?? 0; @@ -246,7 +300,12 @@ const buildItemKey = (item: SortableItem): string | null => { return null; }; -const mergeNewest = (incoming: T[], existing: T[]): T[] => { +const mergeNewest = ( + incoming: T[], + existing: T[], + limit = LIVE_HOT_WINDOW, + onTrim?: (evicted: number) => void +): T[] => { const combined = [...incoming, ...existing]; if (combined.length === 0) { return combined; @@ -274,7 +333,13 @@ const mergeNewest = (incoming: T[], existing: T[]): T[] return extractSortSeq(b) - extractSortSeq(a); }); - return deduped.slice(0, MAX_ITEMS); + const safeLimit = Math.max(1, Math.floor(limit)); + const evicted = Math.max(0, deduped.length - safeLimit); + if (evicted > 0) { + onTrim?.(evicted); + } + + return deduped.slice(0, safeLimit); }; type TapeState = { @@ -670,6 +735,117 @@ const useScrollAnchor = ( return { capture, apply }; }; +type VirtualListResult = { + visibleItems: T[]; + topSpacerHeight: number; + bottomSpacerHeight: number; +}; + +const useVirtualList = ( + items: T[], + listRef: React.RefObject, + enabled: boolean, + rowHeight: number, + overscan = 8 +): VirtualListResult => { + const [range, setRange] = useState<{ start: number; end: number }>({ + start: 0, + end: items.length + }); + + const recompute = useCallback(() => { + if (!enabled) { + setRange({ start: 0, end: items.length }); + return; + } + + const element = listRef.current; + if (!element) { + setRange({ start: 0, end: Math.min(items.length, 80) }); + return; + } + + const viewportHeight = Math.max(rowHeight, element.clientHeight); + const visibleCount = Math.ceil(viewportHeight / rowHeight); + const start = Math.max(0, Math.floor(element.scrollTop / rowHeight) - overscan); + const end = Math.min(items.length, start + visibleCount + overscan * 2); + setRange({ start, end }); + }, [enabled, items.length, listRef, overscan, rowHeight]); + + useEffect(() => { + recompute(); + }, [items.length, recompute]); + + useEffect(() => { + if (!enabled) { + return; + } + + const element = listRef.current; + if (!element) { + return; + } + + const onScroll = () => recompute(); + const onResize = () => recompute(); + + element.addEventListener("scroll", onScroll); + window.addEventListener("resize", onResize); + + return () => { + element.removeEventListener("scroll", onScroll); + window.removeEventListener("resize", onResize); + }; + }, [enabled, listRef, recompute]); + + if (!enabled) { + return { + visibleItems: items, + topSpacerHeight: 0, + bottomSpacerHeight: 0 + }; + } + + const start = Math.min(range.start, items.length); + const end = Math.min(Math.max(range.end, start), items.length); + + return { + visibleItems: items.slice(start, end), + topSpacerHeight: start * rowHeight, + bottomSpacerHeight: Math.max(0, (items.length - end) * rowHeight) + }; +}; + +const upsertPinnedEntries = ( + current: Map>, + incoming: Map, + now: number +): Map> => { + const next = new Map(current); + for (const [key, value] of incoming) { + next.set(key, { value, updatedAt: now }); + } + return next; +}; + +const prunePinnedEntries = ( + current: Map>, + activeKeys: Set, + now: number +): Map> => { + const surviving: Array<[string, PinnedEntry]> = []; + + for (const [key, entry] of current) { + if (activeKeys.has(key) || now - entry.updatedAt <= PINNED_EVIDENCE_TTL_MS) { + surviving.push([key, entry]); + } + } + + surviving.sort((a, b) => b[1].updatedAt - a[1].updatedAt); + const trimmed = surviving.slice(0, PINNED_EVIDENCE_MAX_ITEMS); + return new Map(trimmed); +}; + const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string => { if (paused) { return "Paused"; @@ -772,7 +948,11 @@ const useTape = ( captureScroll(); } - setItems((prev) => mergeNewest(buffered, prev)); + setItems((prev) => + mergeNewest(buffered, prev, LIVE_HOT_WINDOW, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ) + ); setLastUpdate(Date.now()); }); }, [captureScroll, onNewItems]); @@ -1170,7 +1350,9 @@ const useLiveStream = ( } if (shouldHold) { - holdRef.current = mergeNewest(buffered, holdRef.current); + holdRef.current = mergeNewest(buffered, holdRef.current, LIVE_HOT_WINDOW, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ); setLastUpdate(Date.now()); return; } @@ -1179,7 +1361,11 @@ const useLiveStream = ( holdRef.current.length > 0 ? [...holdRef.current, ...buffered] : buffered; holdRef.current = []; - setItems((prev) => mergeNewest(nextBatch, prev)); + setItems((prev) => + mergeNewest(nextBatch, prev, LIVE_HOT_WINDOW, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ) + ); setLastUpdate(Date.now()); }); }, [config.captureScroll, config.onNewItems, config.shouldHold]); @@ -1295,7 +1481,11 @@ const useLiveStream = ( if (holdRef.current.length === 0) { return; } - setItems((prev) => mergeNewest(holdRef.current, prev)); + setItems((prev) => + mergeNewest(holdRef.current, prev, LIVE_HOT_WINDOW, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ) + ); holdRef.current = []; setLastUpdate(Date.now()); }, [config.resumeSignal, config.shouldHold]); @@ -1491,7 +1681,11 @@ const useLiveSession = ( nextItems: T[] ) => { setter((prev) => - message.op === "snapshot" ? (nextItems as T[]) : mergeNewest(nextItems as T[], prev) + message.op === "snapshot" + ? (nextItems as T[]) + : mergeNewest(nextItems as T[], prev, LIVE_HOT_WINDOW, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ) ); }; @@ -3077,36 +3271,53 @@ const useTerminalState = () => { } return map; }, [flowFeed.items]); - const [fetchedOptionPrintMap, setFetchedOptionPrintMap] = useState>( - () => new Map() - ); - const [fetchedFlowPacketMap, setFetchedFlowPacketMap] = useState>( - () => new Map() - ); - const [fetchedEquityJoinMap, setFetchedEquityJoinMap] = useState>( - () => new Map() - ); - const mergedOptionPrintMap = useMemo(() => { - const merged = new Map(optionPrintMap); - for (const [key, value] of fetchedOptionPrintMap) { + const [pinnedOptionPrintMap, setPinnedOptionPrintMap] = useState< + Map> + >(() => new Map()); + const [pinnedFlowPacketMap, setPinnedFlowPacketMap] = useState< + Map> + >(() => new Map()); + const [pinnedEquityJoinMap, setPinnedEquityJoinMap] = useState< + Map> + >(() => new Map()); + + const resolvedOptionPrintMap = useMemo(() => { + const merged = new Map(); + for (const [key, entry] of pinnedOptionPrintMap) { + merged.set(key, entry.value); + } + for (const [key, value] of optionPrintMap) { merged.set(key, value); } return merged; - }, [optionPrintMap, fetchedOptionPrintMap]); - const mergedFlowPacketMap = useMemo(() => { - const merged = new Map(flowPacketMap); - for (const [key, value] of fetchedFlowPacketMap) { + }, [optionPrintMap, pinnedOptionPrintMap]); + const resolvedFlowPacketMap = useMemo(() => { + const merged = new Map(); + for (const [key, entry] of pinnedFlowPacketMap) { + merged.set(key, entry.value); + } + for (const [key, value] of flowPacketMap) { merged.set(key, value); } return merged; - }, [flowPacketMap, fetchedFlowPacketMap]); - const mergedEquityJoinMap = useMemo(() => { - const merged = new Map(equityJoinMap); - for (const [key, value] of fetchedEquityJoinMap) { + }, [flowPacketMap, pinnedFlowPacketMap]); + const resolvedEquityJoinMap = useMemo(() => { + const merged = new Map(); + for (const [key, entry] of pinnedEquityJoinMap) { + merged.set(key, entry.value); + } + for (const [key, value] of equityJoinMap) { merged.set(key, value); } return merged; - }, [equityJoinMap, fetchedEquityJoinMap]); + }, [equityJoinMap, pinnedEquityJoinMap]); + + useEffect(() => { + setRetentionMetric( + "pinnedStoreSize", + pinnedOptionPrintMap.size + pinnedFlowPacketMap.size + pinnedEquityJoinMap.size + ); + }, [pinnedOptionPrintMap.size, pinnedFlowPacketMap.size, pinnedEquityJoinMap.size]); useEffect(() => { if (!selectedAlert || mode !== "live") { @@ -3114,68 +3325,99 @@ const useTerminalState = () => { } const packetId = selectedAlert.evidence_refs[0]; - if (packetId && !mergedFlowPacketMap.has(packetId)) { + if (packetId && !resolvedFlowPacketMap.has(packetId)) { + incrementRetentionMetric("pinnedFetchMisses", 1); void fetch(buildApiUrl(`/flow/packets/${encodeURIComponent(packetId)}`)) - .then((response) => response.json()) + .then(async (response) => { + if (!response.ok) { + throw new Error(await readErrorDetail(response)); + } + return response.json(); + }) .then((payload: { data?: FlowPacket | null }) => { if (!payload.data) { return; } - setFetchedFlowPacketMap((prev) => new Map(prev).set(payload.data!.id, payload.data!)); + const now = Date.now(); + const next = new Map([[payload.data.id, payload.data]]); + setPinnedFlowPacketMap((prev) => upsertPinnedEntries(prev, next, now)); }) - .catch((error) => console.warn("Failed to fetch flow packet evidence", error)); + .catch((error) => { + incrementRetentionMetric("pinnedFetchFailures", 1); + console.warn("Failed to fetch flow packet evidence", error); + }); } const missingPrintIds = selectedAlert.evidence_refs.filter( - (id) => !mergedFlowPacketMap.has(id) && !mergedOptionPrintMap.has(id) + (id) => !resolvedFlowPacketMap.has(id) && !resolvedOptionPrintMap.has(id) ); if (missingPrintIds.length > 0) { + incrementRetentionMetric("pinnedFetchMisses", missingPrintIds.length); const url = new URL(buildApiUrl("/option-prints/by-trace")); for (const traceId of missingPrintIds) { url.searchParams.append("trace_id", traceId); } void fetch(url.toString()) - .then((response) => response.json()) + .then(async (response) => { + if (!response.ok) { + throw new Error(await readErrorDetail(response)); + } + return response.json(); + }) .then((payload: { data?: OptionPrint[] }) => { const next = new Map(); for (const item of payload.data ?? []) { next.set(item.trace_id, item); } if (next.size > 0) { - setFetchedOptionPrintMap((prev) => new Map([...prev, ...next])); + const now = Date.now(); + setPinnedOptionPrintMap((prev) => upsertPinnedEntries(prev, next, now)); } }) - .catch((error) => console.warn("Failed to fetch option print evidence", error)); + .catch((error) => { + incrementRetentionMetric("pinnedFetchFailures", 1); + console.warn("Failed to fetch option print evidence", error); + }); } - }, [selectedAlert, mode, mergedFlowPacketMap, mergedOptionPrintMap]); + }, [selectedAlert, mode, resolvedFlowPacketMap, resolvedOptionPrintMap]); useEffect(() => { if (!selectedDarkEvent || mode !== "live") { return; } - const missingIds = selectedDarkEvent.evidence_refs.filter((id) => !mergedEquityJoinMap.has(id)); + const missingIds = selectedDarkEvent.evidence_refs.filter((id) => !resolvedEquityJoinMap.has(id)); if (missingIds.length === 0) { return; } + incrementRetentionMetric("pinnedFetchMisses", missingIds.length); const url = new URL(buildApiUrl("/equity-joins/by-id")); for (const id of missingIds) { url.searchParams.append("id", id); } void fetch(url.toString()) - .then((response) => response.json()) + .then(async (response) => { + if (!response.ok) { + throw new Error(await readErrorDetail(response)); + } + return response.json(); + }) .then((payload: { data?: EquityPrintJoin[] }) => { const next = new Map(); for (const item of payload.data ?? []) { next.set(item.id, item); } if (next.size > 0) { - setFetchedEquityJoinMap((prev) => new Map([...prev, ...next])); + const now = Date.now(); + setPinnedEquityJoinMap((prev) => upsertPinnedEntries(prev, next, now)); } }) - .catch((error) => console.warn("Failed to fetch dark evidence joins", error)); - }, [selectedDarkEvent, mode, mergedEquityJoinMap]); + .catch((error) => { + incrementRetentionMetric("pinnedFetchFailures", 1); + console.warn("Failed to fetch dark evidence joins", error); + }); + }, [selectedDarkEvent, mode, resolvedEquityJoinMap]); const selectedEvidence = useMemo((): EvidenceItem[] => { if (!selectedAlert) { @@ -3183,25 +3425,25 @@ const useTerminalState = () => { } return selectedAlert.evidence_refs.map((id) => { - const packet = mergedFlowPacketMap.get(id); + const packet = resolvedFlowPacketMap.get(id); if (packet) { return { kind: "flow", id, packet }; } - const print = mergedOptionPrintMap.get(id); + const print = resolvedOptionPrintMap.get(id); if (print) { return { kind: "print", id, print }; } return { kind: "unknown", id }; }); - }, [selectedAlert, mergedFlowPacketMap, mergedOptionPrintMap]); + }, [selectedAlert, resolvedFlowPacketMap, resolvedOptionPrintMap]); const selectedFlowPacket = useMemo(() => { if (!selectedAlert) { return null; } const packetId = selectedAlert.evidence_refs[0]; - return packetId ? mergedFlowPacketMap.get(packetId) ?? null : null; - }, [selectedAlert, mergedFlowPacketMap]); + return packetId ? resolvedFlowPacketMap.get(packetId) ?? null : null; + }, [selectedAlert, resolvedFlowPacketMap]); const selectedDarkEvidence = useMemo((): DarkEvidenceItem[] => { if (!selectedDarkEvent) { @@ -3209,20 +3451,20 @@ const useTerminalState = () => { } return selectedDarkEvent.evidence_refs.map((id) => { - const join = mergedEquityJoinMap.get(id); + const join = resolvedEquityJoinMap.get(id); if (join) { return { kind: "join", id, join }; } return { kind: "unknown", id }; }); - }, [selectedDarkEvent, mergedEquityJoinMap]); + }, [selectedDarkEvent, resolvedEquityJoinMap]); const selectedDarkUnderlying = useMemo(() => { if (!selectedDarkEvent) { return null; } - return inferDarkUnderlying(selectedDarkEvent, equityPrintMap, mergedEquityJoinMap); - }, [selectedDarkEvent, mergedEquityJoinMap, equityPrintMap]); + return inferDarkUnderlying(selectedDarkEvent, equityPrintMap, resolvedEquityJoinMap); + }, [selectedDarkEvent, resolvedEquityJoinMap, equityPrintMap]); useEffect(() => { if (mode !== "live") { @@ -3269,25 +3511,36 @@ const useTerminalState = () => { return; } - if (!mergedFlowPacketMap.has(selectedClassifierPacketId)) { + if (!resolvedFlowPacketMap.has(selectedClassifierPacketId)) { + incrementRetentionMetric("pinnedFetchMisses", 1); void fetch(buildApiUrl(`/flow/packets/${encodeURIComponent(selectedClassifierPacketId)}`)) - .then((response) => response.json()) + .then(async (response) => { + if (!response.ok) { + throw new Error(await readErrorDetail(response)); + } + return response.json(); + }) .then((payload: { data?: FlowPacket | null }) => { if (!payload.data) { return; } - setFetchedFlowPacketMap((prev) => new Map(prev).set(payload.data!.id, payload.data!)); + const now = Date.now(); + const next = new Map([[payload.data.id, payload.data]]); + setPinnedFlowPacketMap((prev) => upsertPinnedEntries(prev, next, now)); }) - .catch((error) => console.warn("Failed to fetch classifier flow packet", error)); + .catch((error) => { + incrementRetentionMetric("pinnedFetchFailures", 1); + console.warn("Failed to fetch classifier flow packet", error); + }); } - }, [selectedClassifierPacketId, mode, mergedFlowPacketMap]); + }, [selectedClassifierPacketId, mode, resolvedFlowPacketMap]); const selectedClassifierFlowPacket = useMemo(() => { if (!selectedClassifierPacketId) { return null; } - return mergedFlowPacketMap.get(selectedClassifierPacketId) ?? null; - }, [mergedFlowPacketMap, selectedClassifierPacketId]); + return resolvedFlowPacketMap.get(selectedClassifierPacketId) ?? null; + }, [resolvedFlowPacketMap, selectedClassifierPacketId]); const selectedClassifierEvidence = useMemo((): EvidenceItem[] => { if (!selectedClassifierHit) { @@ -3298,19 +3551,19 @@ const useTerminalState = () => { return []; } - const packet = mergedFlowPacketMap.get(selectedClassifierPacketId); + const packet = resolvedFlowPacketMap.get(selectedClassifierPacketId); if (!packet) { return []; } return packet.members.map((id) => { - const print = mergedOptionPrintMap.get(id); + const print = resolvedOptionPrintMap.get(id); if (print) { return { kind: "print", id, print }; } return { kind: "unknown", id }; }); - }, [mergedFlowPacketMap, mergedOptionPrintMap, selectedClassifierHit, selectedClassifierPacketId]); + }, [resolvedFlowPacketMap, resolvedOptionPrintMap, selectedClassifierHit, selectedClassifierPacketId]); const inferAlertUnderlying = useCallback( (alert: AlertEvent): string | null => { @@ -3321,14 +3574,14 @@ const useTerminalState = () => { const packetId = alert.evidence_refs[0]; if (packetId) { - const packet = mergedFlowPacketMap.get(packetId); + const packet = resolvedFlowPacketMap.get(packetId); if (packet) { return extractUnderlying(extractPacketContract(packet)); } } for (const ref of alert.evidence_refs) { - const print = mergedOptionPrintMap.get(ref); + const print = resolvedOptionPrintMap.get(ref); if (print) { return extractUnderlying(print.option_contract_id); } @@ -3336,7 +3589,7 @@ const useTerminalState = () => { return null; }, - [extractPacketContract, extractUnderlyingFromTrace, mergedFlowPacketMap, mergedOptionPrintMap] + [extractPacketContract, extractUnderlyingFromTrace, resolvedFlowPacketMap, resolvedOptionPrintMap] ); const matchesTicker = useCallback( @@ -3373,10 +3626,10 @@ const useTerminalState = () => { return inferredDarkFeed.items; } return inferredDarkFeed.items.filter((event) => { - const underlying = inferDarkUnderlying(event, equityPrintMap, mergedEquityJoinMap); + const underlying = inferDarkUnderlying(event, equityPrintMap, resolvedEquityJoinMap); return matchesTicker(underlying); }); - }, [mergedEquityJoinMap, equityPrintMap, inferredDarkFeed.items, matchesTicker, tickerSet]); + }, [resolvedEquityJoinMap, equityPrintMap, inferredDarkFeed.items, matchesTicker, tickerSet]); const filteredFlow = useMemo(() => { if (tickerSet.size === 0) { @@ -3394,6 +3647,175 @@ const useTerminalState = () => { return alertsFeed.items.filter((alert) => matchesTicker(inferAlertUnderlying(alert))); }, [alertsFeed.items, inferAlertUnderlying, matchesTicker, tickerSet]); + const visibleAlerts = useMemo(() => filteredAlerts.slice(0, 12), [filteredAlerts]); + + const visibleAlertEvidenceRefs = useMemo(() => { + const refs = new Set(); + for (const alert of visibleAlerts) { + for (const id of alert.evidence_refs.slice(0, 8)) { + refs.add(id); + } + } + return refs; + }, [visibleAlerts]); + + useEffect(() => { + if (mode !== "live" || visibleAlerts.length === 0) { + return; + } + + const visiblePacketIds = visibleAlerts + .map((alert) => alert.evidence_refs[0] ?? null) + .filter((id): id is string => Boolean(id) && id.startsWith("flowpacket:")); + const missingPacketIds = Array.from(new Set(visiblePacketIds)).filter( + (id) => !resolvedFlowPacketMap.has(id) + ); + + if (missingPacketIds.length > 0) { + incrementRetentionMetric("pinnedFetchMisses", missingPacketIds.length); + void Promise.all( + missingPacketIds.map(async (packetId) => { + const response = await fetch(buildApiUrl(`/flow/packets/${encodeURIComponent(packetId)}`)); + if (!response.ok) { + throw new Error(await readErrorDetail(response)); + } + const payload = (await response.json()) as { data?: FlowPacket | null }; + return payload.data ?? null; + }) + ) + .then((packets) => { + const next = new Map(); + for (const packet of packets) { + if (packet) { + next.set(packet.id, packet); + } + } + if (next.size > 0) { + const now = Date.now(); + setPinnedFlowPacketMap((prev) => upsertPinnedEntries(prev, next, now)); + } + }) + .catch((error) => { + incrementRetentionMetric("pinnedFetchFailures", 1); + console.warn("Failed to prefetch visible alert packets", error); + }); + } + + const missingPrintIds = Array.from(visibleAlertEvidenceRefs).filter( + (id) => !resolvedFlowPacketMap.has(id) && !resolvedOptionPrintMap.has(id) + ); + if (missingPrintIds.length === 0) { + return; + } + + incrementRetentionMetric("pinnedFetchMisses", missingPrintIds.length); + const url = new URL(buildApiUrl("/option-prints/by-trace")); + for (const traceId of missingPrintIds) { + url.searchParams.append("trace_id", traceId); + } + void fetch(url.toString()) + .then(async (response) => { + if (!response.ok) { + throw new Error(await readErrorDetail(response)); + } + return response.json(); + }) + .then((payload: { data?: OptionPrint[] }) => { + const next = new Map(); + for (const item of payload.data ?? []) { + next.set(item.trace_id, item); + } + if (next.size > 0) { + const now = Date.now(); + setPinnedOptionPrintMap((prev) => upsertPinnedEntries(prev, next, now)); + } + }) + .catch((error) => { + incrementRetentionMetric("pinnedFetchFailures", 1); + console.warn("Failed to prefetch visible alert evidence", error); + }); + }, [ + mode, + visibleAlerts, + visibleAlertEvidenceRefs, + resolvedFlowPacketMap, + resolvedOptionPrintMap + ]); + + const activePinnedFlowKeys = useMemo(() => { + const keys = new Set(); + const selectedAlertPacketId = selectedAlert?.evidence_refs[0]; + if (selectedAlertPacketId) { + keys.add(selectedAlertPacketId); + } + if (selectedClassifierPacketId) { + keys.add(selectedClassifierPacketId); + } + for (const alert of visibleAlerts) { + const packetId = alert.evidence_refs[0]; + if (packetId) { + keys.add(packetId); + } + } + return keys; + }, [selectedAlert, selectedClassifierPacketId, visibleAlerts]); + + const activePinnedOptionKeys = useMemo(() => { + const keys = new Set(); + if (selectedAlert) { + for (const id of selectedAlert.evidence_refs) { + keys.add(id); + } + } + if (selectedClassifierFlowPacket) { + for (const id of selectedClassifierFlowPacket.members) { + keys.add(id); + } + } + for (const id of visibleAlertEvidenceRefs) { + keys.add(id); + } + return keys; + }, [selectedAlert, selectedClassifierFlowPacket, visibleAlertEvidenceRefs]); + + const activePinnedJoinKeys = useMemo(() => { + const keys = new Set(); + if (selectedDarkEvent) { + for (const id of selectedDarkEvent.evidence_refs) { + keys.add(id); + } + } + return keys; + }, [selectedDarkEvent]); + + useEffect(() => { + if (mode !== "live") { + return; + } + + const prune = () => { + const now = Date.now(); + setPinnedOptionPrintMap((prev) => prunePinnedEntries(prev, activePinnedOptionKeys, now)); + setPinnedFlowPacketMap((prev) => prunePinnedEntries(prev, activePinnedFlowKeys, now)); + setPinnedEquityJoinMap((prev) => prunePinnedEntries(prev, activePinnedJoinKeys, now)); + }; + + prune(); + const interval = window.setInterval(prune, 60000); + return () => { + window.clearInterval(interval); + }; + }, [mode, activePinnedOptionKeys, activePinnedFlowKeys, activePinnedJoinKeys]); + + useEffect(() => { + const interval = window.setInterval(() => { + console.info("frontend live retention metrics", frontendRetentionMetrics); + }, 60000); + return () => { + window.clearInterval(interval); + }; + }, []); + const filteredClassifierHits = useMemo(() => { if (tickerSet.size === 0) { return classifierHitsFeed.items; @@ -3420,7 +3842,7 @@ const useTerminalState = () => { const chartInferredDark = useMemo(() => { const desired = chartTicker.toUpperCase(); return inferredDarkFeed.items - .filter((event) => inferDarkUnderlying(event, equityPrintMap, mergedEquityJoinMap) === desired) + .filter((event) => inferDarkUnderlying(event, equityPrintMap, resolvedEquityJoinMap) === desired) .sort((a, b) => { const delta = a.source_ts - b.source_ts; if (delta !== 0) { @@ -3428,7 +3850,7 @@ const useTerminalState = () => { } return a.seq - b.seq; }); - }, [chartTicker, inferredDarkFeed.items, mergedEquityJoinMap, equityPrintMap]); + }, [chartTicker, inferredDarkFeed.items, resolvedEquityJoinMap, equityPrintMap]); const findAlertForClassifierHit = useCallback( (hit: ClassifierHitEvent): AlertEvent | null => { @@ -3531,10 +3953,10 @@ const useTerminalState = () => { tickerSet, chartTicker, nbboMap, - optionPrintMap: mergedOptionPrintMap, + optionPrintMap: resolvedOptionPrintMap, equityPrintMap, - equityJoinMap: mergedEquityJoinMap, - flowPacketMap: mergedFlowPacketMap, + equityJoinMap: resolvedEquityJoinMap, + flowPacketMap: resolvedFlowPacketMap, selectedEvidence, selectedFlowPacket, selectedDarkEvidence, @@ -3714,6 +4136,7 @@ type OptionsPaneProps = { const OptionsPane = ({ limit }: OptionsPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredOptions.slice(0, limit) : state.filteredOptions; + const virtual = useVirtualList(items, state.optionsScroll.listRef, !limit, 96); return ( { : "Replay queue empty. Ensure ClickHouse has data."} ) : ( - items.map((print) => { + <> + {virtual.topSpacerHeight > 0 ? ( +
+ ) : null} + {virtual.visibleItems.map((print) => { const contractId = normalizeContractId(print.option_contract_id); const quote = state.nbboMap.get(contractId); const nbboAge = quote ? Math.abs(print.ts - quote.ts) : null; @@ -3811,7 +4238,11 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => {
{formatTime(print.ts)}
); - }) + })} + {virtual.bottomSpacerHeight > 0 ? ( +
+ ) : null} + )}
@@ -3825,6 +4256,7 @@ type EquitiesPaneProps = { const EquitiesPane = ({ limit }: EquitiesPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredEquities.slice(0, limit) : state.filteredEquities; + const virtual = useVirtualList(items, state.equitiesScroll.listRef, !limit, 78); return ( { : "Replay queue empty. Ensure ClickHouse has data."} ) : ( - items.map((print) => ( -
+ <> + {virtual.topSpacerHeight > 0 ? ( +
+ ) : null} + {virtual.visibleItems.map((print) => ( +
{print.underlying_id}
@@ -3876,8 +4312,12 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => {
{formatTime(print.ts)}
-
- )) +
+ ))} + {virtual.bottomSpacerHeight > 0 ? ( +
+ ) : null} + )}
@@ -3892,6 +4332,7 @@ type FlowPaneProps = { const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredFlow.slice(0, limit) : state.filteredFlow; + const virtual = useVirtualList(items, state.flowScroll.listRef, !limit, 104); return ( { : "Replay queue empty. Ensure ClickHouse has data."}
) : ( - items.map((packet) => { + <> + {virtual.topSpacerHeight > 0 ? ( +
+ ) : null} + {virtual.visibleItems.map((packet) => { const features = packet.features ?? {}; const contract = String(features.option_contract_id ?? packet.id ?? "unknown"); const count = parseNumber(features.count, packet.members.length); @@ -4005,7 +4450,11 @@ const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => {
); - }) + })} + {virtual.bottomSpacerHeight > 0 ? ( +
+ ) : null} + )}
@@ -4020,6 +4469,7 @@ type AlertsPaneProps = { const AlertsPane = ({ limit, withStrip = false }: AlertsPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredAlerts.slice(0, limit) : state.filteredAlerts; + const virtual = useVirtualList(items, state.alertsScroll.listRef, !limit, 92); return ( { : "Replay queue empty. Ensure ClickHouse has data."} ) : ( - items.map((alert) => { + <> + {virtual.topSpacerHeight > 0 ? ( +
+ ) : null} + {virtual.visibleItems.map((alert) => { const primary = alert.hits[0]; const direction = primary ? normalizeDirection(primary.direction) : "neutral"; @@ -4090,7 +4544,11 @@ const AlertsPane = ({ limit, withStrip = false }: AlertsPaneProps) => {
{formatTime(alert.source_ts)}
); - }) + })} + {virtual.bottomSpacerHeight > 0 ? ( +
+ ) : null} + )}
@@ -4104,6 +4562,7 @@ type ClassifierPaneProps = { const ClassifierPane = ({ limit }: ClassifierPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredClassifierHits.slice(0, limit) : state.filteredClassifierHits; + const virtual = useVirtualList(items, state.classifierScroll.listRef, !limit, 88); return ( { : "Replay queue empty. Ensure ClickHouse has data."}
) : ( - items.map((hit) => { + <> + {virtual.topSpacerHeight > 0 ? ( +
+ ) : null} + {virtual.visibleItems.map((hit) => { const direction = normalizeDirection(hit.direction); return ( ); - }) + })} + {virtual.bottomSpacerHeight > 0 ? ( +
+ ) : null} + )}
@@ -4173,6 +4640,7 @@ type DarkPaneProps = { const DarkPane = ({ limit }: DarkPaneProps) => { const state = useTerminal(); const items = limit ? state.filteredInferredDark.slice(0, limit) : state.filteredInferredDark; + const virtual = useVirtualList(items, state.darkScroll.listRef, !limit, 88); return ( { : "Replay queue empty. Ensure ClickHouse has data."}
) : ( - items.map((event) => { + <> + {virtual.topSpacerHeight > 0 ? ( +
+ ) : null} + {virtual.visibleItems.map((event) => { const underlying = inferDarkUnderlying(event, state.equityPrintMap, state.equityJoinMap); const evidenceCount = event.evidence_refs.length; @@ -4235,7 +4707,11 @@ const DarkPane = ({ limit }: DarkPaneProps) => {
{formatTime(event.source_ts)}
); - }) + })} + {virtual.bottomSpacerHeight > 0 ? ( +
+ ) : null} + )}
diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 3d10874..090c641 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -678,6 +678,10 @@ const run = async () => { const liveState = new LiveStateManager(clickhouse, redis); await liveState.hydrate(); + const liveStateMetricsTimer = setInterval(() => { + const snapshot = liveState.getStatsSnapshot(); + logger.info("live cache metrics", snapshot); + }, 60000); const subscribeWithReset = async ( subject: string, @@ -1475,6 +1479,7 @@ const run = async () => { state.shutdownPromise = (async () => { logger.info("service stopping", { signal }); server.stop(); + clearInterval(liveStateMetricsTimer); if (redis && redis.isOpen) { try { diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 7aeebb0..d170b69 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -33,16 +33,19 @@ import type { RedisClientType } from "redis"; const CURSOR_HASH_KEY = "live:cursors"; -const GENERIC_LIMITS = { - options: 500, - nbbo: 500, - equities: 500, - "equity-joins": 500, - flow: 500, - "classifier-hits": 500, - alerts: 500, - "inferred-dark": 500 -} as const; +const DEFAULT_GENERIC_LIMIT = 10000; +const MAX_GENERIC_LIMIT = 100000; +const MIN_GENERIC_LIMIT = 1; +const GENERIC_LIMIT_ENV_KEYS: Record = { + options: "LIVE_LIMIT_OPTIONS", + nbbo: "LIVE_LIMIT_NBBO", + equities: "LIVE_LIMIT_EQUITIES", + "equity-joins": "LIVE_LIMIT_EQUITY_JOINS", + flow: "LIVE_LIMIT_FLOW", + "classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS", + alerts: "LIVE_LIMIT_ALERTS", + "inferred-dark": "LIVE_LIMIT_INFERRED_DARK" +}; const CHART_LIMITS = { candles: 500, @@ -58,6 +61,43 @@ type GenericFeedConfig = { fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise; }; +export type GenericLiveLimits = Record; + +const parseGenericLimit = ( + env: NodeJS.ProcessEnv, + channel: LiveGenericChannel, + fallback: number +): number => { + const key = GENERIC_LIMIT_ENV_KEYS[channel]; + const raw = env[key]; + if (!raw || raw.trim().length === 0) { + return fallback; + } + + const parsed = Number(raw); + if (!Number.isFinite(parsed)) { + console.warn(`Invalid ${key}="${raw}", using ${fallback}`); + return fallback; + } + + const bounded = Math.max(MIN_GENERIC_LIMIT, Math.min(MAX_GENERIC_LIMIT, Math.floor(parsed))); + if (bounded !== parsed) { + console.warn(`Clamped ${key} from ${parsed} to ${bounded}`); + } + return bounded; +}; + +export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env): GenericLiveLimits => ({ + options: parseGenericLimit(env, "options", DEFAULT_GENERIC_LIMIT), + nbbo: parseGenericLimit(env, "nbbo", DEFAULT_GENERIC_LIMIT), + equities: parseGenericLimit(env, "equities", DEFAULT_GENERIC_LIMIT), + "equity-joins": parseGenericLimit(env, "equity-joins", DEFAULT_GENERIC_LIMIT), + flow: parseGenericLimit(env, "flow", DEFAULT_GENERIC_LIMIT), + "classifier-hits": parseGenericLimit(env, "classifier-hits", DEFAULT_GENERIC_LIMIT), + alerts: parseGenericLimit(env, "alerts", DEFAULT_GENERIC_LIMIT), + "inferred-dark": parseGenericLimit(env, "inferred-dark", DEFAULT_GENERIC_LIMIT) +}); + type RedisLike = Pick< RedisClientType, "isOpen" | "lRange" | "lPush" | "lTrim" | "hGet" | "hSet" @@ -75,13 +115,13 @@ const parseCursor = (value: string | null): Cursor | null => { } }; -const getGenericConfig = (): { +const getGenericConfig = (limits: GenericLiveLimits): { [K in LiveGenericChannel]: GenericFeedConfig; } => ({ options: { redisKey: "live:options", cursorField: "options", - limit: GENERIC_LIMITS.options, + limit: limits.options, parse: (value) => OptionPrintSchema.parse(value), cursor: (item) => ({ ts: item.ts, seq: item.seq }), fetchRecent: fetchRecentOptionPrints @@ -89,7 +129,7 @@ const getGenericConfig = (): { nbbo: { redisKey: "live:nbbo", cursorField: "nbbo", - limit: GENERIC_LIMITS.nbbo, + limit: limits.nbbo, parse: (value) => OptionNBBOSchema.parse(value), cursor: (item) => ({ ts: item.ts, seq: item.seq }), fetchRecent: fetchRecentOptionNBBO @@ -97,7 +137,7 @@ const getGenericConfig = (): { equities: { redisKey: "live:equities", cursorField: "equities", - limit: GENERIC_LIMITS.equities, + limit: limits.equities, parse: (value) => EquityPrintSchema.parse(value), cursor: (item) => ({ ts: item.ts, seq: item.seq }), fetchRecent: fetchRecentEquityPrints @@ -105,7 +145,7 @@ const getGenericConfig = (): { "equity-joins": { redisKey: "live:equity-joins", cursorField: "equity-joins", - limit: GENERIC_LIMITS["equity-joins"], + limit: limits["equity-joins"], parse: (value) => EquityPrintJoinSchema.parse(value), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), fetchRecent: fetchRecentEquityPrintJoins @@ -113,7 +153,7 @@ const getGenericConfig = (): { flow: { redisKey: "live:flow", cursorField: "flow", - limit: GENERIC_LIMITS.flow, + limit: limits.flow, parse: (value) => FlowPacketSchema.parse(value), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), fetchRecent: fetchRecentFlowPackets @@ -121,7 +161,7 @@ const getGenericConfig = (): { "classifier-hits": { redisKey: "live:classifier-hits", cursorField: "classifier-hits", - limit: GENERIC_LIMITS["classifier-hits"], + limit: limits["classifier-hits"], parse: (value) => ClassifierHitEventSchema.parse(value), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), fetchRecent: fetchRecentClassifierHits @@ -129,7 +169,7 @@ const getGenericConfig = (): { alerts: { redisKey: "live:alerts", cursorField: "alerts", - limit: GENERIC_LIMITS.alerts, + limit: limits.alerts, parse: (value) => AlertEventSchema.parse(value), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), fetchRecent: fetchRecentAlerts @@ -137,7 +177,7 @@ const getGenericConfig = (): { "inferred-dark": { redisKey: "live:inferred-dark", cursorField: "inferred-dark", - limit: GENERIC_LIMITS["inferred-dark"], + limit: limits["inferred-dark"], parse: (value) => InferredDarkEventSchema.parse(value), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), fetchRecent: fetchRecentInferredDark @@ -171,18 +211,43 @@ const overlayRedisKey = (underlyingId: string): string => `live:equity-overlay:$ const overlayCursorField = (underlyingId: string): string => `equities:${underlyingId}`; export class LiveStateManager { - private readonly generic = getGenericConfig(); + private readonly generic: { + [K in LiveGenericChannel]: GenericFeedConfig; + }; private readonly genericItems = new Map(); private readonly genericCursors = new Map(); private readonly candleItems = new Map(); private readonly candleCursors = new Map(); private readonly overlayItems = new Map(); private readonly overlayCursors = new Map(); + private readonly stats = { + genericHydrateFromRedis: 0, + genericHydrateFromClickHouse: 0, + trimOperations: 0, + cacheDepthByKey: new Map() + }; constructor( private readonly clickhouse: ClickHouseClient, - private readonly redis: RedisLike | null - ) {} + private readonly redis: RedisLike | null, + limits: GenericLiveLimits = resolveGenericLiveLimits() + ) { + this.generic = getGenericConfig(limits); + } + + getStatsSnapshot(): { + genericHydrateFromRedis: number; + genericHydrateFromClickHouse: number; + trimOperations: number; + cacheDepthByKey: Record; + } { + return { + genericHydrateFromRedis: this.stats.genericHydrateFromRedis, + genericHydrateFromClickHouse: this.stats.genericHydrateFromClickHouse, + trimOperations: this.stats.trimOperations, + cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey) + }; + } async hydrate(): Promise { const channels = Object.keys(this.generic) as LiveGenericChannel[]; @@ -196,12 +261,16 @@ export class LiveStateManager { const cached = parseJsonList(payloads, config.parse); if (cached.length > 0) { this.genericItems.set(channel, cached); + this.stats.genericHydrateFromRedis += 1; + this.stats.cacheDepthByKey.set(config.redisKey, cached.length); this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField))); return; } } const fresh = await config.fetchRecent(this.clickhouse, config.limit); + this.stats.genericHydrateFromClickHouse += 1; + this.stats.cacheDepthByKey.set(config.redisKey, fresh.length); this.genericItems.set(channel, fresh); const watermark = fresh[0] ? config.cursor(fresh[0]) : null; this.genericCursors.set(config.cursorField, watermark); @@ -262,6 +331,7 @@ export class LiveStateManager { .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq)) .slice(0, CHART_LIMITS.candles); this.candleItems.set(key, next); + this.stats.cacheDepthByKey.set(key, next.length); const cursor = { ts: candle.ts, seq: candle.seq }; this.candleCursors.set(cursorField, cursor); await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor); @@ -276,6 +346,7 @@ export class LiveStateManager { .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq)) .slice(0, CHART_LIMITS.overlay); this.overlayItems.set(key, next); + this.stats.cacheDepthByKey.set(key, next.length); const cursor = { ts: print.ts, seq: print.seq }; this.overlayCursors.set(cursorField, cursor); await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor); @@ -293,6 +364,7 @@ export class LiveStateManager { }) .slice(0, config.limit); this.genericItems.set(channel, next); + this.stats.cacheDepthByKey.set(config.redisKey, next.length); const cursor = config.cursor(parsed); this.genericCursors.set(config.cursorField, cursor); await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor); @@ -309,6 +381,7 @@ export class LiveStateManager { const cached = parseJsonList(payloads, (value) => EquityCandleSchema.parse(value)); if (cached.length > 0) { this.candleItems.set(key, cached); + this.stats.cacheDepthByKey.set(key, cached.length); this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); return; } @@ -316,6 +389,7 @@ export class LiveStateManager { const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles); this.candleItems.set(key, fresh); + this.stats.cacheDepthByKey.set(key, fresh.length); const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null; this.candleCursors.set(cursorField, watermark); await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark); @@ -329,6 +403,7 @@ export class LiveStateManager { const cached = parseJsonList(payloads, (value) => EquityPrintSchema.parse(value)); if (cached.length > 0) { this.overlayItems.set(key, cached); + this.stats.cacheDepthByKey.set(key, cached.length); this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); return; } @@ -338,6 +413,7 @@ export class LiveStateManager { (item) => item.underlying_id === underlyingId ); this.overlayItems.set(key, fresh); + this.stats.cacheDepthByKey.set(key, fresh.length); const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null; this.overlayCursors.set(cursorField, watermark); await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark); @@ -356,6 +432,7 @@ export class LiveStateManager { const payloads = items.map((item) => JSON.stringify(item)); await this.redis.lTrim(listKey, 1, 0); + this.stats.trimOperations += 1; if (payloads.length > 0) { for (let idx = payloads.length - 1; idx >= 0; idx -= 1) { const payload = payloads[idx]; @@ -364,7 +441,9 @@ export class LiveStateManager { } } await this.redis.lTrim(listKey, 0, limit - 1); + this.stats.trimOperations += 1; } + this.stats.cacheDepthByKey.set(listKey, Math.min(items.length, limit)); await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor)); } } diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index bfda54d..037da47 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "bun:test"; import type { ClickHouseClient } from "@islandflow/storage"; -import { LiveStateManager } from "../src/live"; +import { LiveStateManager, resolveGenericLiveLimits } from "../src/live"; const makeClickHouse = (): ClickHouseClient => ({ @@ -48,6 +48,19 @@ const makeRedis = () => { }; describe("LiveStateManager", () => { + it("resolves live limits from env with clamping", () => { + const limits = resolveGenericLiveLimits({ + LIVE_LIMIT_OPTIONS: "777", + LIVE_LIMIT_NBBO: "200000", + LIVE_LIMIT_FLOW: "bad" + } as NodeJS.ProcessEnv); + + expect(limits.options).toBe(777); + expect(limits.nbbo).toBe(100000); + expect(limits.flow).toBe(10000); + expect(limits.alerts).toBe(10000); + }); + it("hydrates snapshots from redis generic windows", async () => { const redis = makeRedis(); await redis.lPush( @@ -120,4 +133,67 @@ describe("LiveStateManager", () => { expect(candleSnapshot.watermark).toEqual({ ts: 100, seq: 1 }); expect(overlaySnapshot.watermark).toEqual({ ts: 110, seq: 2 }); }); + + it("trims generic windows to configured per-channel limits", async () => { + const redis = makeRedis(); + const manager = new LiveStateManager( + makeClickHouse(), + redis as never, + { + options: 10000, + nbbo: 10000, + equities: 10000, + "equity-joins": 10000, + flow: 2, + "classifier-hits": 10000, + alerts: 10000, + "inferred-dark": 10000 + } + ); + + await manager.ingest("flow", { + source_ts: 100, + ingest_ts: 101, + seq: 1, + trace_id: "flow-1", + id: "flow-1", + members: ["a"], + features: {}, + join_quality: {} + }); + await manager.ingest("flow", { + source_ts: 110, + ingest_ts: 111, + seq: 2, + trace_id: "flow-2", + id: "flow-2", + members: ["b"], + features: {}, + join_quality: {} + }); + await manager.ingest("flow", { + source_ts: 120, + ingest_ts: 121, + seq: 3, + trace_id: "flow-3", + id: "flow-3", + members: ["c"], + features: {}, + join_quality: {} + }); + + const snapshot = await manager.getSnapshot({ channel: "flow" }); + expect(snapshot.items).toHaveLength(2); + expect((snapshot.items as Array<{ id: string }>).map((item) => item.id)).toEqual([ + "flow-3", + "flow-2" + ]); + + const persisted = await redis.lRange("live:flow", 0, 99); + expect(persisted).toHaveLength(2); + + const stats = manager.getStatsSnapshot(); + expect(stats.trimOperations).toBeGreaterThan(0); + expect(stats.cacheDepthByKey["live:flow"]).toBe(2); + }); });