Add bounded live retention for UI and API caches

- Introduce configurable hot-window and pinned evidence retention
- Keep live evidence available after eviction with fetch-on-miss hydration
- Add tests and docs for the new retention settings
dirtydishes 2026-04-27 14:37:52 -04:00
parent 32aae200c3
commit a45d5c85f6
7 changed files with 769 additions and 537 deletions


@ -57,6 +57,9 @@ COMPUTE_DELIVER_POLICY=new
 COMPUTE_CONSUMER_RESET=false
 NBBO_MAX_AGE_MS=1000
 NEXT_PUBLIC_NBBO_MAX_AGE_MS=1000
+NEXT_PUBLIC_LIVE_HOT_WINDOW=2000
+NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS=1200000
+NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS=4000
 ROLLING_WINDOW_SIZE=50
 ROLLING_TTL_SEC=86400
 CLASSIFIER_SWEEP_MIN_PREMIUM=40000
@ -81,3 +84,13 @@ REPLAY_END_TS=0
 REPLAY_SPEED=1
 REPLAY_BATCH_SIZE=200
 REPLAY_LOG_EVERY=1000
+# API live retention (generic channels)
+LIVE_LIMIT_OPTIONS=10000
+LIVE_LIMIT_NBBO=10000
+LIVE_LIMIT_EQUITIES=10000
+LIVE_LIMIT_EQUITY_JOINS=10000
+LIVE_LIMIT_FLOW=10000
+LIVE_LIMIT_CLASSIFIER_HITS=10000
+LIVE_LIMIT_ALERTS=10000
+LIVE_LIMIT_INFERRED_DARK=10000


@ -1,428 +0,0 @@
# Detailed Agent Instructions for Beads Development
**For project overview and quick start, see [AGENTS.md](AGENTS.md)**
This document contains detailed operational instructions for AI agents working on beads development, testing, and releases.
## Development Guidelines
### Code Standards
- **Go version**: 1.24+
- **Linting**: `golangci-lint run ./...` (baseline warnings documented in [docs/LINTING.md](docs/LINTING.md))
- **Testing**: All new features need tests (`make test` for local baseline, `make test-full-cgo` when validating full CGO paths)
- **Documentation**: Update relevant .md files
### File Organization
```
beads/
├── cmd/bd/ # CLI commands
├── internal/
│ ├── types/ # Core data types
│ └── storage/ # Storage layer
│ └── dolt/ # Dolt implementation
├── examples/ # Integration examples
└── *.md # Documentation
```
### Testing Workflow
**IMPORTANT:** Never pollute the production database with test issues!
**For manual testing**, use the `BEADS_DB` environment variable to point to a temporary database:
```bash
# Create test issues in isolated database
BEADS_DB=/tmp/test.db bd init --quiet --prefix test
BEADS_DB=/tmp/test.db bd create "Test issue" -p 1
# Or for quick testing
BEADS_DB=/tmp/test.db bd create "Test feature" -p 1
```
**For automated tests**, use `t.TempDir()` in Go tests:
```go
func TestMyFeature(t *testing.T) {
tmpDir := t.TempDir()
testDB := filepath.Join(tmpDir, ".beads", "beads.db")
s := newTestStore(t, testDB)
// ... test code
}
```
**Git test isolation:** For tests that create temporary git repos, force repo-local hooks:
```bash
git config core.hooksPath .git/hooks
```
Do not rely on the developer's global git config. Global `core.hooksPath` can leak
into temp repos and produce flaky test behavior.
**Warning:** bd will warn you when creating issues with "Test" prefix in the production database. Always use `BEADS_DB` for manual testing.
### Before Committing
1. **Run tests**: `make test` (or `./scripts/test.sh`)
- For full CGO validation: `make test-full-cgo`
2. **Run linter**: `golangci-lint run ./...` (ignore baseline warnings)
3. **Update docs**: If you changed behavior, update README.md or other docs
4. **Commit**: With git hooks installed (`bd hooks install`), Dolt changes are auto-committed
### Commit Message Convention
When committing work for an issue, include the issue ID in parentheses at the end:
```bash
git commit -m "Fix auth validation bug (bd-abc)"
git commit -m "Add retry logic for database locks (bd-xyz)"
```
This enables `bd doctor` to detect **orphaned issues** - work that was committed but the issue wasn't closed. The doctor check cross-references open issues against git history to find these orphans.
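The cross-reference described above can be sketched roughly as follows. This is an illustrative TypeScript sketch, not bd's actual implementation: the `OpenIssue` shape, `issueIdFromCommit`, and `findOrphans` are hypothetical names, and the regular expression assumes issue IDs look like `bd-abc` and appear in parentheses at the end of the commit subject.

```typescript
// Hypothetical shape for illustration; not bd's internal issue type.
interface OpenIssue {
  id: string; // e.g. "bd-abc"
  title: string;
}

// Extract a trailing "(bd-xyz)" issue ID from a commit subject, if present.
function issueIdFromCommit(subject: string): string | null {
  const match = subject.trim().match(/\((bd-[a-z0-9.]+)\)$/i);
  return match?.[1]?.toLowerCase() ?? null;
}

// An issue is a likely orphan when it is still open but some commit references it.
function findOrphans(openIssues: OpenIssue[], commitSubjects: string[]): OpenIssue[] {
  const referenced = new Set<string>();
  for (const subject of commitSubjects) {
    const id = issueIdFromCommit(subject);
    if (id !== null) referenced.add(id);
  }
  return openIssues.filter((issue) => referenced.has(issue.id.toLowerCase()));
}

const orphans = findOrphans(
  [
    { id: "bd-abc", title: "Fix auth validation bug" },
    { id: "bd-def", title: "Unrelated open issue" }
  ],
  ["Fix auth validation bug (bd-abc)", "Add retry logic for database locks (bd-xyz)"]
);
console.log(orphans.map((issue) => issue.id)); // ["bd-abc"]
```

Only `bd-abc` is reported: it is open and referenced by a commit, while `bd-xyz` is referenced but not in the open list and `bd-def` is open but never referenced.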
### Git Workflow
bd uses **Dolt** as its primary database. Changes are committed to Dolt history automatically (one Dolt commit per write command).
**Install git hooks** for automatic sync:
```bash
bd hooks install
```
### Git Integration
**Dolt sync**: Dolt handles sync natively via `bd dolt push` / `bd dolt pull`. No JSONL export/import needed.
**Protected branches**: Dolt stores data under `refs/dolt/data`, separate from standard Git refs. See [docs/PROTECTED_BRANCHES.md](docs/PROTECTED_BRANCHES.md).
**Git worktrees**: Work directly with Dolt — no special flags needed. See [docs/ADVANCED.md](docs/ADVANCED.md).
**Merge conflicts**: Rare with hash IDs. Dolt uses cell-level 3-way merge for conflict resolution.
## Git Workflow: Push to Main, Never PR
Crew workers push directly to main. **Never create pull requests.**
- `git push` to main is the only way to land work
- `gh pr create` is forbidden — PRs are for external contributors, not crew
- Do not create feature branches for your own work — commit and push to main
- When handling external PRs, use fix-merge: checkout the PR branch locally,
fix/rebase onto main, merge locally, `git push`, then close the PR
This is enforced by pre-use hooks. If you try `gh pr create`, it will be blocked.
## Landing the Plane
**When the user says "let's land the plane"**, you MUST complete ALL steps below. The plane is NOT landed until `git push` succeeds. NEVER stop before pushing. NEVER say "ready to push when you are!" - that is a FAILURE.
**MANDATORY WORKFLOW - COMPLETE ALL STEPS:**
1. **File beads issues for any remaining work** that needs follow-up
2. **Ensure all quality gates pass** (only if code changes were made):
- Run `make lint` or `golangci-lint run ./...` (if pre-commit installed: `pre-commit run --all-files`)
- Run `make test` (and `make test-full-cgo` when CGO-relevant code changed)
- File P0 issues if quality gates are broken
3. **Update beads issues** - close finished work, update status
4. **PUSH TO REMOTE - NON-NEGOTIABLE** - This step is MANDATORY. Execute ALL commands below:
```bash
# Pull first to catch any remote changes
git pull --rebase
# MANDATORY: Push everything to remote
# DO NOT STOP BEFORE THIS COMMAND COMPLETES
git push
# MANDATORY: Verify push succeeded
git status # MUST show "up to date with origin/main"
```
**CRITICAL RULES:**
- The plane has NOT landed until `git push` completes successfully
- NEVER stop before `git push` - that leaves work stranded locally
- NEVER say "ready to push when you are!" - YOU must push, not the user
- If `git push` fails, resolve the issue and retry until it succeeds
- The user is managing multiple agents - unpushed work breaks their coordination workflow
5. **Clean up git state** - Clear old stashes and prune dead remote branches:
```bash
git stash clear # Remove old stashes
git remote prune origin # Clean up deleted remote branches
```
6. **Verify clean state** - Ensure all changes are committed AND PUSHED, no untracked files remain
7. **Choose a follow-up issue for next session**
- Provide a prompt for the user to give to you in the next session
- Format: "Continue work on bd-X: [issue title]. [Brief context about what's been done and what's next]"
**REMEMBER: Landing the plane means EVERYTHING is pushed to remote. No exceptions. No "ready when you are". PUSH IT.**
**Example "land the plane" session:**
```bash
# 1. File remaining work
bd create "Add integration tests for sync" -t task -p 2 --json
# 2. Run quality gates (only if code changes were made)
go test -short ./...
golangci-lint run ./...
# 3. Close finished issues
bd close bd-42 bd-43 --reason "Completed" --json
# 4. PUSH TO REMOTE - MANDATORY, NO STOPPING BEFORE THIS IS DONE
git pull --rebase
git push # MANDATORY - THE PLANE IS STILL IN THE AIR UNTIL THIS SUCCEEDS
git status # MUST verify "up to date with origin/main"
# 5. Clean up git state
git stash clear
git remote prune origin
# 6. Verify everything is clean and pushed
git status
# 7. Choose next work
bd ready --json
bd show bd-44 --json
```
**Then provide the user with:**
- Summary of what was completed this session
- What issues were filed for follow-up
- Status of quality gates (all passing / issues filed)
- Confirmation that ALL changes have been pushed to remote
- Recommended prompt for next session
**CRITICAL: Never end a "land the plane" session without successfully pushing. The user is coordinating multiple agents and unpushed work causes severe rebase conflicts.**
## Agent Session Workflow
**WARNING: DO NOT use `bd edit`** - it opens an interactive editor ($EDITOR) which AI agents cannot use. Use `bd update` with flags instead:
```bash
bd update <id> --description "new description"
bd update <id> --title "new title"
bd update <id> --design "design notes"
bd update <id> --notes "additional notes"
bd update <id> --acceptance "acceptance criteria"
```
**Use stdin for descriptions with special characters** (backticks, `!`, nested quotes):
```bash
# Pipe via stdin to avoid shell escaping issues
echo 'Description with `backticks` and "quotes"' | bd create "Title" --stdin
echo 'Updated description with $variables' | bd update <id> --description=-
# Or use --body-file for longer content
bd create "Title" --body-file=description.md
```
**Example agent session:**
```bash
# Make changes (each write auto-commits to Dolt)
bd create "Fix bug" -p 1
bd create "Add tests" -p 1
bd update bd-42 --claim
bd close bd-40 --reason "Completed"
# Push Dolt data to remote if configured
bd dolt push
# Now safe to end session
```
`bd hooks install` installs:
- **pre-commit** — Commits pending Dolt changes
- **post-merge** — Pulls remote Dolt changes after git merge
**Note:** Hooks are embedded in the bd binary and work for all bd users (not just source repo users).
## Common Development Tasks
### CLI Design Principles
**Minimize cognitive overload.** Every new command, flag, or option adds cognitive burden for users. Before adding anything:
1. **Recovery/fix operations → `bd doctor --fix`**: Don't create separate commands like `bd recover` or `bd repair`. Doctor already detects problems - let `--fix` handle remediation. This keeps all health-related operations in one discoverable place.
For git hook marker migration specifically: use `bd migrate hooks --dry-run` to preview operations, and `bd doctor --fix` for the standard apply path.
2. **Prefer flags on existing commands**: Before creating a new command, ask: "Can this be a flag on an existing command?" Example: `bd list --stale` instead of `bd stale`.
3. **Consolidate related operations**: Related operations should live together. Version control uses `bd vc {log,diff,commit}`, not separate top-level commands.
4. **Count the commands**: Run `bd --help` and count. If we're approaching 30+ commands, we have a discoverability problem. Consider subcommand grouping.
5. **New commands need strong justification**: A new command should represent a fundamentally different operation, not just a convenience wrapper.
### Adding a New Command
1. Create file in `cmd/bd/`
2. Add to root command in `cmd/bd/main.go`
3. Implement with Cobra framework
4. Add `--json` flag for agent use
5. Add tests in `cmd/bd/*_test.go`
6. Document in README.md
### Adding Storage Features
1. Add Dolt SQL schema changes in `internal/storage/dolt/`
2. Add migration if needed
3. Update `internal/types/types.go` if new types
4. Implement in `internal/storage/dolt/` (queries, issues, etc.)
5. Add tests
6. Update export/import in `cmd/bd/export.go` and `cmd/bd/import.go`
### Adding Examples
1. Create directory in `examples/`
2. Add README.md explaining the example
3. Include working code
4. Link from `examples/README.md`
5. Mention in main README.md
## Building and Testing
```bash
# Build and install bd to ~/.local/bin (the canonical location)
make install
# Test (local baseline)
make test
# Test with full CGO-enabled suite (local/CI parity)
make test-full-cgo
# Coverage run
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
# Verify installed binary
bd init --prefix test
bd create "Test issue" -p 1
bd ready
```
> **WARNING**: Do NOT use `go build -o bd ./cmd/bd` or `go install ./cmd/bd`.
> These create stale binaries in the working directory or `~/go/bin/` that
> shadow the canonical install at `~/.local/bin/bd`. Always use `make install`.
## Version Management
**IMPORTANT**: When the user asks to "bump the version" or mentions a new version number (e.g., "bump to 0.9.3"), use the version bump script:
```bash
# Preview changes (shows diff, doesn't commit)
./scripts/bump-version.sh 0.9.3
# Auto-commit the version bump
./scripts/bump-version.sh 0.9.3 --commit
git push origin main
```
**What it does:**
- Updates ALL version files (CLI, plugin, MCP server, docs) in one command
- Validates semantic versioning format
- Shows diff preview
- Verifies all versions match after update
- Creates standardized commit message
**User will typically say:**
- "Bump to 0.9.3"
- "Update version to 1.0.0"
- "Rev the project to 0.9.4"
- "Increment the version"
**You should:**
1. Run `./scripts/bump-version.sh <version> --commit`
2. Push to GitHub
3. Confirm all versions updated correctly
**Files updated automatically:**
- `cmd/bd/version.go` - CLI version
- `claude-plugin/.claude-plugin/plugin.json` - Plugin version
- `.claude-plugin/marketplace.json` - Marketplace version
- `integrations/beads-mcp/pyproject.toml` - MCP server version
- `README.md` - Documentation version
- `PLUGIN.md` - Version requirements
**Why this matters:** We had version mismatches (bd-66) when only `version.go` was updated. This script prevents that by updating all components atomically.
See `scripts/README.md` for more details.
## Release Process (Maintainers)
**Automated (Recommended):**
```bash
# One command to do everything (version bump, tests, tag, Homebrew update, local install)
./scripts/release.sh 0.9.3
```
This handles the entire release workflow automatically, including waiting ~5 minutes for GitHub Actions to build release artifacts. See [scripts/README.md](scripts/README.md) for details.
**Manual (Step-by-Step):**
1. Bump version: `./scripts/bump-version.sh <version> --commit`
2. Update CHANGELOG.md with release notes
3. Run tests: `make test` (and `make test-full-cgo` for CGO-related changes)
4. Push version bump: `git push origin main`
5. Tag release: `git tag v<version> && git push origin v<version>`
6. Update Homebrew: `./scripts/update-homebrew.sh <version>` (waits for GitHub Actions)
7. Verify: `brew update && brew upgrade beads && bd version`
See [docs/RELEASING.md](docs/RELEASING.md) for complete manual instructions.
## Checking GitHub Issues and PRs
**IMPORTANT**: When asked to check GitHub issues or PRs, use command-line tools like `gh` instead of browser/playwright tools.
**Preferred approach:**
```bash
# List open issues with details
gh issue list --limit 30
# List open PRs
gh pr list --limit 30
# View specific issue
gh issue view 201
```
**Then provide an in-conversation summary** highlighting:
- Urgent/critical issues (regressions, bugs, broken builds)
- Common themes or patterns
- Feature requests with high engagement
- Items that need immediate attention
**Why this matters:**
- Browser tools consume more tokens and are slower
- CLI summaries are easier to scan and discuss
- Keeps the conversation focused and efficient
- Better for quick triage and prioritization
**Do NOT use:** `browser_navigate`, `browser_snapshot`, or other playwright tools for GitHub PR/issue reviews unless specifically requested by the user.
## Questions?
- Check existing issues: `bd list`
- Look at recent commits: `git log --oneline -20`
- Read the docs: README.md, ADVANCED.md, EXTENDING.md
- Create an issue if unsure: `bd create "Question: ..." -t task -p 2`
## Important Files
- **README.md** - Main documentation (keep this updated!)
- **EXTENDING.md** - Database extension guide
- **ADVANCED.md** - Advanced features (rename, merge, compaction)
- **CONTRIBUTING.md** - Contribution guidelines
- **SECURITY.md** - Security policy


@ -149,6 +149,13 @@ All runtime configuration comes from `.env`.
 ### API
 - `API_PORT`, `REST_DEFAULT_LIMIT`
+- `LIVE_LIMIT_OPTIONS`, `LIVE_LIMIT_NBBO`, `LIVE_LIMIT_EQUITIES`, `LIVE_LIMIT_EQUITY_JOINS`, `LIVE_LIMIT_FLOW`, `LIVE_LIMIT_CLASSIFIER_HITS`, `LIVE_LIMIT_ALERTS`, `LIVE_LIMIT_INFERRED_DARK` (bounded live generic cache depths; defaults `10000`, max `100000`)
+### Web live retention
+- `NEXT_PUBLIC_LIVE_HOT_WINDOW` (frontend hot live window cap; default `2000`)
+- `NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS` (pinned evidence TTL; default `1200000`)
+- `NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS` (pinned evidence cache guardrail; default `4000`)
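A minimal sketch of how a frontend hot-window cap such as `NEXT_PUBLIC_LIVE_HOT_WINDOW` could be applied. The web-side diff is suppressed on this page, so this is an assumption about the general technique rather than the committed code; `parsePositiveInt` and `pushToHotWindow` are hypothetical helper names, and the raw setting is passed in as a string so the example stays self-contained.

```typescript
// Default taken from the documented value above.
const DEFAULT_HOT_WINDOW = 2000;

// Parse a positive integer setting, falling back when it is unset or invalid.
function parsePositiveInt(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw.trim() === "") return fallback;
  const parsed = Number(raw);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Prepend incoming items (newest first) and drop anything beyond the cap.
function pushToHotWindow<T>(current: readonly T[], incoming: readonly T[], cap: number): T[] {
  return [...incoming, ...current].slice(0, cap);
}

// Tiny cap so the eviction is visible; a real app would use the env value.
const cap = parsePositiveInt("3", DEFAULT_HOT_WINDOW);
let hot: number[] = [];
hot = pushToHotWindow(hot, [1], cap);
hot = pushToHotWindow(hot, [3, 2], cap);
hot = pushToHotWindow(hot, [4], cap);
console.log(hot); // [4, 3, 2]
```

The oldest item (`1`) falls out once the fourth item arrives, which is the eviction that pinned evidence retention is meant to compensate for.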
 ### Replay service
@ -163,4 +170,8 @@ All runtime configuration comes from `.env`.
 - Python dependencies are required only for IBKR/Databento sidecars (`services/ingest-options/py/requirements.txt`).
 - Candle construction is server-side; the client consumes prebuilt OHLC events.
+- Live retention uses a two-tier model:
+  - API/Redis maintain a bounded hot cache per live generic channel.
+  - UI keeps a bounded hot window for rendering performance.
+  - Alert/drawer evidence is pinned and hydrated by id/trace so details remain inspectable after hot-window eviction.
 - This repository is for personal, non-redistributed usage.
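The pinned-evidence tier with fetch-on-miss hydration could look roughly like the sketch below. It is an assumption-laden illustration, not the code from this commit: `PinnedEvidenceCache`, its constructor parameters, and the `fetchById` callback are hypothetical names, with the TTL and item cap standing in for `NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS` and `NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS`.

```typescript
type Fetcher<T> = (id: string) => Promise<T | null>;

interface PinnedEntry<T> {
  value: T;
  expiresAt: number;
}

// Hypothetical cache: entries expire after ttlMs and the oldest are dropped past maxItems.
class PinnedEvidenceCache<T> {
  private readonly entries = new Map<string, PinnedEntry<T>>();
  private readonly ttlMs: number;
  private readonly maxItems: number;
  private readonly fetchById: Fetcher<T>;

  constructor(ttlMs: number, maxItems: number, fetchById: Fetcher<T>) {
    this.ttlMs = ttlMs;
    this.maxItems = maxItems;
    this.fetchById = fetchById;
  }

  get size(): number {
    return this.entries.size;
  }

  pin(id: string, value: T): void {
    this.entries.delete(id); // re-insert so this entry becomes the newest
    this.entries.set(id, { value, expiresAt: Date.now() + this.ttlMs });
    while (this.entries.size > this.maxItems) {
      const oldest = this.entries.keys().next().value;
      if (oldest === undefined) break;
      this.entries.delete(oldest);
    }
  }

  // Return the pinned value when still fresh; otherwise hydrate it by id and re-pin it.
  async get(id: string): Promise<T | null> {
    const entry = this.entries.get(id);
    if (entry && entry.expiresAt > Date.now()) return entry.value;
    this.entries.delete(id);
    const fetched = await this.fetchById(id);
    if (fetched !== null) this.pin(id, fetched);
    return fetched;
  }
}

const fetchCalls: string[] = [];
const cache = new PinnedEvidenceCache<string>(1200000, 2, async (id) => {
  fetchCalls.push(id); // stands in for an API request by id/trace
  return `evidence:${id}`;
});
cache.pin("a", "A");
cache.pin("b", "B");
cache.pin("c", "C"); // exceeds maxItems, so "a" is dropped
cache.get("a").then((value) => console.log(value, fetchCalls)); // miss, resolved through the fetcher
```

Relying on `Map` insertion order keeps eviction simple: the first key is always the oldest pin, so no separate timestamp index is needed for the size guardrail.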

File diff suppressed because it is too large.


@ -678,6 +678,10 @@ const run = async () => {
 const liveState = new LiveStateManager(clickhouse, redis);
 await liveState.hydrate();
+const liveStateMetricsTimer = setInterval(() => {
+const snapshot = liveState.getStatsSnapshot();
+logger.info("live cache metrics", snapshot);
+}, 60000);
 const subscribeWithReset = async <T>(
 subject: string,
@ -1475,6 +1479,7 @@ const run = async () => {
 state.shutdownPromise = (async () => {
 logger.info("service stopping", { signal });
 server.stop();
+clearInterval(liveStateMetricsTimer);
 if (redis && redis.isOpen) {
 try {


@ -33,16 +33,19 @@ import type { RedisClientType } from "redis";
 const CURSOR_HASH_KEY = "live:cursors";
-const GENERIC_LIMITS = {
-options: 500,
-nbbo: 500,
-equities: 500,
-"equity-joins": 500,
-flow: 500,
-"classifier-hits": 500,
-alerts: 500,
-"inferred-dark": 500
-} as const;
+const DEFAULT_GENERIC_LIMIT = 10000;
+const MAX_GENERIC_LIMIT = 100000;
+const MIN_GENERIC_LIMIT = 1;
+const GENERIC_LIMIT_ENV_KEYS: Record<LiveGenericChannel, string> = {
+options: "LIVE_LIMIT_OPTIONS",
+nbbo: "LIVE_LIMIT_NBBO",
+equities: "LIVE_LIMIT_EQUITIES",
+"equity-joins": "LIVE_LIMIT_EQUITY_JOINS",
+flow: "LIVE_LIMIT_FLOW",
+"classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS",
+alerts: "LIVE_LIMIT_ALERTS",
+"inferred-dark": "LIVE_LIMIT_INFERRED_DARK"
+};
 const CHART_LIMITS = {
 candles: 500,
@ -58,6 +61,43 @@ type GenericFeedConfig = {
 fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise<any[]>;
 };
+export type GenericLiveLimits = Record<LiveGenericChannel, number>;
+const parseGenericLimit = (
+env: NodeJS.ProcessEnv,
+channel: LiveGenericChannel,
+fallback: number
+): number => {
+const key = GENERIC_LIMIT_ENV_KEYS[channel];
+const raw = env[key];
+if (!raw || raw.trim().length === 0) {
+return fallback;
+}
+const parsed = Number(raw);
+if (!Number.isFinite(parsed)) {
+console.warn(`Invalid ${key}="${raw}", using ${fallback}`);
+return fallback;
+}
+const bounded = Math.max(MIN_GENERIC_LIMIT, Math.min(MAX_GENERIC_LIMIT, Math.floor(parsed)));
+if (bounded !== parsed) {
+console.warn(`Clamped ${key} from ${parsed} to ${bounded}`);
+}
+return bounded;
+};
+export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env): GenericLiveLimits => ({
+options: parseGenericLimit(env, "options", DEFAULT_GENERIC_LIMIT),
+nbbo: parseGenericLimit(env, "nbbo", DEFAULT_GENERIC_LIMIT),
+equities: parseGenericLimit(env, "equities", DEFAULT_GENERIC_LIMIT),
+"equity-joins": parseGenericLimit(env, "equity-joins", DEFAULT_GENERIC_LIMIT),
+flow: parseGenericLimit(env, "flow", DEFAULT_GENERIC_LIMIT),
+"classifier-hits": parseGenericLimit(env, "classifier-hits", DEFAULT_GENERIC_LIMIT),
+alerts: parseGenericLimit(env, "alerts", DEFAULT_GENERIC_LIMIT),
+"inferred-dark": parseGenericLimit(env, "inferred-dark", DEFAULT_GENERIC_LIMIT)
+});
 type RedisLike = Pick<
 RedisClientType,
 "isOpen" | "lRange" | "lPush" | "lTrim" | "hGet" | "hSet"
@ -75,13 +115,13 @@ const parseCursor = (value: string | null): Cursor | null => {
 }
 };
-const getGenericConfig = (): {
+const getGenericConfig = (limits: GenericLiveLimits): {
 [K in LiveGenericChannel]: GenericFeedConfig;
 } => ({
 options: {
 redisKey: "live:options",
 cursorField: "options",
-limit: GENERIC_LIMITS.options,
+limit: limits.options,
 parse: (value) => OptionPrintSchema.parse(value),
 cursor: (item) => ({ ts: item.ts, seq: item.seq }),
 fetchRecent: fetchRecentOptionPrints
@ -89,7 +129,7 @@ const getGenericConfig = (): {
 nbbo: {
 redisKey: "live:nbbo",
 cursorField: "nbbo",
-limit: GENERIC_LIMITS.nbbo,
+limit: limits.nbbo,
 parse: (value) => OptionNBBOSchema.parse(value),
 cursor: (item) => ({ ts: item.ts, seq: item.seq }),
 fetchRecent: fetchRecentOptionNBBO
@ -97,7 +137,7 @@ const getGenericConfig = (): {
 equities: {
 redisKey: "live:equities",
 cursorField: "equities",
-limit: GENERIC_LIMITS.equities,
+limit: limits.equities,
 parse: (value) => EquityPrintSchema.parse(value),
 cursor: (item) => ({ ts: item.ts, seq: item.seq }),
 fetchRecent: fetchRecentEquityPrints
@ -105,7 +145,7 @@ const getGenericConfig = (): {
 "equity-joins": {
 redisKey: "live:equity-joins",
 cursorField: "equity-joins",
-limit: GENERIC_LIMITS["equity-joins"],
+limit: limits["equity-joins"],
 parse: (value) => EquityPrintJoinSchema.parse(value),
 cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
 fetchRecent: fetchRecentEquityPrintJoins
@ -113,7 +153,7 @@ const getGenericConfig = (): {
 flow: {
 redisKey: "live:flow",
 cursorField: "flow",
-limit: GENERIC_LIMITS.flow,
+limit: limits.flow,
 parse: (value) => FlowPacketSchema.parse(value),
 cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
 fetchRecent: fetchRecentFlowPackets
@ -121,7 +161,7 @@ const getGenericConfig = (): {
 "classifier-hits": {
 redisKey: "live:classifier-hits",
 cursorField: "classifier-hits",
-limit: GENERIC_LIMITS["classifier-hits"],
+limit: limits["classifier-hits"],
 parse: (value) => ClassifierHitEventSchema.parse(value),
 cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
 fetchRecent: fetchRecentClassifierHits
@ -129,7 +169,7 @@ const getGenericConfig = (): {
 alerts: {
 redisKey: "live:alerts",
 cursorField: "alerts",
-limit: GENERIC_LIMITS.alerts,
+limit: limits.alerts,
 parse: (value) => AlertEventSchema.parse(value),
 cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
 fetchRecent: fetchRecentAlerts
@ -137,7 +177,7 @@ const getGenericConfig = (): {
 "inferred-dark": {
 redisKey: "live:inferred-dark",
 cursorField: "inferred-dark",
-limit: GENERIC_LIMITS["inferred-dark"],
+limit: limits["inferred-dark"],
 parse: (value) => InferredDarkEventSchema.parse(value),
 cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
 fetchRecent: fetchRecentInferredDark
@ -171,18 +211,43 @@ const overlayRedisKey = (underlyingId: string): string => `live:equity-overlay:$
 const overlayCursorField = (underlyingId: string): string => `equities:${underlyingId}`;
 export class LiveStateManager {
-private readonly generic = getGenericConfig();
+private readonly generic: {
+[K in LiveGenericChannel]: GenericFeedConfig;
+};
 private readonly genericItems = new Map<LiveGenericChannel, any[]>();
 private readonly genericCursors = new Map<string, Cursor | null>();
 private readonly candleItems = new Map<string, EquityCandle[]>();
 private readonly candleCursors = new Map<string, Cursor | null>();
 private readonly overlayItems = new Map<string, EquityPrint[]>();
 private readonly overlayCursors = new Map<string, Cursor | null>();
+private readonly stats = {
+genericHydrateFromRedis: 0,
+genericHydrateFromClickHouse: 0,
+trimOperations: 0,
+cacheDepthByKey: new Map<string, number>()
+};
 constructor(
 private readonly clickhouse: ClickHouseClient,
-private readonly redis: RedisLike | null
-) {}
+private readonly redis: RedisLike | null,
+limits: GenericLiveLimits = resolveGenericLiveLimits()
+) {
+this.generic = getGenericConfig(limits);
+}
+getStatsSnapshot(): {
+genericHydrateFromRedis: number;
+genericHydrateFromClickHouse: number;
+trimOperations: number;
+cacheDepthByKey: Record<string, number>;
+} {
+return {
+genericHydrateFromRedis: this.stats.genericHydrateFromRedis,
+genericHydrateFromClickHouse: this.stats.genericHydrateFromClickHouse,
+trimOperations: this.stats.trimOperations,
+cacheDepthByKey: Object.fromEntries(this.stats.cacheDepthByKey)
+};
+}
 async hydrate(): Promise<void> {
 const channels = Object.keys(this.generic) as LiveGenericChannel[];
@ -196,12 +261,16 @@ export class LiveStateManager {
 const cached = parseJsonList(payloads, config.parse);
 if (cached.length > 0) {
 this.genericItems.set(channel, cached);
+this.stats.genericHydrateFromRedis += 1;
+this.stats.cacheDepthByKey.set(config.redisKey, cached.length);
 this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField)));
 return;
 }
 }
 const fresh = await config.fetchRecent(this.clickhouse, config.limit);
+this.stats.genericHydrateFromClickHouse += 1;
+this.stats.cacheDepthByKey.set(config.redisKey, fresh.length);
 this.genericItems.set(channel, fresh);
 const watermark = fresh[0] ? config.cursor(fresh[0]) : null;
 this.genericCursors.set(config.cursorField, watermark);
@ -262,6 +331,7 @@ export class LiveStateManager {
 .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
 .slice(0, CHART_LIMITS.candles);
 this.candleItems.set(key, next);
+this.stats.cacheDepthByKey.set(key, next.length);
 const cursor = { ts: candle.ts, seq: candle.seq };
 this.candleCursors.set(cursorField, cursor);
 await this.persistList(key, cursorField, next, CHART_LIMITS.candles, cursor);
@ -276,6 +346,7 @@ export class LiveStateManager {
 .sort((a, b) => (b.ts - a.ts) || (b.seq - a.seq))
 .slice(0, CHART_LIMITS.overlay);
 this.overlayItems.set(key, next);
+this.stats.cacheDepthByKey.set(key, next.length);
 const cursor = { ts: print.ts, seq: print.seq };
 this.overlayCursors.set(cursorField, cursor);
 await this.persistList(key, cursorField, next, CHART_LIMITS.overlay, cursor);
@ -293,6 +364,7 @@ export class LiveStateManager {
 })
 .slice(0, config.limit);
 this.genericItems.set(channel, next);
+this.stats.cacheDepthByKey.set(config.redisKey, next.length);
 const cursor = config.cursor(parsed);
 this.genericCursors.set(config.cursorField, cursor);
 await this.persistList(config.redisKey, config.cursorField, next, config.limit, cursor);
@ -309,6 +381,7 @@ export class LiveStateManager {
const cached = parseJsonList(payloads, (value) => EquityCandleSchema.parse(value)); const cached = parseJsonList(payloads, (value) => EquityCandleSchema.parse(value));
if (cached.length > 0) { if (cached.length > 0) {
this.candleItems.set(key, cached); this.candleItems.set(key, cached);
this.stats.cacheDepthByKey.set(key, cached.length);
this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField))); this.candleCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
return; return;
} }
@@ -316,6 +389,7 @@ export class LiveStateManager {
 const fresh = await fetchRecentEquityCandles(this.clickhouse, underlyingId, intervalMs, CHART_LIMITS.candles);
 this.candleItems.set(key, fresh);
+this.stats.cacheDepthByKey.set(key, fresh.length);
 const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
 this.candleCursors.set(cursorField, watermark);
 await this.persistList(key, cursorField, fresh, CHART_LIMITS.candles, watermark);
@@ -329,6 +403,7 @@ export class LiveStateManager {
 const cached = parseJsonList(payloads, (value) => EquityPrintSchema.parse(value));
 if (cached.length > 0) {
 this.overlayItems.set(key, cached);
+this.stats.cacheDepthByKey.set(key, cached.length);
 this.overlayCursors.set(cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, cursorField)));
 return;
 }
@@ -338,6 +413,7 @@ export class LiveStateManager {
 (item) => item.underlying_id === underlyingId
 );
 this.overlayItems.set(key, fresh);
+this.stats.cacheDepthByKey.set(key, fresh.length);
 const watermark = fresh[0] ? { ts: fresh[0].ts, seq: fresh[0].seq } : null;
 this.overlayCursors.set(cursorField, watermark);
 await this.persistList(key, cursorField, fresh, CHART_LIMITS.overlay, watermark);
@@ -356,6 +432,7 @@ export class LiveStateManager {
 const payloads = items.map((item) => JSON.stringify(item));
 await this.redis.lTrim(listKey, 1, 0);
+this.stats.trimOperations += 1;
 if (payloads.length > 0) {
 for (let idx = payloads.length - 1; idx >= 0; idx -= 1) {
 const payload = payloads[idx];
@@ -364,7 +441,9 @@ export class LiveStateManager {
 }
 }
 await this.redis.lTrim(listKey, 0, limit - 1);
+this.stats.trimOperations += 1;
 }
+this.stats.cacheDepthByKey.set(listKey, Math.min(items.length, limit));
 await this.redis.hSet(CURSOR_HASH_KEY, cursorField, JSON.stringify(cursor));
 }
 }

@@ -1,6 +1,6 @@
 import { describe, expect, it } from "bun:test";
 import type { ClickHouseClient } from "@islandflow/storage";
-import { LiveStateManager } from "../src/live";
+import { LiveStateManager, resolveGenericLiveLimits } from "../src/live";
 const makeClickHouse = (): ClickHouseClient =>
 ({
@@ -48,6 +48,19 @@ const makeRedis = () => {
 };
 describe("LiveStateManager", () => {
+it("resolves live limits from env with clamping", () => {
+const limits = resolveGenericLiveLimits({
+LIVE_LIMIT_OPTIONS: "777",
+LIVE_LIMIT_NBBO: "200000",
+LIVE_LIMIT_FLOW: "bad"
+} as NodeJS.ProcessEnv);
+expect(limits.options).toBe(777);
+expect(limits.nbbo).toBe(100000);
+expect(limits.flow).toBe(10000);
+expect(limits.alerts).toBe(10000);
+});
 it("hydrates snapshots from redis generic windows", async () => {
 const redis = makeRedis();
 await redis.lPush(
@@ -120,4 +133,67 @@ describe("LiveStateManager", () => {
 expect(candleSnapshot.watermark).toEqual({ ts: 100, seq: 1 });
 expect(overlaySnapshot.watermark).toEqual({ ts: 110, seq: 2 });
 });
+it("trims generic windows to configured per-channel limits", async () => {
+const redis = makeRedis();
+const manager = new LiveStateManager(
+makeClickHouse(),
+redis as never,
+{
+options: 10000,
+nbbo: 10000,
+equities: 10000,
+"equity-joins": 10000,
+flow: 2,
+"classifier-hits": 10000,
+alerts: 10000,
+"inferred-dark": 10000
+}
+);
+await manager.ingest("flow", {
+source_ts: 100,
+ingest_ts: 101,
+seq: 1,
+trace_id: "flow-1",
+id: "flow-1",
+members: ["a"],
+features: {},
+join_quality: {}
+});
+await manager.ingest("flow", {
+source_ts: 110,
+ingest_ts: 111,
+seq: 2,
+trace_id: "flow-2",
+id: "flow-2",
+members: ["b"],
+features: {},
+join_quality: {}
+});
+await manager.ingest("flow", {
+source_ts: 120,
+ingest_ts: 121,
+seq: 3,
+trace_id: "flow-3",
+id: "flow-3",
+members: ["c"],
+features: {},
+join_quality: {}
+});
+const snapshot = await manager.getSnapshot({ channel: "flow" });
+expect(snapshot.items).toHaveLength(2);
+expect((snapshot.items as Array<{ id: string }>).map((item) => item.id)).toEqual([
+"flow-3",
+"flow-2"
+]);
+const persisted = await redis.lRange("live:flow", 0, 99);
+expect(persisted).toHaveLength(2);
+const stats = manager.getStatsSnapshot();
+expect(stats.trimOperations).toBeGreaterThan(0);
+expect(stats.cacheDepthByKey["live:flow"]).toBe(2);
+});
 });
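
Note on the clamping test above: the body of `resolveGenericLiveLimits` is not part of this section, so the following is only a minimal sketch of a resolver that would satisfy those assertions (777 kept, 200000 clamped to 100000, `"bad"` and missing values falling back to 10000). The names `sketchResolveLimits`, `parseLimit`, `DEFAULT_LIMIT`, `MAX_LIMIT`, and `MIN_LIMIT` are hypothetical, and treating values below 1 as invalid is an assumption rather than something the diff confirms.

```typescript
// Hypothetical sketch, not the committed implementation of resolveGenericLiveLimits.
const DEFAULT_LIMIT = 10_000; // fallback implied by the test for "bad" or missing values
const MAX_LIMIT = 100_000; // upper clamp implied by the test (200000 -> 100000)
const MIN_LIMIT = 1; // assumption: values below 1 are treated as invalid

// Channel names come from the test; env names come from the .env example in this commit.
const ENV_KEYS = {
  options: "LIVE_LIMIT_OPTIONS",
  nbbo: "LIVE_LIMIT_NBBO",
  equities: "LIVE_LIMIT_EQUITIES",
  "equity-joins": "LIVE_LIMIT_EQUITY_JOINS",
  flow: "LIVE_LIMIT_FLOW",
  "classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS",
  alerts: "LIVE_LIMIT_ALERTS",
  "inferred-dark": "LIVE_LIMIT_INFERRED_DARK"
} as const;

type Channel = keyof typeof ENV_KEYS;

// Parse one raw env value: fall back to the default when it is absent,
// non-numeric, or below the minimum; cap it at the maximum otherwise.
function parseLimit(raw: string | undefined): number {
  const value = Number.parseInt(raw ?? "", 10);
  if (!Number.isFinite(value) || value < MIN_LIMIT) {
    return DEFAULT_LIMIT;
  }
  return Math.min(value, MAX_LIMIT);
}

// Build a per-channel limit map from an env-like record.
function sketchResolveLimits(
  env: Record<string, string | undefined>
): Record<Channel, number> {
  const limits = {} as Record<Channel, number>;
  for (const channel of Object.keys(ENV_KEYS) as Channel[]) {
    limits[channel] = parseLimit(env[ENV_KEYS[channel]]);
  }
  return limits;
}
```

Falling back to the default on unparseable input, rather than throwing, is consistent with the test's expectation that a bad `LIVE_LIMIT_FLOW` still yields a usable limit; whether the real resolver also logs a warning in that case is not shown here.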