Merge pull request #22 from dirtydishes/feature/live-retention

Live retention redesign: balanced hot windows + pinned evidence hydration
This commit is contained in:
dirtydishes 2026-04-27 14:45:25 -04:00 committed by GitHub
commit dec99fb6e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 1237 additions and 613 deletions

73
.beads/.gitignore vendored Normal file
View file

@ -0,0 +1,73 @@
# Dolt database (managed by Dolt, not git)
dolt/
embeddeddolt/
# Runtime files
bd.sock
bd.sock.startlock
sync-state.json
last-touched
.exclusive-lock
# Daemon runtime (lock, log, pid)
daemon.*
# Interactions log (runtime, not versioned)
interactions.jsonl
# Push state (runtime, per-machine)
push-state.json
# Lock files (various runtime locks)
*.lock
# Credential key (encryption key for federation peer auth — never commit)
.beads-credential-key
# Local version tracking (prevents upgrade notification spam after git ops)
.local_version
# Worktree redirect file (contains relative path to main repo's .beads/)
# Must not be committed as paths would be wrong in other clones
redirect
# Sync state (local-only, per-machine)
# These files are machine-specific and should not be shared across clones
.sync.lock
export-state/
export-state.json
# Ephemeral store (SQLite - wisps/molecules, intentionally not versioned)
ephemeral.sqlite3
ephemeral.sqlite3-journal
ephemeral.sqlite3-wal
ephemeral.sqlite3-shm
# Dolt server management (auto-started by bd)
dolt-server.pid
dolt-server.log
dolt-server.lock
dolt-server.port
dolt-server.activity
# Corrupt backup directories (created by bd doctor --fix recovery)
*.corrupt.backup/
# Backup data (auto-exported JSONL, local-only)
backup/
# Per-project environment file (Dolt connection config, GH#2520)
.env
# Legacy files (from pre-Dolt versions)
*.db
*.db?*
*.db-journal
*.db-wal
*.db-shm
db.sqlite
bd.db
# NOTE: Do NOT add negation patterns here.
# They would override fork protection in .git/info/exclude.
# Config files (metadata.json, config.yaml) are tracked by git by default
# since no pattern above ignores them.

81
.beads/README.md Normal file
View file

@ -0,0 +1,81 @@
# Beads - AI-Native Issue Tracking
Welcome to Beads! This repository uses **Beads** for issue tracking - a modern, AI-native tool designed to live directly in your codebase alongside your code.
## What is Beads?
Beads is issue tracking that lives in your repo, making it perfect for AI coding agents and developers who want their issues close to their code. No web UI required - everything works through the CLI and integrates seamlessly with git.
**Learn more:** [github.com/steveyegge/beads](https://github.com/steveyegge/beads)
## Quick Start
### Essential Commands
```bash
# Create new issues
bd create "Add user authentication"
# View all issues
bd list
# View issue details
bd show <issue-id>
# Update issue status
bd update <issue-id> --claim
bd update <issue-id> --status done
# Sync with Dolt remote
bd dolt push
```
### Working with Issues
Issues in Beads are:
- **Git-native**: Stored in Dolt database with version control and branching
- **AI-friendly**: CLI-first design works perfectly with AI coding agents
- **Branch-aware**: Issues can follow your branch workflow
- **Always in sync**: Auto-syncs with your commits
## Why Beads?
✨ **AI-Native Design**
- Built specifically for AI-assisted development workflows
- CLI-first interface works seamlessly with AI coding agents
- No context switching to web UIs
🚀 **Developer Focused**
- Issues live in your repo, right next to your code
- Works offline, syncs when you push
- Fast, lightweight, and stays out of your way
🔧 **Git Integration**
- Automatic sync with git commits
- Branch-aware issue tracking
- Dolt-native three-way merge resolution
## Get Started with Beads
Try Beads in your own projects:
```bash
# Install Beads
curl -sSL https://raw.githubusercontent.com/steveyegge/beads/main/scripts/install.sh | bash
# Initialize in your repo
bd init
# Create your first issue
bd create "Try out Beads"
```
## Learn More
- **Documentation**: [github.com/steveyegge/beads/docs](https://github.com/steveyegge/beads/tree/main/docs)
- **Quick Start Guide**: Run `bd quickstart`
- **Examples**: [github.com/steveyegge/beads/examples](https://github.com/steveyegge/beads/tree/main/examples)
---
*Beads: Issue tracking that moves at the speed of thought* ⚡

54
.beads/config.yaml Normal file
View file

@ -0,0 +1,54 @@
# Beads Configuration File
# This file configures default behavior for all bd commands in this repository
# All settings can also be set via environment variables (BD_* prefix)
# or overridden with command-line flags
# Issue prefix for this repository (used by bd init)
# If not set, bd init will auto-detect from directory name
# Example: issue-prefix: "myproject" creates issues like "myproject-1", "myproject-2", etc.
# issue-prefix: ""
# Use no-db mode: JSONL-only, no Dolt database
# When true, bd will use .beads/issues.jsonl as the source of truth
# no-db: false
# Enable JSON output by default
# json: false
# Feedback title formatting for mutating commands (create/update/close/dep/edit)
# 0 = hide titles, N > 0 = truncate to N characters
# output:
# title-length: 255
# Default actor for audit trails (overridden by BEADS_ACTOR or --actor)
# actor: ""
# Export events (audit trail) to .beads/events.jsonl on each flush/sync
# When enabled, new events are appended incrementally using a high-water mark.
# Use 'bd export --events' to trigger manually regardless of this setting.
# events-export: false
# Multi-repo configuration (experimental - bd-307)
# Allows hydrating from multiple repositories and routing writes to the correct database
# repos:
# primary: "." # Primary repo (where this database lives)
# additional: # Additional repos to hydrate from (read-only)
# - ~/beads-planning # Personal planning repo
# - ~/work-planning # Work planning repo
# JSONL backup (periodic export for off-machine recovery)
# Auto-enabled when a git remote exists. Override explicitly:
# backup:
# enabled: false # Disable auto-backup entirely
# interval: 15m # Minimum time between auto-exports
# git-push: false # Disable git push (export locally only)
# git-repo: "" # Separate git repo for backups (default: project repo)
# Integration settings (access with 'bd config get/set')
# These are stored in the database, not in this file:
# - jira.url
# - jira.project
# - linear.url
# - linear.api-key
# - github.org
# - github.repo

24
.beads/hooks/post-checkout Executable file
View file

@ -0,0 +1,24 @@
#!/usr/bin/env sh
# --- BEGIN BEADS INTEGRATION v1.0.3 ---
# This section is managed by beads. Do not remove these markers.
if command -v bd >/dev/null 2>&1; then
export BD_GIT_HOOK=1
_bd_timeout=${BEADS_HOOK_TIMEOUT:-300}
if command -v timeout >/dev/null 2>&1; then
timeout "$_bd_timeout" bd hooks run post-checkout "$@"
_bd_exit=$?
if [ $_bd_exit -eq 124 ]; then
echo >&2 "beads: hook 'post-checkout' timed out after ${_bd_timeout}s — continuing without beads"
_bd_exit=0
fi
else
bd hooks run post-checkout "$@"
_bd_exit=$?
fi
if [ $_bd_exit -eq 3 ]; then
echo >&2 "beads: database not initialized — skipping hook 'post-checkout'"
_bd_exit=0
fi
if [ $_bd_exit -ne 0 ]; then exit $_bd_exit; fi
fi
# --- END BEADS INTEGRATION v1.0.3 ---

24
.beads/hooks/post-merge Executable file
View file

@ -0,0 +1,24 @@
#!/usr/bin/env sh
# --- BEGIN BEADS INTEGRATION v1.0.3 ---
# This section is managed by beads. Do not remove these markers.
if command -v bd >/dev/null 2>&1; then
export BD_GIT_HOOK=1
_bd_timeout=${BEADS_HOOK_TIMEOUT:-300}
if command -v timeout >/dev/null 2>&1; then
timeout "$_bd_timeout" bd hooks run post-merge "$@"
_bd_exit=$?
if [ $_bd_exit -eq 124 ]; then
echo >&2 "beads: hook 'post-merge' timed out after ${_bd_timeout}s — continuing without beads"
_bd_exit=0
fi
else
bd hooks run post-merge "$@"
_bd_exit=$?
fi
if [ $_bd_exit -eq 3 ]; then
echo >&2 "beads: database not initialized — skipping hook 'post-merge'"
_bd_exit=0
fi
if [ $_bd_exit -ne 0 ]; then exit $_bd_exit; fi
fi
# --- END BEADS INTEGRATION v1.0.3 ---

24
.beads/hooks/pre-commit Executable file
View file

@ -0,0 +1,24 @@
#!/usr/bin/env sh
# --- BEGIN BEADS INTEGRATION v1.0.3 ---
# This section is managed by beads. Do not remove these markers.
if command -v bd >/dev/null 2>&1; then
export BD_GIT_HOOK=1
_bd_timeout=${BEADS_HOOK_TIMEOUT:-300}
if command -v timeout >/dev/null 2>&1; then
timeout "$_bd_timeout" bd hooks run pre-commit "$@"
_bd_exit=$?
if [ $_bd_exit -eq 124 ]; then
echo >&2 "beads: hook 'pre-commit' timed out after ${_bd_timeout}s — continuing without beads"
_bd_exit=0
fi
else
bd hooks run pre-commit "$@"
_bd_exit=$?
fi
if [ $_bd_exit -eq 3 ]; then
echo >&2 "beads: database not initialized — skipping hook 'pre-commit'"
_bd_exit=0
fi
if [ $_bd_exit -ne 0 ]; then exit $_bd_exit; fi
fi
# --- END BEADS INTEGRATION v1.0.3 ---

24
.beads/hooks/pre-push Executable file
View file

@ -0,0 +1,24 @@
#!/usr/bin/env sh
# --- BEGIN BEADS INTEGRATION v1.0.3 ---
# This section is managed by beads. Do not remove these markers.
if command -v bd >/dev/null 2>&1; then
export BD_GIT_HOOK=1
_bd_timeout=${BEADS_HOOK_TIMEOUT:-300}
if command -v timeout >/dev/null 2>&1; then
timeout "$_bd_timeout" bd hooks run pre-push "$@"
_bd_exit=$?
if [ $_bd_exit -eq 124 ]; then
echo >&2 "beads: hook 'pre-push' timed out after ${_bd_timeout}s — continuing without beads"
_bd_exit=0
fi
else
bd hooks run pre-push "$@"
_bd_exit=$?
fi
if [ $_bd_exit -eq 3 ]; then
echo >&2 "beads: database not initialized — skipping hook 'pre-push'"
_bd_exit=0
fi
if [ $_bd_exit -ne 0 ]; then exit $_bd_exit; fi
fi
# --- END BEADS INTEGRATION v1.0.3 ---

24
.beads/hooks/prepare-commit-msg Executable file
View file

@ -0,0 +1,24 @@
#!/usr/bin/env sh
# --- BEGIN BEADS INTEGRATION v1.0.3 ---
# This section is managed by beads. Do not remove these markers.
if command -v bd >/dev/null 2>&1; then
export BD_GIT_HOOK=1
_bd_timeout=${BEADS_HOOK_TIMEOUT:-300}
if command -v timeout >/dev/null 2>&1; then
timeout "$_bd_timeout" bd hooks run prepare-commit-msg "$@"
_bd_exit=$?
if [ $_bd_exit -eq 124 ]; then
echo >&2 "beads: hook 'prepare-commit-msg' timed out after ${_bd_timeout}s — continuing without beads"
_bd_exit=0
fi
else
bd hooks run prepare-commit-msg "$@"
_bd_exit=$?
fi
if [ $_bd_exit -eq 3 ]; then
echo >&2 "beads: database not initialized — skipping hook 'prepare-commit-msg'"
_bd_exit=0
fi
if [ $_bd_exit -ne 0 ]; then exit $_bd_exit; fi
fi
# --- END BEADS INTEGRATION v1.0.3 ---

7
.beads/metadata.json Normal file
View file

@ -0,0 +1,7 @@
{
"database": "dolt",
"backend": "dolt",
"dolt_mode": "embedded",
"dolt_database": "islandflow",
"project_id": "05939772-aa50-4910-914d-31feaf6c757b"
}

26
.claude/settings.json Normal file
View file

@ -0,0 +1,26 @@
{
"hooks": {
"PreCompact": [
{
"hooks": [
{
"command": "bd prime",
"type": "command"
}
],
"matcher": ""
}
],
"SessionStart": [
{
"hooks": [
{
"command": "bd prime",
"type": "command"
}
],
"matcher": ""
}
]
}
}

View file

@ -57,6 +57,9 @@ COMPUTE_DELIVER_POLICY=new
COMPUTE_CONSUMER_RESET=false COMPUTE_CONSUMER_RESET=false
NBBO_MAX_AGE_MS=1000 NBBO_MAX_AGE_MS=1000
NEXT_PUBLIC_NBBO_MAX_AGE_MS=1000 NEXT_PUBLIC_NBBO_MAX_AGE_MS=1000
NEXT_PUBLIC_LIVE_HOT_WINDOW=2000
NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS=1200000
NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS=4000
ROLLING_WINDOW_SIZE=50 ROLLING_WINDOW_SIZE=50
ROLLING_TTL_SEC=86400 ROLLING_TTL_SEC=86400
CLASSIFIER_SWEEP_MIN_PREMIUM=40000 CLASSIFIER_SWEEP_MIN_PREMIUM=40000
@ -81,3 +84,13 @@ REPLAY_END_TS=0
REPLAY_SPEED=1 REPLAY_SPEED=1
REPLAY_BATCH_SIZE=200 REPLAY_BATCH_SIZE=200
REPLAY_LOG_EVERY=1000 REPLAY_LOG_EVERY=1000
# API live retention (generic channels)
LIVE_LIMIT_OPTIONS=10000
LIVE_LIMIT_NBBO=10000
LIVE_LIMIT_EQUITIES=10000
LIVE_LIMIT_EQUITY_JOINS=10000
LIVE_LIMIT_FLOW=10000
LIVE_LIMIT_CLASSIFIER_HITS=10000
LIVE_LIMIT_ALERTS=10000
LIVE_LIMIT_INFERRED_DARK=10000

5
.gitignore vendored
View file

@ -15,3 +15,8 @@ apps/web/.next/
# Local assistant artifacts # Local assistant artifacts
session-ses_*.md session-ses_*.md
token-usage-output.txt token-usage-output.txt
# Beads / Dolt files (added by bd init)
.dolt/
*.db
.beads-credential-key

113
AGENTS.md
View file

@ -97,92 +97,49 @@ cp -rf source dest # NOT: cp -r source dest
- NEVER say "ready to push when you are" - YOU must push - NEVER say "ready to push when you are" - YOU must push
- If push fails, resolve and retry until it succeeds - If push fails, resolve and retry until it succeeds
<!-- BEGIN BEADS INTEGRATION --> <!-- BEGIN BEADS INTEGRATION v:1 profile:minimal hash:ca08a54f -->
## Issue Tracking with bd (beads) ## Beads Issue Tracker
**IMPORTANT**: This project uses **bd (beads)** for ALL issue tracking. Do NOT use markdown TODOs, task lists, or other tracking methods. This project uses **bd (beads)** for issue tracking. Run `bd prime` to see full workflow context and commands.
### Why bd? ### Quick Reference
- Dependency-aware: Track blockers and relationships between issues
- Git-friendly: Dolt-powered version control with native sync
- Agent-optimized: JSON output, ready work detection, discovered-from links
- Prevents duplicate tracking systems and confusion
### Quick Start
**Check for ready work:**
```bash ```bash
bd ready --json bd ready # Find available work
bd show <id> # View issue details
bd update <id> --claim # Claim work
bd close <id> # Complete work
``` ```
**Create new issues:** ### Rules
- Use `bd` for ALL task tracking — do NOT use TodoWrite, TaskCreate, or markdown TODO lists
- Run `bd prime` for detailed command reference and session close protocol
- Use `bd remember` for persistent knowledge — do NOT use MEMORY.md files
## Session Completion
**When ending a work session**, you MUST complete ALL steps below. Work is NOT complete until `git push` succeeds.
**MANDATORY WORKFLOW:**
1. **File issues for remaining work** - Create issues for anything that needs follow-up
2. **Run quality gates** (if code changed) - Tests, linters, builds
3. **Update issue status** - Close finished work, update in-progress items
4. **PUSH TO REMOTE** - This is MANDATORY:
```bash ```bash
bd create "Issue title" --description="Detailed context" -t bug|feature|task -p 0-4 --json git pull --rebase
bd create "Issue title" --description="What this issue is about" -p 1 --deps discovered-from:bd-123 --json bd dolt push
git push
# Use stdin for descriptions with special characters (backticks, !, nested quotes) git status # MUST show "up to date with origin"
echo 'Description with `backticks` and "quotes"' | bd create "Title" --description=- --json
``` ```
5. **Clean up** - Clear stashes, prune remote branches
6. **Verify** - All changes committed AND pushed
7. **Hand off** - Provide context for next session
**Claim and update:** **CRITICAL RULES:**
- Work is NOT complete until `git push` succeeds
```bash - NEVER stop before pushing - that leaves work stranded locally
bd update <id> --claim --json - NEVER say "ready to push when you are" - YOU must push
bd update bd-42 --priority 1 --json - If push fails, resolve and retry until it succeeds
```
**Complete work:**
```bash
bd close bd-42 --reason "Completed" --json
```
### Issue Types
- `bug` - Something broken
- `feature` - New functionality
- `task` - Work item (tests, docs, refactoring)
- `epic` - Large feature with subtasks
- `chore` - Maintenance (dependencies, tooling)
### Priorities
- `0` - Critical (security, data loss, broken builds)
- `1` - High (major features, important bugs)
- `2` - Medium (default, nice-to-have)
- `3` - Low (polish, optimization)
- `4` - Backlog (future ideas)
### Workflow for AI Agents
1. **Check ready work**: `bd ready` shows unblocked issues
2. **Claim your task atomically**: `bd update <id> --claim`
3. **Work on it**: Implement, test, document
4. **Discover new work?** Create linked issue:
- `bd create "Found bug" --description="Details about what was found" -p 1 --deps discovered-from:<parent-id>`
5. **Complete**: `bd close <id> --reason "Done"`
### Auto-Sync
bd automatically syncs via Dolt:
- Each write auto-commits to Dolt history
- Use `bd dolt push`/`bd dolt pull` for remote sync
- No manual export/import needed!
### Important Rules
- ✅ Use bd for ALL task tracking
- ✅ Always use `--json` flag for programmatic use
- ✅ Link discovered work with `discovered-from` dependencies
- ✅ Check `bd ready` before asking "what should I work on?"
- ❌ Do NOT create markdown TODO lists
- ❌ Do NOT use external issue trackers
- ❌ Do NOT duplicate tracking systems
For more details, see README.md and docs/QUICKSTART.md.
<!-- END BEADS INTEGRATION --> <!-- END BEADS INTEGRATION -->

View file

@ -1,428 +0,0 @@
# Detailed Agent Instructions for Beads Development
**For project overview and quick start, see [AGENTS.md](AGENTS.md)**
This document contains detailed operational instructions for AI agents working on beads development, testing, and releases.
## Development Guidelines
### Code Standards
- **Go version**: 1.24+
- **Linting**: `golangci-lint run ./...` (baseline warnings documented in [docs/LINTING.md](docs/LINTING.md))
- **Testing**: All new features need tests (`make test` for local baseline, `make test-full-cgo` when validating full CGO paths)
- **Documentation**: Update relevant .md files
### File Organization
```
beads/
├── cmd/bd/ # CLI commands
├── internal/
│ ├── types/ # Core data types
│ └── storage/ # Storage layer
│ └── dolt/ # Dolt implementation
├── examples/ # Integration examples
└── *.md # Documentation
```
### Testing Workflow
**IMPORTANT:** Never pollute the production database with test issues!
**For manual testing**, use the `BEADS_DB` environment variable to point to a temporary database:
```bash
# Create test issues in isolated database
BEADS_DB=/tmp/test.db bd init --quiet --prefix test
BEADS_DB=/tmp/test.db bd create "Test issue" -p 1
# Or for quick testing
BEADS_DB=/tmp/test.db bd create "Test feature" -p 1
```
**For automated tests**, use `t.TempDir()` in Go tests:
```go
func TestMyFeature(t *testing.T) {
tmpDir := t.TempDir()
testDB := filepath.Join(tmpDir, ".beads", "beads.db")
s := newTestStore(t, testDB)
// ... test code
}
```
**Git test isolation:** For tests that create temporary git repos, force repo-local hooks:
```bash
git config core.hooksPath .git/hooks
```
Do not rely on the developer's global git config. Global `core.hooksPath` can leak
into temp repos and produce flaky test behavior.
**Warning:** bd will warn you when creating issues with "Test" prefix in the production database. Always use `BEADS_DB` for manual testing.
### Before Committing
1. **Run tests**: `make test` (or `./scripts/test.sh`)
- For full CGO validation: `make test-full-cgo`
2. **Run linter**: `golangci-lint run ./...` (ignore baseline warnings)
3. **Update docs**: If you changed behavior, update README.md or other docs
4. **Commit**: With git hooks installed (`bd hooks install`), Dolt changes are auto-committed
### Commit Message Convention
When committing work for an issue, include the issue ID in parentheses at the end:
```bash
git commit -m "Fix auth validation bug (bd-abc)"
git commit -m "Add retry logic for database locks (bd-xyz)"
```
This enables `bd doctor` to detect **orphaned issues** - work that was committed but the issue wasn't closed. The doctor check cross-references open issues against git history to find these orphans.
### Git Workflow
bd uses **Dolt** as its primary database. Changes are committed to Dolt history automatically (one Dolt commit per write command).
**Install git hooks** for automatic sync:
```bash
bd hooks install
```
### Git Integration
**Dolt sync**: Dolt handles sync natively via `bd dolt push` / `bd dolt pull`. No JSONL export/import needed.
**Protected branches**: Dolt stores data under `refs/dolt/data`, separate from standard Git refs. See [docs/PROTECTED_BRANCHES.md](docs/PROTECTED_BRANCHES.md).
**Git worktrees**: Work directly with Dolt — no special flags needed. See [docs/ADVANCED.md](docs/ADVANCED.md).
**Merge conflicts**: Rare with hash IDs. Dolt uses cell-level 3-way merge for conflict resolution.
## Git Workflow: Push to Main, Never PR
Crew workers push directly to main. **Never create pull requests.**
- `git push` to main is the only way to land work
- `gh pr create` is forbidden — PRs are for external contributors, not crew
- Do not create feature branches for your own work — commit and push to main
- When handling external PRs, use fix-merge: checkout the PR branch locally,
fix/rebase onto main, merge locally, `git push`, then close the PR
This is enforced by pre-use hooks. If you try `gh pr create`, it will be blocked.
## Landing the Plane
**When the user says "let's land the plane"**, you MUST complete ALL steps below. The plane is NOT landed until `git push` succeeds. NEVER stop before pushing. NEVER say "ready to push when you are!" - that is a FAILURE.
**MANDATORY WORKFLOW - COMPLETE ALL STEPS:**
1. **File beads issues for any remaining work** that needs follow-up
2. **Ensure all quality gates pass** (only if code changes were made):
- Run `make lint` or `golangci-lint run ./...` (if pre-commit installed: `pre-commit run --all-files`)
- Run `make test` (and `make test-full-cgo` when CGO-relevant code changed)
- File P0 issues if quality gates are broken
3. **Update beads issues** - close finished work, update status
4. **PUSH TO REMOTE - NON-NEGOTIABLE** - This step is MANDATORY. Execute ALL commands below:
```bash
# Pull first to catch any remote changes
git pull --rebase
# MANDATORY: Push everything to remote
# DO NOT STOP BEFORE THIS COMMAND COMPLETES
git push
# MANDATORY: Verify push succeeded
git status # MUST show "up to date with origin/main"
```
**CRITICAL RULES:**
- The plane has NOT landed until `git push` completes successfully
- NEVER stop before `git push` - that leaves work stranded locally
- NEVER say "ready to push when you are!" - YOU must push, not the user
- If `git push` fails, resolve the issue and retry until it succeeds
- The user is managing multiple agents - unpushed work breaks their coordination workflow
5. **Clean up git state** - Clear old stashes and prune dead remote branches:
```bash
git stash clear # Remove old stashes
git remote prune origin # Clean up deleted remote branches
```
6. **Verify clean state** - Ensure all changes are committed AND PUSHED, no untracked files remain
7. **Choose a follow-up issue for next session**
- Provide a prompt for the user to give to you in the next session
- Format: "Continue work on bd-X: [issue title]. [Brief context about what's been done and what's next]"
**REMEMBER: Landing the plane means EVERYTHING is pushed to remote. No exceptions. No "ready when you are". PUSH IT.**
**Example "land the plane" session:**
```bash
# 1. File remaining work
bd create "Add integration tests for sync" -t task -p 2 --json
# 2. Run quality gates (only if code changes were made)
go test -short ./...
golangci-lint run ./...
# 3. Close finished issues
bd close bd-42 bd-43 --reason "Completed" --json
# 4. PUSH TO REMOTE - MANDATORY, NO STOPPING BEFORE THIS IS DONE
git pull --rebase
git push # MANDATORY - THE PLANE IS STILL IN THE AIR UNTIL THIS SUCCEEDS
git status # MUST verify "up to date with origin/main"
# 5. Clean up git state
git stash clear
git remote prune origin
# 6. Verify everything is clean and pushed
git status
# 7. Choose next work
bd ready --json
bd show bd-44 --json
```
**Then provide the user with:**
- Summary of what was completed this session
- What issues were filed for follow-up
- Status of quality gates (all passing / issues filed)
- Confirmation that ALL changes have been pushed to remote
- Recommended prompt for next session
**CRITICAL: Never end a "land the plane" session without successfully pushing. The user is coordinating multiple agents and unpushed work causes severe rebase conflicts.**
## Agent Session Workflow
**WARNING: DO NOT use `bd edit`** - it opens an interactive editor ($EDITOR) which AI agents cannot use. Use `bd update` with flags instead:
```bash
bd update <id> --description "new description"
bd update <id> --title "new title"
bd update <id> --design "design notes"
bd update <id> --notes "additional notes"
bd update <id> --acceptance "acceptance criteria"
```
**Use stdin for descriptions with special characters** (backticks, `!`, nested quotes):
```bash
# Pipe via stdin to avoid shell escaping issues
echo 'Description with `backticks` and "quotes"' | bd create "Title" --stdin
echo 'Updated description with $variables' | bd update <id> --description=-
# Or use --body-file for longer content
bd create "Title" --body-file=description.md
```
**Example agent session:**
```bash
# Make changes (each write auto-commits to Dolt)
bd create "Fix bug" -p 1
bd create "Add tests" -p 1
bd update bd-42 --claim
bd close bd-40 --reason "Completed"
# Push Dolt data to remote if configured
bd dolt push
# Now safe to end session
```
This installs:
- **pre-commit** — Commits pending Dolt changes
- **post-merge** — Pulls remote Dolt changes after git merge
**Note:** Hooks are embedded in the bd binary and work for all bd users (not just source repo users).
## Common Development Tasks
### CLI Design Principles
**Minimize cognitive overload.** Every new command, flag, or option adds cognitive burden for users. Before adding anything:
1. **Recovery/fix operations → `bd doctor --fix`**: Don't create separate commands like `bd recover` or `bd repair`. Doctor already detects problems - let `--fix` handle remediation. This keeps all health-related operations in one discoverable place.
For git hook marker migration specifically: use `bd migrate hooks --dry-run` to preview operations, and `bd doctor --fix` for the standard apply path.
2. **Prefer flags on existing commands**: Before creating a new command, ask: "Can this be a flag on an existing command?" Example: `bd list --stale` instead of `bd stale`.
3. **Consolidate related operations**: Related operations should live together. Version control uses `bd vc {log,diff,commit}`, not separate top-level commands.
4. **Count the commands**: Run `bd --help` and count. If we're approaching 30+ commands, we have a discoverability problem. Consider subcommand grouping.
5. **New commands need strong justification**: A new command should represent a fundamentally different operation, not just a convenience wrapper.
### Adding a New Command
1. Create file in `cmd/bd/`
2. Add to root command in `cmd/bd/main.go`
3. Implement with Cobra framework
4. Add `--json` flag for agent use
5. Add tests in `cmd/bd/*_test.go`
6. Document in README.md
### Adding Storage Features
1. Add Dolt SQL schema changes in `internal/storage/dolt/`
2. Add migration if needed
3. Update `internal/types/types.go` if new types
4. Implement in `internal/storage/dolt/` (queries, issues, etc.)
5. Add tests
6. Update export/import in `cmd/bd/export.go` and `cmd/bd/import.go`
### Adding Examples
1. Create directory in `examples/`
2. Add README.md explaining the example
3. Include working code
4. Link from `examples/README.md`
5. Mention in main README.md
## Building and Testing
```bash
# Build and install bd to ~/.local/bin (the canonical location)
make install
# Test (local baseline)
make test
# Test with full CGO-enabled suite (local/CI parity)
make test-full-cgo
# Coverage run
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
# Verify installed binary
bd init --prefix test
bd create "Test issue" -p 1
bd ready
```
> **WARNING**: Do NOT use `go build -o bd ./cmd/bd` or `go install ./cmd/bd`.
> These create stale binaries in the working directory or `~/go/bin/` that
> shadow the canonical install at `~/.local/bin/bd`. Always use `make install`.
## Version Management
**IMPORTANT**: When the user asks to "bump the version" or mentions a new version number (e.g., "bump to 0.9.3"), use the version bump script:
```bash
# Preview changes (shows diff, doesn't commit)
./scripts/bump-version.sh 0.9.3
# Auto-commit the version bump
./scripts/bump-version.sh 0.9.3 --commit
git push origin main
```
**What it does:**
- Updates ALL version files (CLI, plugin, MCP server, docs) in one command
- Validates semantic versioning format
- Shows diff preview
- Verifies all versions match after update
- Creates standardized commit message
**User will typically say:**
- "Bump to 0.9.3"
- "Update version to 1.0.0"
- "Rev the project to 0.9.4"
- "Increment the version"
**You should:**
1. Run `./scripts/bump-version.sh <version> --commit`
2. Push to GitHub
3. Confirm all versions updated correctly
**Files updated automatically:**
- `cmd/bd/version.go` - CLI version
- `claude-plugin/.claude-plugin/plugin.json` - Plugin version
- `.claude-plugin/marketplace.json` - Marketplace version
- `integrations/beads-mcp/pyproject.toml` - MCP server version
- `README.md` - Documentation version
- `PLUGIN.md` - Version requirements
**Why this matters:** We had version mismatches (bd-66) when only `version.go` was updated. This script prevents that by updating all components atomically.
See `scripts/README.md` for more details.
## Release Process (Maintainers)
**Automated (Recommended):**
```bash
# One command to do everything (version bump, tests, tag, Homebrew update, local install)
./scripts/release.sh 0.9.3
```
This handles the entire release workflow automatically, including waiting ~5 minutes for GitHub Actions to build release artifacts. See [scripts/README.md](scripts/README.md) for details.
**Manual (Step-by-Step):**
1. Bump version: `./scripts/bump-version.sh <version> --commit`
2. Update CHANGELOG.md with release notes
3. Run tests: `make test` (and `make test-full-cgo` for CGO-related changes)
4. Push version bump: `git push origin main`
5. Tag release: `git tag v<version> && git push origin v<version>`
6. Update Homebrew: `./scripts/update-homebrew.sh <version>` (waits for GitHub Actions)
7. Verify: `brew update && brew upgrade beads && bd version`
See [docs/RELEASING.md](docs/RELEASING.md) for complete manual instructions.
## Checking GitHub Issues and PRs
**IMPORTANT**: When asked to check GitHub issues or PRs, use command-line tools like `gh` instead of browser/playwright tools.
**Preferred approach:**
```bash
# List open issues with details
gh issue list --limit 30
# List open PRs
gh pr list --limit 30
# View specific issue
gh issue view 201
```
**Then provide an in-conversation summary** highlighting:
- Urgent/critical issues (regressions, bugs, broken builds)
- Common themes or patterns
- Feature requests with high engagement
- Items that need immediate attention
**Why this matters:**
- Browser tools consume more tokens and are slower
- CLI summaries are easier to scan and discuss
- Keeps the conversation focused and efficient
- Better for quick triage and prioritization
**Do NOT use:** `browser_navigate`, `browser_snapshot`, or other playwright tools for GitHub PR/issue reviews unless specifically requested by the user.
## Questions?
- Check existing issues: `bd list`
- Look at recent commits: `git log --oneline -20`
- Read the docs: README.md, ADVANCED.md, EXTENDING.md
- Create an issue if unsure: `bd create "Question: ..." -t task -p 2`
## Important Files
- **README.md** - Main documentation (keep this updated!)
- **EXTENDING.md** - Database extension guide
- **ADVANCED.md** - Advanced features (rename, merge, compaction)
- **CONTRIBUTING.md** - Contribution guidelines
- **SECURITY.md** - Security policy

69
CLAUDE.md Normal file
View file

@ -0,0 +1,69 @@
# Project Instructions for AI Agents
This file provides instructions and context for AI coding agents working on this project.
<!-- BEGIN BEADS INTEGRATION v:1 profile:minimal hash:ca08a54f -->
## Beads Issue Tracker
This project uses **bd (beads)** for issue tracking. Run `bd prime` to see full workflow context and commands.
### Quick Reference
```bash
bd ready # Find available work
bd show <id> # View issue details
bd update <id> --claim # Claim work
bd close <id> # Complete work
```
### Rules
- Use `bd` for ALL task tracking — do NOT use TodoWrite, TaskCreate, or markdown TODO lists
- Run `bd prime` for detailed command reference and session close protocol
- Use `bd remember` for persistent knowledge — do NOT use MEMORY.md files
## Session Completion
**When ending a work session**, you MUST complete ALL steps below. Work is NOT complete until `git push` succeeds.
**MANDATORY WORKFLOW:**
1. **File issues for remaining work** - Create issues for anything that needs follow-up
2. **Run quality gates** (if code changed) - Tests, linters, builds
3. **Update issue status** - Close finished work, update in-progress items
4. **PUSH TO REMOTE** - This is MANDATORY:
```bash
git pull --rebase
bd dolt push
git push
git status # MUST show "up to date with origin"
```
5. **Clean up** - Clear stashes, prune remote branches
6. **Verify** - All changes committed AND pushed
7. **Hand off** - Provide context for next session
**CRITICAL RULES:**
- Work is NOT complete until `git push` succeeds
- NEVER stop before pushing - that leaves work stranded locally
- NEVER say "ready to push when you are" - YOU must push
- If push fails, resolve and retry until it succeeds
<!-- END BEADS INTEGRATION -->
## Build & Test
_Add your build and test commands here_
```bash
# Example:
# npm install
# npm test
```
## Architecture Overview
_Add a brief overview of your project architecture_
## Conventions & Patterns
_Add your project-specific conventions here_

View file

@ -149,6 +149,13 @@ All runtime configuration comes from `.env`.
### API ### API
- `API_PORT`, `REST_DEFAULT_LIMIT` - `API_PORT`, `REST_DEFAULT_LIMIT`
- `LIVE_LIMIT_OPTIONS`, `LIVE_LIMIT_NBBO`, `LIVE_LIMIT_EQUITIES`, `LIVE_LIMIT_EQUITY_JOINS`, `LIVE_LIMIT_FLOW`, `LIVE_LIMIT_CLASSIFIER_HITS`, `LIVE_LIMIT_ALERTS`, `LIVE_LIMIT_INFERRED_DARK` (bounded live generic cache depths; defaults `10000`, max `100000`)
### Web live retention
- `NEXT_PUBLIC_LIVE_HOT_WINDOW` (frontend hot live window cap; default `2000`)
- `NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS` (pinned evidence TTL; default `1200000`)
- `NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS` (pinned evidence cache guardrail; default `4000`)
### Replay service ### Replay service
@ -163,4 +170,8 @@ All runtime configuration comes from `.env`.
- Python dependencies are required only for IBKR/Databento sidecars (`services/ingest-options/py/requirements.txt`). - Python dependencies are required only for IBKR/Databento sidecars (`services/ingest-options/py/requirements.txt`).
- Candle construction is server-side; the client consumes prebuilt OHLC events. - Candle construction is server-side; the client consumes prebuilt OHLC events.
- Live retention uses a two-tier model:
- API/Redis maintain a bounded hot cache per live generic channel.
- UI keeps a bounded hot window for rendering performance.
- Alert/drawer evidence is pinned and hydrated by id/trace so details remain inspectable after hot-window eviction.
- This repository is for personal, non-redistributed usage. - This repository is for personal, non-redistributed usage.

File diff suppressed because it is too large Load diff

View file

@ -678,6 +678,10 @@ const run = async () => {
const liveState = new LiveStateManager(clickhouse, redis); const liveState = new LiveStateManager(clickhouse, redis);
await liveState.hydrate(); await liveState.hydrate();
const liveStateMetricsTimer = setInterval(() => {
const snapshot = liveState.getStatsSnapshot();
logger.info("live cache metrics", snapshot);
}, 60000);
const subscribeWithReset = async <T>( const subscribeWithReset = async <T>(
subject: string, subject: string,
@ -1475,6 +1479,7 @@ const run = async () => {
state.shutdownPromise = (async () => { state.shutdownPromise = (async () => {
logger.info("service stopping", { signal }); logger.info("service stopping", { signal });
server.stop(); server.stop();
clearInterval(liveStateMetricsTimer);
if (redis && redis.isOpen) { if (redis && redis.isOpen) {
try { try {

View file

@ -33,16 +33,19 @@ import type { RedisClientType } from "redis";
const CURSOR_HASH_KEY = "live:cursors"; const CURSOR_HASH_KEY = "live:cursors";
const GENERIC_LIMITS = { const DEFAULT_GENERIC_LIMIT = 10000;
options: 500, const MAX_GENERIC_LIMIT = 100000;
nbbo: 500, const MIN_GENERIC_LIMIT = 1;
equities: 500, const GENERIC_LIMIT_ENV_KEYS: Record<LiveGenericChannel, string> = {
"equity-joins": 500, options: "LIVE_LIMIT_OPTIONS",
flow: 500, nbbo: "LIVE_LIMIT_NBBO",
"classifier-hits": 500, equities: "LIVE_LIMIT_EQUITIES",
alerts: 500, "equity-joins": "LIVE_LIMIT_EQUITY_JOINS",
"inferred-dark": 500 flow: "LIVE_LIMIT_FLOW",
} as const; "classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS",
alerts: "LIVE_LIMIT_ALERTS",
"inferred-dark": "LIVE_LIMIT_INFERRED_DARK"
};
const CHART_LIMITS = { const CHART_LIMITS = {
candles: 500, candles: 500,
@ -58,6 +61,43 @@ type GenericFeedConfig = {
fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise<any[]>; fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise<any[]>;
}; };
export type GenericLiveLimits = Record<LiveGenericChannel, number>;
const parseGenericLimit = (
env: NodeJS.ProcessEnv,
channel: LiveGenericChannel,
fallback: number
): number => {
const key = GENERIC_LIMIT_ENV_KEYS[channel];
const raw = env[key];
if (!raw || raw.trim().length === 0) {
return fallback;
}
const parsed = Number(raw);
if (!Number.isFinite(parsed)) {
console.warn(`Invalid ${key}="${raw}", using ${fallback}`);
return fallback;
}
const bounded = Math.max(MIN_GENERIC_LIMIT, Math.min(MAX_GENERIC_LIMIT, Math.floor(parsed)));
if (bounded !== parsed) {
console.warn(`Clamped ${key} from ${parsed} to ${bounded}`);
}
return bounded;
};
export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env): GenericLiveLimits => ({
options: parseGenericLimit(env, "options", DEFAULT_GENERIC_LIMIT),
nbbo: parseGenericLimit(env, "nbbo", DEFAULT_GENERIC_LIMIT),
equities: parseGenericLimit(env, "equities", DEFAULT_GENERIC_LIMIT),
"equity-joins": parseGenericLimit(env, "equity-joins", DEFAULT_GENERIC_LIMIT),
flow: parseGenericLimit(env, "flow", DEFAULT_GENERIC_LIMIT),
"classifier-hits": parseGenericLimit(env, "classifier-hits", DEFAULT_GENERIC_LIMIT),
alerts: parseGenericLimit(env, "alerts", DEFAULT_GENERIC_LIMIT),
"inferred-dark": parseGenericLimit(env, "inferred-dark", DEFAULT_GENERIC_LIMIT)
});
type RedisLike = Pick< type RedisLike = Pick<
RedisClientType, RedisClientType,
"isOpen" | "lRange" | "lPush" | "lTrim" | "hGet" | "hSet" "isOpen" | "lRange" | "lPush" | "lTrim" | "hGet" | "hSet"
@ -75,13 +115,13 @@ const parseCursor = (value: string | null): Cursor | null => {
} }
}; };
const getGenericConfig = (): { const getGenericConfig = (limits: GenericLiveLimits): {
[K in LiveGenericChannel]: GenericFeedConfig; [K in LiveGenericChannel]: GenericFeedConfig;
} => ({ } => ({
options: { options: {
redisKey: "live:options", redisKey: "live:options",
cursorField: "options", cursorField: "options",
limit: GENERIC_LIMITS.options, limit: limits.options,
parse: (value) => OptionPrintSchema.parse(value), parse: (value) => OptionPrintSchema.parse(value),
cursor: (item) => ({ ts: item.ts, seq: item.seq }), cursor: (item) => ({ ts: item.ts, seq: item.seq }),
fetchRecent: fetchRecentOptionPrints fetchRecent: fetchRecentOptionPrints
@ -89,7 +129,7 @@ const getGenericConfig = (): {
nbbo: { nbbo: {
redisKey: "live:nbbo", redisKey: "live:nbbo",
cursorField: "nbbo", cursorField: "nbbo",
limit: GENERIC_LIMITS.nbbo, limit: limits.nbbo,
parse: (value) => OptionNBBOSchema.parse(value), parse: (value) => OptionNBBOSchema.parse(value),
cursor: (item) => ({ ts: item.ts, seq: item.seq }), cursor: (item) => ({ ts: item.ts, seq: item.seq }),
fetchRecent: fetchRecentOptionNBBO fetchRecent: fetchRecentOptionNBBO
@ -97,7 +137,7 @@ const getGenericConfig = (): {
equities: { equities: {
redisKey: "live:equities", redisKey: "live:equities",
cursorField: "equities", cursorField: "equities",
limit: GENERIC_LIMITS.equities, limit: limits.equities,
parse: (value) => EquityPrintSchema.parse(value), parse: (value) => EquityPrintSchema.parse(value),
cursor: (item) => ({ ts: item.ts, seq: item.seq }), cursor: (item) => ({ ts: item.ts, seq: item.seq }),
fetchRecent: fetchRecentEquityPrints fetchRecent: fetchRecentEquityPrints
@ -105,7 +145,7 @@ const getGenericConfig = (): {
"equity-joins": { "equity-joins": {
redisKey: "live:equity-joins", redisKey: "live:equity-joins",
cursorField: "equity-joins", cursorField: "equity-joins",
limit: GENERIC_LIMITS["equity-joins"], limit: limits["equity-joins"],
parse: (value) => EquityPrintJoinSchema.parse(value), parse: (value) => EquityPrintJoinSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentEquityPrintJoins fetchRecent: fetchRecentEquityPrintJoins
@ -113,7 +153,7 @@ const getGenericConfig = (): {
flow: { flow: {
redisKey: "live:flow", redisKey: "live:flow",
cursorField: "flow", cursorField: "flow",
limit: GENERIC_LIMITS.flow, limit: limits.flow,
parse: (value) => FlowPacketSchema.parse(value), parse: (value) => FlowPacketSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentFlowPackets fetchRecent: fetchRecentFlowPackets
@ -121,7 +161,7 @@ const getGenericConfig = (): {
"classifier-hits": { "classifier-hits": {
redisKey: "live:classifier-hits", redisKey: "live:classifier-hits",
cursorField: "classifier-hits", cursorField: "classifier-hits",
limit: GENERIC_LIMITS["classifier-hits"], limit: limits["classifier-hits"],
parse: (value) => ClassifierHitEventSchema.parse(value), parse: (value) => ClassifierHitEventSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentClassifierHits fetchRecent: fetchRecentClassifierHits
@ -129,7 +169,7 @@ const getGenericConfig = (): {
alerts: { alerts: {
redisKey: "live:alerts", redisKey: "live:alerts",
cursorField: "alerts", cursorField: "alerts",
limit: GENERIC_LIMITS.alerts, limit: limits.alerts,
parse: (value) => AlertEventSchema.parse(value), parse: (value) => AlertEventSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentAlerts fetchRecent: fetchRecentAlerts
@ -137,7 +177,7 @@ const getGenericConfig = (): {
"inferred-dark": { "inferred-dark": {
redisKey: "live:inferred-dark", redisKey: "live:inferred-dark",
cursorField: "inferred-dark", cursorField: "inferred-dark",
limit: GENERIC_LIMITS["inferred-dark"], limit: limits["inferred-dark"],
parse: (value) => InferredDarkEventSchema.parse(value), parse: (value) => InferredDarkEventSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentInferredDark fetchRecent: fetchRecentInferredDark
@ -171,18 +211,43 @@ const overlayRedisKey = (underlyingId: string): string => `live:equity-overlay:$
const overlayCursorField = (underlyingId: string): string => `equities:${underlyingId}`; const overlayCursorField = (underlyingId: string): string => `equities:${underlyingId}`;
export class LiveStateManager { export class LiveStateManager {
private readonly generic = getGenericConfig(); private readonly generic: {
[K in LiveGenericChannel]: GenericFeedConfig;
};
private readonly genericItems = new Map<LiveGenericChannel, any[]>(); private readonly genericItems = new Map<LiveGenericChannel, any[]>();
private readonly genericCursors = new Map<string, Cursor | null>(); private readonly genericCursors = new Map<string, Cursor | null>();
private readonly candleItems = new Map<string, EquityCandle[]>(); private readonly candleItems = new Map<string, EquityCandle[]>();
private readonly candleCursors = new Map<string, Cursor | null>(); private readonly candleCursors = new Map<string, Cursor | null>();
private readonly overlayItems = new Map<string, EquityPrint[]>(); private readonly overlayItems = new Map<string, EquityPrint[]>();
private readonly overlayCursors = new Map<string, Cursor | null>(); private readonly overlayCursors = new Map<string, Cursor | null>();
private readonly stats = {
genericHydrateFromRedis: 0,
genericHydrateFromClickHouse: 0,
trimOperations: 0,
cacheDepthByKey: new Map<string, number>()
};
constructor( constructor(
private readonly clickhouse: ClickHouseClient, private readonly clickhouse: ClickHouseClient,
private readonly redis: RedisLike | null private readonly redis: RedisLike | null,
) {} limits: GenericLiveLimits = resolveGenericLiveLimits()
) {
this.generic = getGenericConfig(limits);
}
getStatsSnapshot(): {
genericHydrateFromRedis: number;
genericHydrateFromClickHouse: number;
trimOperations: number;
cacheDepthByKey: Record<string, number>;
} {
return {
genericHydrateFromRedis: this.stats.genericHydrateFromRedis,
genericHydrateFromClickHouse: this.stats.genericHydrateFromClickHouse,
trimOperations: this.stats.trimOperations,
cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey)
};
}
async hydrate(): Promise<void> { async hydrate(): Promise<void> {
const channels = Object.keys(this.generic) as LiveGenericChannel[]; const channels = Object.keys(this.generic) as LiveGenericChannel[];
@ -196,12 +261,16 @@ export class LiveStateManager {
const cached = parseJsonList(payloads, config.parse); const cached = parseJsonList(payloads, config.parse);
if (cached.length > 0) { if (cached.length > 0) {
this.genericItems.set(channel, cached); this.genericItems.set(channel, cached);
this.stats.genericHydrateFromRedis += 1;
this.stats.cacheDepthByKey.set(config.redisKey, cached.length);
this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField))); this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField)));
return; return;
} }
} }
const fresh = await config.fetchRecent(this.clickhouse, config.limit); const fresh = await config.fetchRecent(this.clickhouse, config.limit);
this.stats.genericHydrateFromClickHouse += 1;
this.stats.cacheDepthByKey.set(config.redisKey, fresh.length);
this.genericItems.set(channel, fresh); this.genericItems.set(channel, fresh);
const watermark = fresh[0] ? config.cursor(fresh[0]) : null; const watermark = fresh[0] ? config.cursor(fresh[0]) : null;
this.genericCursors.set(config.cursorField, watermark); this.genericCursors.set(config.cursorField, watermark);
@ -262,6 +331,7 @@ export class LiveStateManager {
.sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq)) .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
.slice(0, CHART_LIMITS.candles); .slice(0, CHART_LIMITS.candles);
this.candleItems.set(key, next); this.candleItems.set(key, next);
this.stats.cacheDepthByKey.set(key, next.length);
const cursor = { ts: candle.ts, seq: candle.seq }; const cursor = { ts: candle.ts, seq: candle.seq };
this.candleCursors.set(cursorField, cursor); this.candleCursors.set(cursorField, cursor);
await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor); await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor);
@ -276,6 +346,7 @@ export class LiveStateManager {
.sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq)) .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
.slice(0, CHART_LIMITS.overlay); .slice(0, CHART_LIMITS.overlay);
this.overlayItems.set(key, next); this.overlayItems.set(key, next);
this.stats.cacheDepthByKey.set(key, next.length);
const cursor = { ts: print.ts, seq: print.seq }; const cursor = { ts: print.ts, seq: print.seq };
this.overlayCursors.set(cursorField, cursor); this.overlayCursors.set(cursorField, cursor);
await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor); await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor);
@ -293,6 +364,7 @@ export class LiveStateManager {
}) })
.slice(0, config.limit); .slice(0, config.limit);
this.genericItems.set(channel, next); this.genericItems.set(channel, next);
this.stats.cacheDepthByKey.set(config.redisKey, next.length);
const cursor = config.cursor(parsed); const cursor = config.cursor(parsed);
this.genericCursors.set(config.cursorField, cursor); this.genericCursors.set(config.cursorField, cursor);
await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor); await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor);
@ -309,6 +381,7 @@ export class LiveStateManager {
const cached = parseJsonList(payloads, (value) => EquityCandleSchema.parse(value)); const cached = parseJsonList(payloads, (value) => EquityCandleSchema.parse(value));
if (cached.length > 0) { if (cached.length > 0) {
this.candleItems.set(key, cached); this.candleItems.set(key, cached);
this.stats.cacheDepthByKey.set(key, cached.length);
this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
return; return;
} }
@ -316,6 +389,7 @@ export class LiveStateManager {
const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles); const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles);
this.candleItems.set(key, fresh); this.candleItems.set(key, fresh);
this.stats.cacheDepthByKey.set(key, fresh.length);
const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null; const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
this.candleCursors.set(cursorField, watermark); this.candleCursors.set(cursorField, watermark);
await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark); await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark);
@ -329,6 +403,7 @@ export class LiveStateManager {
const cached = parseJsonList(payloads, (value) => EquityPrintSchema.parse(value)); const cached = parseJsonList(payloads, (value) => EquityPrintSchema.parse(value));
if (cached.length > 0) { if (cached.length > 0) {
this.overlayItems.set(key, cached); this.overlayItems.set(key, cached);
this.stats.cacheDepthByKey.set(key, cached.length);
this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
return; return;
} }
@ -338,6 +413,7 @@ export class LiveStateManager {
(item) => item.underlying_id === underlyingId (item) => item.underlying_id === underlyingId
); );
this.overlayItems.set(key, fresh); this.overlayItems.set(key, fresh);
this.stats.cacheDepthByKey.set(key, fresh.length);
const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null; const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
this.overlayCursors.set(cursorField, watermark); this.overlayCursors.set(cursorField, watermark);
await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark); await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark);
@ -356,6 +432,7 @@ export class LiveStateManager {
const payloads = items.map((item) => JSON.stringify(item)); const payloads = items.map((item) => JSON.stringify(item));
await this.redis.lTrim(listKey, 1, 0); await this.redis.lTrim(listKey, 1, 0);
this.stats.trimOperations += 1;
if (payloads.length > 0) { if (payloads.length > 0) {
for (let idx = payloads.length - 1; idx >= 0; idx -= 1) { for (let idx = payloads.length - 1; idx >= 0; idx -= 1) {
const payload = payloads[idx]; const payload = payloads[idx];
@ -364,7 +441,9 @@ export class LiveStateManager {
} }
} }
await this.redis.lTrim(listKey, 0, limit - 1); await this.redis.lTrim(listKey, 0, limit - 1);
this.stats.trimOperations += 1;
} }
this.stats.cacheDepthByKey.set(listKey, Math.min(items.length, limit));
await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor)); await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor));
} }
} }

View file

@ -1,6 +1,6 @@
import { describe, expect, it } from "bun:test"; import { describe, expect, it } from "bun:test";
import type { ClickHouseClient } from "@islandflow/storage"; import type { ClickHouseClient } from "@islandflow/storage";
import { LiveStateManager } from "../src/live"; import { LiveStateManager, resolveGenericLiveLimits } from "../src/live";
const makeClickHouse = (): ClickHouseClient => const makeClickHouse = (): ClickHouseClient =>
({ ({
@ -48,6 +48,19 @@ const makeRedis = () => {
}; };
describe("LiveStateManager", () => { describe("LiveStateManager", () => {
it("resolves live limits from env with clamping", () => {
const limits = resolveGenericLiveLimits({
LIVE_LIMIT_OPTIONS: "777",
LIVE_LIMIT_NBBO: "200000",
LIVE_LIMIT_FLOW: "bad"
} as NodeJS.ProcessEnv);
expect(limits.options).toBe(777);
expect(limits.nbbo).toBe(100000);
expect(limits.flow).toBe(10000);
expect(limits.alerts).toBe(10000);
});
it("hydrates snapshots from redis generic windows", async () => { it("hydrates snapshots from redis generic windows", async () => {
const redis = makeRedis(); const redis = makeRedis();
await redis.lPush( await redis.lPush(
@ -120,4 +133,67 @@ describe("LiveStateManager", () => {
expect(candleSnapshot.watermark).toEqual({ ts: 100, seq: 1 }); expect(candleSnapshot.watermark).toEqual({ ts: 100, seq: 1 });
expect(overlaySnapshot.watermark).toEqual({ ts: 110, seq: 2 }); expect(overlaySnapshot.watermark).toEqual({ ts: 110, seq: 2 });
}); });
it("trims generic windows to configured per-channel limits", async () => {
const redis = makeRedis();
const manager = new LiveStateManager(
makeClickHouse(),
redis as never,
{
options: 10000,
nbbo: 10000,
equities: 10000,
"equity-joins": 10000,
flow: 2,
"classifier-hits": 10000,
alerts: 10000,
"inferred-dark": 10000
}
);
await manager.ingest("flow", {
source_ts: 100,
ingest_ts: 101,
seq: 1,
trace_id: "flow-1",
id: "flow-1",
members: ["a"],
features: {},
join_quality: {}
});
await manager.ingest("flow", {
source_ts: 110,
ingest_ts: 111,
seq: 2,
trace_id: "flow-2",
id: "flow-2",
members: ["b"],
features: {},
join_quality: {}
});
await manager.ingest("flow", {
source_ts: 120,
ingest_ts: 121,
seq: 3,
trace_id: "flow-3",
id: "flow-3",
members: ["c"],
features: {},
join_quality: {}
});
const snapshot = await manager.getSnapshot({ channel: "flow" });
expect(snapshot.items).toHaveLength(2);
expect((snapshot.items as Array<{ id: string }>).map((item) => item.id)).toEqual([
"flow-3",
"flow-2"
]);
const persisted = await redis.lRange("live:flow", 0, 99);
expect(persisted).toHaveLength(2);
const stats = manager.getStatsSnapshot();
expect(stats.trimOperations).toBeGreaterThan(0);
expect(stats.cacheDepthByKey["live:flow"]).toBe(2);
});
}); });