Reconcile JetStream retention caps

This commit is contained in:
dirtydishes 2026-05-08 15:52:32 -04:00
parent 26e69bf98d
commit f7aed36591
15 changed files with 837 additions and 68 deletions

View file

@ -1,3 +1,4 @@
{"_type":"issue","id":"islandflow-ebp","title":"Implement JetStream retention reconciliation and admin rollout command","description":"Implement shared JetStream stream catalog and reconciliation logic so retention cap changes take effect on existing streams without deleting them.\n\nScope:\n- Centralize known stream definitions in packages/bus\n- Change retention defaults to raw=60m/512MiB and derived=12h/256MiB\n- Update ensureStream() to reconcile allowed retention drift in place and fail on structural mismatch\n- Add a Bun CLI entrypoint to audit/apply stream reconciliation\n- Reuse the same helpers from startup and CLI paths\n- Document Docker rollout and verification flow\n- Add unit tests for defaults, drift detection, safe updates, and CLI behavior\n","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-08T19:47:23Z","created_by":"dirtydishes","updated_at":"2026-05-08T19:52:08Z","started_at":"2026-05-08T19:47:29Z","closed_at":"2026-05-08T19:52:08Z","close_reason":"Implemented shared JetStream retention reconciliation, startup drift correction, admin CLI, docs, and tests","dependencies":[{"issue_id":"islandflow-ebp","depends_on_id":"islandflow-1ln","type":"discovered-from","created_at":"2026-05-08T15:47:22Z","created_by":"dirtydishes","metadata":"{}"}],"dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-vnq","title":"Fix deploy verification for same-origin host","description":"Remove the hardcoded separate API host assumption from deployment tooling and docs. Make deploy verification and documentation match the current flow.deltaisland.io setup, using same-origin verification where appropriate instead of forcing api.flow.deltaisland.io.\n","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-08T11:34:49Z","created_by":"dirtydishes","updated_at":"2026-05-08T11:37:35Z","started_at":"2026-05-08T11:35:37Z","closed_at":"2026-05-08T11:37:35Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-762","title":"Fix public API hostname TLS/proxy path","description":"Debug and fix the public API hostname so https://api.flow.deltaisland.io/health works again. Determine whether the failure is in Cloudflare, Nginx Proxy Manager, DNS, or the API proxy host definition, then apply the smallest safe fix and verify the public endpoint.\n","status":"in_progress","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-08T11:21:41Z","created_by":"dirtydishes","updated_at":"2026-05-08T11:21:52Z","started_at":"2026-05-08T11:21:52Z","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-33c","title":"Investigate public API TLS handshake failure","description":"Investigate the public TLS handshake failure on https://api.flow.deltaisland.io/health. After the compose network fix, the app host is healthy and nginx-proxy-manager can reach islandflow-vps-api-1 internally, but both local and server-side HTTPS requests to api.flow.deltaisland.io fail during TLS handshake at the public edge. This likely needs proxy or Cloudflare inspection outside the app stack.\n","status":"open","priority":1,"issue_type":"task","owner":"dishes@dpdrm.com","created_at":"2026-05-08T11:13:36Z","created_by":"dirtydishes","updated_at":"2026-05-08T11:13:36Z","dependency_count":0,"dependent_count":0,"comment_count":0}

View file

@ -127,7 +127,7 @@ OPTION_CONTEXT_MAX_KEYS=20000
OPTION_CONTEXT_TTL_MS=900000
# JetStream retention
STREAM_RAW_MAX_AGE_MS=7200000
STREAM_RAW_MAX_BYTES=1073741824
STREAM_DERIVED_MAX_AGE_MS=86400000
STREAM_DERIVED_MAX_BYTES=536870912
STREAM_RAW_MAX_AGE_MS=3600000
STREAM_RAW_MAX_BYTES=536870912
STREAM_DERIVED_MAX_AGE_MS=43200000
STREAM_DERIVED_MAX_BYTES=268435456

View file

@ -149,7 +149,7 @@ COMPUTE_NBBO_CACHE_MAX_KEYS=20000
COMPUTE_NBBO_CACHE_TTL_MS=900000
# JetStream retention
STREAM_RAW_MAX_AGE_MS=7200000
STREAM_RAW_MAX_BYTES=1073741824
STREAM_DERIVED_MAX_AGE_MS=86400000
STREAM_DERIVED_MAX_BYTES=536870912
STREAM_RAW_MAX_AGE_MS=3600000
STREAM_RAW_MAX_BYTES=536870912
STREAM_DERIVED_MAX_AGE_MS=43200000
STREAM_DERIVED_MAX_BYTES=268435456

View file

@ -299,6 +299,48 @@ cd /home/delta/islandflow/deployment/docker
docker compose up -d
```
## JetStream retention rollout
JetStream in this stack is the live event buffer between ingest, compute, candles, replay, and API services. ClickHouse remains the durable history layer; JetStream should stay bounded enough to protect the VPS during normal live operation.
Why redeploy alone is not enough for old streams:
- Older streams keep the retention settings they were created with.
- A code deploy only helps new streams unless something explicitly reconciles existing stream configs.
- This repo now includes both startup reconciliation and a manual audit/apply command so live streams can be corrected in place without deleting them.
Target retention baseline:
- Raw streams: `60m`, `512 MiB`
- Derived streams: `12h`, `256 MiB`
Audit current stream caps from a running service container:
```bash
cd deployment/docker
docker compose exec api bun packages/bus/src/reconcile-streams.ts --check
```
Apply in-place reconciliation:
```bash
cd deployment/docker
docker compose exec api bun packages/bus/src/reconcile-streams.ts --apply
```
Verify the rollout:
1. Re-run `--check` and require all lines to report `✓`.
2. Inspect service logs for any `structural-mismatch` line or reconciliation failure.
3. Confirm the production `.env` keeps these values:
- `STREAM_RAW_MAX_AGE_MS=3600000`
- `STREAM_RAW_MAX_BYTES=536870912`
- `STREAM_DERIVED_MAX_AGE_MS=43200000`
- `STREAM_DERIVED_MAX_BYTES=268435456`
4. Compare post-rollout `docker stats --no-stream` with the pre-rollout baseline and watch JetStream storage stabilize under the tighter caps.
If any stream reports a structural mismatch, stop the rollout. Do not purge or recreate streams under this procedure; capture the stream name and mismatch details for follow-up.
If you changed `NEXT_PUBLIC_API_URL` or `NEXT_PUBLIC_NBBO_MAX_AGE_MS`, rebuild the web image because those are public Next.js build-time values:
```bash

View file

@ -1,2 +1,3 @@
export * from "./jetstream";
export * from "./streams";
export * from "./subjects";

View file

@ -6,10 +6,12 @@ import {
type JetStreamManager,
type NatsConnection,
type StreamConfig,
type StreamUpdateConfig,
JSONCodec,
type JsMsg,
createInbox
} from "nats";
import { getKnownStreamDefinitions, getStreamDefinition, type StreamRetentionClass } from "./streams";
export type NatsConnectionOptions = {
servers: string | string[];
@ -28,6 +30,60 @@ export type RetryOptions = {
delayMs: number;
};
export type LoggerLike = {
info: (msg: string, context?: Record<string, unknown>) => void;
};
export type StreamFieldDelta = {
field: string;
current: unknown;
desired: unknown;
};
export type StreamAuditState = "match" | "missing" | "retention_drift" | "structural_mismatch";
export type StreamReconciliationAction = "none" | "created" | "updated";
export type StreamAuditReport = {
name: string;
desired: StreamConfig;
existing: StreamConfig | null;
state: StreamAuditState;
retentionDrift: StreamFieldDelta[];
structuralMismatch: StreamFieldDelta[];
};
export type StreamReconciliationReport = StreamAuditReport & {
action: StreamReconciliationAction;
};
export type ReconcileStreamOptions = {
logger?: LoggerLike;
};
export type KnownStreamOptions = ReconcileStreamOptions & {
env?: Record<string, string | undefined>;
};
export type ReconcileStreamsCommandDependencies = {
connect?: typeof connectJetStream;
env?: Record<string, string | undefined>;
stdout?: (line: string) => void;
stderr?: (line: string) => void;
};
const RETENTION_FIELDS = [
"retention",
"discard",
"max_msgs",
"max_msgs_per_subject",
"max_age",
"max_bytes",
"num_replicas"
] as const;
const STRUCTURAL_FIELDS = ["name", "subjects", "storage"] as const;
const sleep = (delayMs: number): Promise<void> => {
return new Promise((resolve) => setTimeout(resolve, delayMs));
};
@ -69,18 +125,28 @@ export const connectJetStreamWithRetry = async (
export const ensureStream = async (
jsm: JetStreamManager,
config: StreamConfig
): Promise<void> => {
try {
await jsm.streams.info(config.name);
return;
} catch (error) {
if (error instanceof Error && error.message.includes("not found")) {
await jsm.streams.add(config);
return;
}
config: StreamConfig,
options: ReconcileStreamOptions = {}
): Promise<StreamReconciliationReport> => {
const audit = await auditStream(jsm, config);
throw error;
switch (audit.state) {
case "match":
return { ...audit, action: "none" };
case "missing":
await jsm.streams.add(config);
return { ...audit, action: "created" };
case "retention_drift": {
const updateConfig = buildStreamUpdateConfig(audit.existing!, config);
await jsm.streams.update(config.name, updateConfig as Partial<StreamUpdateConfig>);
options.logger?.info("reconciled jetstream retention", {
stream: config.name,
drift: audit.retentionDrift
});
return { ...audit, action: "updated" };
}
case "structural_mismatch":
throw new Error(formatStructuralMismatchMessage(audit));
}
};
@ -92,22 +158,20 @@ const parseBoundedNumber = (value: string | undefined, fallback: number): number
return Math.floor(parsed);
};
export type StreamRetentionClass = "raw" | "derived";
export const resolveStreamRetention = (
streamClass: StreamRetentionClass,
env: Record<string, string | undefined> = process.env
): Pick<StreamConfig, "max_bytes" | "max_age"> => {
if (streamClass === "raw") {
return {
max_age: parseBoundedNumber(env.STREAM_RAW_MAX_AGE_MS, 7_200_000),
max_bytes: parseBoundedNumber(env.STREAM_RAW_MAX_BYTES, 1_073_741_824)
max_age: parseBoundedNumber(env.STREAM_RAW_MAX_AGE_MS, 3_600_000),
max_bytes: parseBoundedNumber(env.STREAM_RAW_MAX_BYTES, 536_870_912)
};
}
return {
max_age: parseBoundedNumber(env.STREAM_DERIVED_MAX_AGE_MS, 86_400_000),
max_bytes: parseBoundedNumber(env.STREAM_DERIVED_MAX_BYTES, 536_870_912)
max_age: parseBoundedNumber(env.STREAM_DERIVED_MAX_AGE_MS, 43_200_000),
max_bytes: parseBoundedNumber(env.STREAM_DERIVED_MAX_BYTES, 268_435_456)
};
};
@ -128,6 +192,340 @@ export const buildStreamConfig = (
num_replicas: 1
});
export const buildKnownStreamConfig = (
name: string,
env: Record<string, string | undefined> = process.env
): StreamConfig => {
const definition = getStreamDefinition(name);
return buildStreamConfig(definition.name, definition.subject, definition.retentionClass, env);
};
const arraysEqual = (left: unknown[], right: unknown[]): boolean => {
if (left.length !== right.length) {
return false;
}
return left.every((value, index) => value === right[index]);
};
const getFieldValue = (config: StreamConfig, field: string): unknown => {
switch (field) {
case "name":
return config.name;
case "subjects":
return config.subjects;
case "storage":
return config.storage;
case "retention":
return config.retention;
case "discard":
return config.discard;
case "max_msgs":
return config.max_msgs;
case "max_msgs_per_subject":
return config.max_msgs_per_subject;
case "max_age":
return config.max_age;
case "max_bytes":
return config.max_bytes;
case "num_replicas":
return config.num_replicas;
default:
return undefined;
}
};
const diffConfigFields = (
current: StreamConfig,
desired: StreamConfig,
fields: readonly string[]
): StreamFieldDelta[] => {
const deltas: StreamFieldDelta[] = [];
for (const field of fields) {
const currentValue = getFieldValue(current, field);
const desiredValue = getFieldValue(desired, field);
const matches = Array.isArray(currentValue) && Array.isArray(desiredValue)
? arraysEqual(currentValue, desiredValue)
: currentValue === desiredValue;
if (!matches) {
deltas.push({
field,
current: currentValue,
desired: desiredValue
});
}
}
return deltas;
};
const isNotFoundError = (error: unknown): boolean => {
return error instanceof Error && error.message.toLowerCase().includes("not found");
};
export const auditStreamConfig = (
current: StreamConfig | null,
desired: StreamConfig
): StreamAuditReport => {
if (!current) {
return {
name: desired.name,
desired,
existing: null,
state: "missing",
retentionDrift: [],
structuralMismatch: []
};
}
const structuralMismatch = diffConfigFields(current, desired, STRUCTURAL_FIELDS);
if (structuralMismatch.length > 0) {
return {
name: desired.name,
desired,
existing: current,
state: "structural_mismatch",
retentionDrift: [],
structuralMismatch
};
}
const retentionDrift = diffConfigFields(current, desired, RETENTION_FIELDS);
if (retentionDrift.length > 0) {
return {
name: desired.name,
desired,
existing: current,
state: "retention_drift",
retentionDrift,
structuralMismatch: []
};
}
return {
name: desired.name,
desired,
existing: current,
state: "match",
retentionDrift: [],
structuralMismatch: []
};
};
const buildStreamUpdateConfig = (
current: StreamConfig,
desired: StreamConfig
): Partial<StreamConfig> => {
const updateConfig: Partial<StreamConfig> = { ...current };
for (const field of RETENTION_FIELDS) {
(updateConfig as Record<string, unknown>)[field] = getFieldValue(desired, field);
}
return updateConfig;
};
export const auditStream = async (
jsm: JetStreamManager,
desired: StreamConfig
): Promise<StreamAuditReport> => {
try {
const info = await jsm.streams.info(desired.name);
return auditStreamConfig(info.config, desired);
} catch (error) {
if (isNotFoundError(error)) {
return auditStreamConfig(null, desired);
}
throw error;
}
};
export const auditKnownStreams = async (
jsm: JetStreamManager,
streamNames: readonly string[],
options: KnownStreamOptions = {}
): Promise<StreamAuditReport[]> => {
const reports: StreamAuditReport[] = [];
for (const name of streamNames) {
reports.push(await auditStream(jsm, buildKnownStreamConfig(name, options.env)));
}
return reports;
};
export const ensureKnownStreams = async (
jsm: JetStreamManager,
streamNames: readonly string[],
options: KnownStreamOptions = {}
): Promise<StreamReconciliationReport[]> => {
const reports: StreamReconciliationReport[] = [];
for (const name of streamNames) {
reports.push(
await ensureStream(jsm, buildKnownStreamConfig(name, options.env), {
logger: options.logger
})
);
}
return reports;
};
const formatStructuredValue = (value: unknown): string => {
if (Array.isArray(value)) {
return value.join(",");
}
return String(value);
};
const formatStructuralMismatchMessage = (audit: StreamAuditReport): string => {
const details = audit.structuralMismatch
.map((delta) => `${delta.field} current=${formatStructuredValue(delta.current)} desired=${formatStructuredValue(delta.desired)}`)
.join("; ");
return `Refusing to reconcile stream ${audit.name}: structural mismatch (${details})`;
};
const formatDurationMs = (value: number): string => {
if (value % 3_600_000 === 0) {
return `${value / 3_600_000}h`;
}
if (value % 60_000 === 0) {
return `${value / 60_000}m`;
}
if (value % 1_000 === 0) {
return `${value / 1_000}s`;
}
return `${value}ms`;
};
const formatBytes = (value: number): string => {
if (value < 0) {
return String(value);
}
const mib = 1024 * 1024;
if (value % mib === 0) {
return `${value / mib} MiB`;
}
return `${value} B`;
};
const formatRetentionSummary = (config: StreamConfig): string => {
return `age=${formatDurationMs(Number(config.max_age))} bytes=${formatBytes(config.max_bytes)} replicas=${config.num_replicas} retention=${config.retention} discard=${config.discard}`;
};
const formatReportLine = (
report: StreamAuditReport | StreamReconciliationReport,
mode: "check" | "apply"
): string => {
if ("action" in report && report.action === "created") {
return `${report.name} created ${formatRetentionSummary(report.desired)}`;
}
if ("action" in report && report.action === "updated") {
const fields = report.retentionDrift.map((delta) => delta.field).join(",");
return `${report.name} updated fields=${fields} ${formatRetentionSummary(report.desired)}`;
}
switch (report.state) {
case "match":
return `${report.name} ${formatRetentionSummary(report.desired)}`;
case "missing":
return `${mode === "check" ? "○" : "◐"} ${report.name} missing desired ${formatRetentionSummary(report.desired)}`;
case "retention_drift": {
const details = report.retentionDrift
.map((delta) => {
const desiredValue = delta.field === "max_age"
? formatDurationMs(Number(delta.desired))
: delta.field === "max_bytes"
? formatBytes(Number(delta.desired))
: formatStructuredValue(delta.desired);
const currentValue = delta.field === "max_age"
? formatDurationMs(Number(delta.current))
: delta.field === "max_bytes"
? formatBytes(Number(delta.current))
: formatStructuredValue(delta.current);
return `${delta.field}:${currentValue}->${desiredValue}`;
})
.join(" ");
return `${report.name} drift ${details}`;
}
case "structural_mismatch": {
const details = report.structuralMismatch
.map((delta) => `${delta.field}:${formatStructuredValue(delta.current)}->${formatStructuredValue(delta.desired)}`)
.join(" ");
return `${report.name} structural-mismatch ${details}`;
}
}
};
export const runReconcileStreamsCommand = async (
args: string[],
dependencies: ReconcileStreamsCommandDependencies = {}
): Promise<number> => {
const connectFn = dependencies.connect ?? connectJetStream;
const stdout = dependencies.stdout ?? ((line: string) => console.log(line));
const stderr = dependencies.stderr ?? ((line: string) => console.error(line));
const env = dependencies.env ?? process.env;
const apply = args.includes("--apply");
const check = args.includes("--check");
if (apply === check) {
stderr("Usage: bun packages/bus/src/reconcile-streams.ts --check|--apply");
return 2;
}
let connection: JetStreamConnection | null = null;
try {
connection = await connectFn({
servers: env.NATS_URL ?? "nats://127.0.0.1:4222",
name: "bus-reconcile-streams"
});
const streamNames = getKnownStreamDefinitions().map((definition) => definition.name);
const mode = apply ? "apply" : "check";
let exitCode = 0;
if (check) {
const reports = await auditKnownStreams(connection.jsm, streamNames, { env });
for (const report of reports) {
stdout(formatReportLine(report, mode));
if (report.state !== "match") {
exitCode = 1;
}
}
return exitCode;
}
for (const name of streamNames) {
const desired = buildKnownStreamConfig(name, env);
try {
const report = await ensureStream(connection.jsm, desired);
stdout(formatReportLine(report, mode));
} catch (error) {
const audit = await auditStream(connection.jsm, desired);
if (audit.state === "structural_mismatch") {
stdout(formatReportLine(audit, mode));
}
stderr(error instanceof Error ? error.message : String(error));
exitCode = 1;
break;
}
}
return exitCode;
} finally {
await connection?.nc.close();
}
};
export const buildDurableConsumer = (
durableName: string,
deliverSubject: string = createInbox()

View file

@ -0,0 +1,4 @@
import { runReconcileStreamsCommand } from "./jetstream";
const exitCode = await runReconcileStreamsCommand(process.argv.slice(2));
process.exit(exitCode);

View file

@ -0,0 +1,72 @@
import {
STREAM_ALERTS,
STREAM_CLASSIFIER_HITS,
STREAM_EQUITY_CANDLES,
STREAM_EQUITY_JOINS,
STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES,
STREAM_FLOW_PACKETS,
STREAM_INFERRED_DARK,
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
STREAM_OPTION_SIGNAL_PRINTS,
STREAM_SMART_MONEY_EVENTS,
SUBJECT_ALERTS,
SUBJECT_CLASSIFIER_HITS,
SUBJECT_EQUITY_CANDLES,
SUBJECT_EQUITY_JOINS,
SUBJECT_EQUITY_PRINTS,
SUBJECT_EQUITY_QUOTES,
SUBJECT_FLOW_PACKETS,
SUBJECT_INFERRED_DARK,
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS,
SUBJECT_OPTION_SIGNAL_PRINTS,
SUBJECT_SMART_MONEY_EVENTS
} from "./subjects";
export type StreamRetentionClass = "raw" | "derived";
export type KnownStreamDefinition = {
name: string;
subject: string;
retentionClass: StreamRetentionClass;
};
export const STREAM_CATALOG: readonly KnownStreamDefinition[] = [
{ name: STREAM_OPTION_PRINTS, subject: SUBJECT_OPTION_PRINTS, retentionClass: "raw" },
{ name: STREAM_OPTION_NBBO, subject: SUBJECT_OPTION_NBBO, retentionClass: "raw" },
{ name: STREAM_EQUITY_PRINTS, subject: SUBJECT_EQUITY_PRINTS, retentionClass: "raw" },
{ name: STREAM_EQUITY_QUOTES, subject: SUBJECT_EQUITY_QUOTES, retentionClass: "raw" },
{
name: STREAM_OPTION_SIGNAL_PRINTS,
subject: SUBJECT_OPTION_SIGNAL_PRINTS,
retentionClass: "derived"
},
{ name: STREAM_EQUITY_CANDLES, subject: SUBJECT_EQUITY_CANDLES, retentionClass: "derived" },
{ name: STREAM_EQUITY_JOINS, subject: SUBJECT_EQUITY_JOINS, retentionClass: "derived" },
{ name: STREAM_INFERRED_DARK, subject: SUBJECT_INFERRED_DARK, retentionClass: "derived" },
{ name: STREAM_FLOW_PACKETS, subject: SUBJECT_FLOW_PACKETS, retentionClass: "derived" },
{
name: STREAM_SMART_MONEY_EVENTS,
subject: SUBJECT_SMART_MONEY_EVENTS,
retentionClass: "derived"
},
{ name: STREAM_CLASSIFIER_HITS, subject: SUBJECT_CLASSIFIER_HITS, retentionClass: "derived" },
{ name: STREAM_ALERTS, subject: SUBJECT_ALERTS, retentionClass: "derived" }
];
const STREAM_CATALOG_BY_NAME = new Map(STREAM_CATALOG.map((definition) => [definition.name, definition]));
export const getKnownStreamDefinitions = (): readonly KnownStreamDefinition[] => {
return STREAM_CATALOG;
};
export const getStreamDefinition = (name: string): KnownStreamDefinition => {
const definition = STREAM_CATALOG_BY_NAME.get(name);
if (!definition) {
throw new Error(`Unknown stream definition: ${name}`);
}
return definition;
};

View file

@ -0,0 +1,246 @@
import { describe, expect, it } from "bun:test";
import type { JetStreamManager, StreamConfig } from "nats";
import {
auditStreamConfig,
buildKnownStreamConfig,
ensureStream,
getKnownStreamDefinitions,
resolveStreamRetention,
runReconcileStreamsCommand
} from "../src";
const STREAMS = getKnownStreamDefinitions().map((definition) => definition.name);
const buildMockStreamManager = (configs: Record<string, StreamConfig | null>) => {
const addCalls: StreamConfig[] = [];
const updateCalls: Array<{ name: string; config: Partial<StreamConfig> }> = [];
return {
manager: {
streams: {
info: async (name: string) => {
const config = configs[name];
if (!config) {
throw new Error("stream not found");
}
return { config };
},
add: async (config: StreamConfig) => {
addCalls.push(config);
configs[config.name] = config;
return { config };
},
update: async (name: string, config?: Partial<StreamConfig>) => {
updateCalls.push({ name, config: config ?? {} });
configs[name] = config as StreamConfig;
return { config };
}
}
} as unknown as JetStreamManager,
addCalls,
updateCalls
};
};
const buildAllKnownConfigs = (env: Record<string, string | undefined> = {}) => {
return Object.fromEntries(STREAMS.map((name) => [name, buildKnownStreamConfig(name, env)])) as Record<
string,
StreamConfig
>;
};
describe("jetstream retention defaults", () => {
it("resolves raw defaults to 60m and 512 MiB", () => {
expect(resolveStreamRetention("raw")).toEqual({
max_age: 3_600_000,
max_bytes: 536_870_912
});
});
it("resolves derived defaults to 12h and 256 MiB", () => {
expect(resolveStreamRetention("derived")).toEqual({
max_age: 43_200_000,
max_bytes: 268_435_456
});
});
it("lets env overrides win over defaults", () => {
expect(
resolveStreamRetention("raw", {
STREAM_RAW_MAX_AGE_MS: "1234",
STREAM_RAW_MAX_BYTES: "5678"
})
).toEqual({
max_age: 1234,
max_bytes: 5678
});
});
});
describe("ensureStream", () => {
it("creates a missing stream", async () => {
const desired = buildKnownStreamConfig("OPTIONS_PRINTS");
const { manager, addCalls, updateCalls } = buildMockStreamManager({});
const report = await ensureStream(manager, desired);
expect(report.state).toBe("missing");
expect(report.action).toBe("created");
expect(addCalls).toHaveLength(1);
expect(updateCalls).toHaveLength(0);
});
it("does nothing when an existing stream already matches", async () => {
const desired = buildKnownStreamConfig("OPTIONS_PRINTS");
const { manager, addCalls, updateCalls } = buildMockStreamManager({
[desired.name]: desired
});
const report = await ensureStream(manager, desired);
expect(report.state).toBe("match");
expect(report.action).toBe("none");
expect(addCalls).toHaveLength(0);
expect(updateCalls).toHaveLength(0);
});
it("updates only retention drift in place", async () => {
const desired = buildKnownStreamConfig("OPTIONS_PRINTS");
const { manager, addCalls, updateCalls } = buildMockStreamManager({
[desired.name]: {
...desired,
max_age: 7_200_000,
max_bytes: 1_073_741_824
}
});
const report = await ensureStream(manager, desired);
expect(report.state).toBe("retention_drift");
expect(report.action).toBe("updated");
expect(addCalls).toHaveLength(0);
expect(updateCalls).toHaveLength(1);
expect(updateCalls[0]?.name).toBe(desired.name);
expect(updateCalls[0]?.config.max_age).toBe(desired.max_age);
expect(updateCalls[0]?.config.max_bytes).toBe(desired.max_bytes);
});
it("throws on structural mismatch instead of mutating", async () => {
const desired = buildKnownStreamConfig("OPTIONS_PRINTS");
const { manager, addCalls, updateCalls } = buildMockStreamManager({
[desired.name]: {
...desired,
subjects: ["options.prints.legacy"]
}
});
await expect(ensureStream(manager, desired)).rejects.toThrow("structural mismatch");
expect(addCalls).toHaveLength(0);
expect(updateCalls).toHaveLength(0);
});
});
describe("auditStreamConfig", () => {
it("flags structural mismatches before retention drift", () => {
const desired = buildKnownStreamConfig("OPTIONS_PRINTS");
const report = auditStreamConfig(
{
...desired,
subjects: ["options.prints.legacy"],
max_age: 7_200_000
},
desired
);
expect(report.state).toBe("structural_mismatch");
expect(report.structuralMismatch).toHaveLength(1);
expect(report.retentionDrift).toHaveLength(0);
});
});
describe("runReconcileStreamsCommand", () => {
it("returns clean in --check mode when all streams match", async () => {
const configs = buildAllKnownConfigs();
const outputs: string[] = [];
const exitCode = await runReconcileStreamsCommand(["--check"], {
connect: async () => ({
nc: { close: async () => {} } as never,
js: {} as never,
jsm: buildMockStreamManager(configs).manager
}),
stdout: (line) => outputs.push(line)
});
expect(exitCode).toBe(0);
expect(outputs.every((line) => line.startsWith("✓"))).toBe(true);
});
it("returns non-zero in --check mode when a stream drifts", async () => {
const configs = buildAllKnownConfigs();
configs.OPTIONS_PRINTS = {
...configs.OPTIONS_PRINTS,
max_age: 7_200_000
};
const outputs: string[] = [];
const exitCode = await runReconcileStreamsCommand(["--check"], {
connect: async () => ({
nc: { close: async () => {} } as never,
js: {} as never,
jsm: buildMockStreamManager(configs).manager
}),
stdout: (line) => outputs.push(line)
});
expect(exitCode).toBe(1);
expect(outputs.some((line) => line.includes("OPTIONS_PRINTS") && line.includes("drift"))).toBe(true);
});
it("updates drift in --apply mode and reports actions", async () => {
const configs = buildAllKnownConfigs();
configs.OPTIONS_PRINTS = {
...configs.OPTIONS_PRINTS,
max_age: 7_200_000
};
const outputs: string[] = [];
const { manager, updateCalls } = buildMockStreamManager(configs);
const exitCode = await runReconcileStreamsCommand(["--apply"], {
connect: async () => ({
nc: { close: async () => {} } as never,
js: {} as never,
jsm: manager
}),
stdout: (line) => outputs.push(line)
});
expect(exitCode).toBe(0);
expect(updateCalls).toHaveLength(1);
expect(outputs.some((line) => line.includes("OPTIONS_PRINTS updated"))).toBe(true);
});
it("returns non-zero on structural mismatch and names the stream", async () => {
const configs = buildAllKnownConfigs();
configs.OPTIONS_PRINTS = {
...configs.OPTIONS_PRINTS,
subjects: ["options.prints.legacy"]
};
const outputs: string[] = [];
const errors: string[] = [];
const exitCode = await runReconcileStreamsCommand(["--apply"], {
connect: async () => ({
nc: { close: async () => {} } as never,
js: {} as never,
jsm: buildMockStreamManager(configs).manager
}),
stdout: (line) => outputs.push(line),
stderr: (line) => errors.push(line)
});
expect(exitCode).toBe(1);
expect(outputs.some((line) => line.includes("OPTIONS_PRINTS") && line.includes("structural-mismatch"))).toBe(true);
expect(errors.some((line) => line.includes("OPTIONS_PRINTS"))).toBe(true);
});
});

View file

@ -23,10 +23,9 @@ import {
STREAM_SMART_MONEY_EVENTS,
STREAM_OPTION_NBBO,
STREAM_OPTION_SIGNAL_PRINTS,
buildStreamConfig,
buildDurableConsumer,
connectJetStreamWithRetry,
ensureStream,
ensureKnownStreams,
subscribeJson
} from "@islandflow/bus";
import {
@ -624,17 +623,23 @@ const run = async () => {
{ attempts: 120, delayMs: 500 }
);
await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_SIGNAL_PRINTS, SUBJECT_OPTION_SIGNAL_PRINTS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_NBBO, SUBJECT_OPTION_NBBO, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_PRINTS, SUBJECT_EQUITY_PRINTS, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_QUOTES, SUBJECT_EQUITY_QUOTES, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_CANDLES, SUBJECT_EQUITY_CANDLES, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_JOINS, SUBJECT_EQUITY_JOINS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_INFERRED_DARK, SUBJECT_INFERRED_DARK, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_FLOW_PACKETS, SUBJECT_FLOW_PACKETS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_SMART_MONEY_EVENTS, SUBJECT_SMART_MONEY_EVENTS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_CLASSIFIER_HITS, SUBJECT_CLASSIFIER_HITS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_ALERTS, SUBJECT_ALERTS, "derived"));
await ensureKnownStreams(
jsm,
[
STREAM_OPTION_SIGNAL_PRINTS,
STREAM_OPTION_NBBO,
STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES,
STREAM_EQUITY_CANDLES,
STREAM_EQUITY_JOINS,
STREAM_INFERRED_DARK,
STREAM_FLOW_PACKETS,
STREAM_SMART_MONEY_EVENTS,
STREAM_CLASSIFIER_HITS,
STREAM_ALERTS
],
{ logger }
);
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,

View file

@ -5,10 +5,9 @@ import {
SUBJECT_EQUITY_PRINTS,
STREAM_EQUITY_CANDLES,
STREAM_EQUITY_PRINTS,
buildStreamConfig,
buildDurableConsumer,
connectJetStreamWithRetry,
ensureStream,
ensureKnownStreams,
publishJson,
subscribeJson
} from "@islandflow/bus";
@ -241,8 +240,7 @@ const run = async () => {
{ attempts: 120, delayMs: 500 }
);
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_PRINTS, SUBJECT_EQUITY_PRINTS, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_CANDLES, SUBJECT_EQUITY_CANDLES, "derived"));
await ensureKnownStreams(jsm, [STREAM_EQUITY_PRINTS, STREAM_EQUITY_CANDLES], { logger });
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,

View file

@ -26,10 +26,9 @@ import {
STREAM_SMART_MONEY_EVENTS,
STREAM_OPTION_NBBO,
STREAM_OPTION_SIGNAL_PRINTS,
buildStreamConfig,
buildDurableConsumer,
connectJetStreamWithRetry,
ensureStream,
ensureKnownStreams,
publishJson,
subscribeJson
} from "@islandflow/bus";
@ -1174,16 +1173,22 @@ const run = async () => {
{ attempts: 120, delayMs: 500 }
);
await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_SIGNAL_PRINTS, SUBJECT_OPTION_SIGNAL_PRINTS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_NBBO, SUBJECT_OPTION_NBBO, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_PRINTS, SUBJECT_EQUITY_PRINTS, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_QUOTES, SUBJECT_EQUITY_QUOTES, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_FLOW_PACKETS, SUBJECT_FLOW_PACKETS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_SMART_MONEY_EVENTS, SUBJECT_SMART_MONEY_EVENTS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_JOINS, SUBJECT_EQUITY_JOINS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_INFERRED_DARK, SUBJECT_INFERRED_DARK, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_CLASSIFIER_HITS, SUBJECT_CLASSIFIER_HITS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_ALERTS, SUBJECT_ALERTS, "derived"));
await ensureKnownStreams(
jsm,
[
STREAM_OPTION_SIGNAL_PRINTS,
STREAM_OPTION_NBBO,
STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES,
STREAM_FLOW_PACKETS,
STREAM_SMART_MONEY_EVENTS,
STREAM_EQUITY_JOINS,
STREAM_INFERRED_DARK,
STREAM_CLASSIFIER_HITS,
STREAM_ALERTS
],
{ logger }
);
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,

View file

@ -5,9 +5,8 @@ import {
SUBJECT_EQUITY_QUOTES,
STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES,
buildStreamConfig,
connectJetStreamWithRetry,
ensureStream,
ensureKnownStreams,
publishJson
} from "@islandflow/bus";
import {
@ -195,8 +194,7 @@ const run = async () => {
{ attempts: 120, delayMs: 500 }
);
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_PRINTS, SUBJECT_EQUITY_PRINTS, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_QUOTES, SUBJECT_EQUITY_QUOTES, "raw"));
await ensureKnownStreams(jsm, [STREAM_EQUITY_PRINTS, STREAM_EQUITY_QUOTES], { logger });
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,

View file

@ -9,10 +9,9 @@ import {
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
STREAM_OPTION_SIGNAL_PRINTS,
buildStreamConfig,
buildDurableConsumer,
connectJetStreamWithRetry,
ensureStream,
ensureKnownStreams,
publishJson,
subscribeJson
} from "@islandflow/bus";
@ -346,10 +345,11 @@ const run = async () => {
{ attempts: 120, delayMs: 500 }
);
await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_PRINTS, SUBJECT_OPTION_PRINTS, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_NBBO, SUBJECT_OPTION_NBBO, "raw"));
await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_SIGNAL_PRINTS, SUBJECT_OPTION_SIGNAL_PRINTS, "derived"));
await ensureStream(jsm, buildStreamConfig(STREAM_EQUITY_QUOTES, SUBJECT_EQUITY_QUOTES, "raw"));
await ensureKnownStreams(
jsm,
[STREAM_OPTION_PRINTS, STREAM_OPTION_NBBO, STREAM_OPTION_SIGNAL_PRINTS, STREAM_EQUITY_QUOTES],
{ logger }
);
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,

View file

@ -11,9 +11,8 @@ import {
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
STREAM_OPTION_SIGNAL_PRINTS,
buildStreamConfig,
connectJetStreamWithRetry,
ensureStream,
ensureKnownStreams,
publishJson
} from "@islandflow/bus";
import {
@ -292,10 +291,10 @@ const run = async () => {
for (const kind of streamKinds) {
const def = STREAM_DEFS[kind];
await ensureStream(jsm, buildStreamConfig(def.streamName, def.subject, "raw"));
await ensureKnownStreams(jsm, [def.streamName], { logger });
}
if (streamKinds.includes("options")) {
await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_SIGNAL_PRINTS, SUBJECT_OPTION_SIGNAL_PRINTS, "derived"));
await ensureKnownStreams(jsm, [STREAM_OPTION_SIGNAL_PRINTS], { logger });
}
const clickhouse = createClickHouseClient({