From 163ab1039ed71ae0d34ff6bde8f12fdb3946691b Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Tue, 30 Dec 2025 13:24:48 -0500 Subject: [PATCH] Add Redis rolling stats for flow packets and z-score driven classifiers --- .env.example | 7 ++ README.md | 6 +- bun.lock | 15 +++ services/api/src/index.ts | 74 +++++++++++---- services/compute/package.json | 1 + services/compute/src/classifiers.ts | 52 +++++++++- services/compute/src/index.ts | 99 ++++++++++++++++++-- services/compute/src/rolling-stats.ts | 74 +++++++++++++++ services/compute/tests/classifiers.test.ts | 69 ++++++++++++++ services/compute/tests/rolling-stats.test.ts | 24 +++++ 10 files changed, 389 insertions(+), 32 deletions(-) create mode 100644 services/compute/src/rolling-stats.ts create mode 100644 services/compute/tests/classifiers.test.ts create mode 100644 services/compute/tests/rolling-stats.test.ts diff --git a/.env.example b/.env.example index 53f8f64..0feb4d5 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,7 @@ NATS_URL=nats://localhost:4222 CLICKHOUSE_URL=http://localhost:8123 CLICKHOUSE_DATABASE=default +REDIS_URL=redis://localhost:6379 # Options ingest OPTIONS_INGEST_ADAPTER=alpaca @@ -54,7 +55,13 @@ COMPUTE_DELIVER_POLICY=new COMPUTE_CONSUMER_RESET=false NBBO_MAX_AGE_MS=1000 NEXT_PUBLIC_NBBO_MAX_AGE_MS=1000 +ROLLING_WINDOW_SIZE=50 +ROLLING_TTL_SEC=86400 CLASSIFIER_SWEEP_MIN_PREMIUM=40000 CLASSIFIER_SWEEP_MIN_COUNT=3 +CLASSIFIER_SWEEP_MIN_PREMIUM_Z=2 CLASSIFIER_SPIKE_MIN_PREMIUM=20000 CLASSIFIER_SPIKE_MIN_SIZE=400 +CLASSIFIER_SPIKE_MIN_PREMIUM_Z=2.5 +CLASSIFIER_SPIKE_MIN_SIZE_Z=2 +CLASSIFIER_Z_MIN_SAMPLES=12 diff --git a/README.md b/README.md index 1deaa03..542f508 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Done now (in repo): - Shared event schemas + logging + config helpers - Synthetic options/equity prints (full S&P 500) published to NATS and persisted to ClickHouse - Deterministic option FlowPacket clustering (time window) + persistence +- Rolling stats in Redis (premium/size/spread) with z-score features on FlowPackets - Rule-first classifiers + alert scoring with ClickHouse persistence + WS/REST endpoints - API: REST for prints/flow packets/classifier hits/alerts, WS for live options/equities/flow/alerts/hits, replay endpoints - UI: live tapes for options/equities/flow + replay toggle + pause controls + replay time/completion @@ -23,7 +24,7 @@ Done now (in repo): In progress / blocked: - Live data adapters beyond dev-only feeds (requires licensed data source) -- Rolling stats and advanced clustering +- Advanced clustering Not started: - Dark pool inference @@ -43,6 +44,7 @@ Not started: - Ingest adapter seam (env-selected; options default `alpaca`, equities default `synthetic`) - Raw event persistence in ClickHouse + streaming via NATS JetStream - Deterministic option FlowPacket clustering (time-window) +- Rolling stats baselines in Redis with z-score features on FlowPackets - Classifiers + alert scoring (rule-first) with WS/REST endpoints - API gateway with REST, WS, and replay endpoints - UI tapes for options/equities/flow packets + alerts/hits with live/replay toggle and pause controls @@ -104,6 +106,8 @@ Adapter selection (env): - Options: `OPTIONS_INGEST_ADAPTER` (defaults to `alpaca`) - Equities: `EQUITIES_INGEST_ADAPTER` (defaults to `synthetic`) - Compute: `COMPUTE_DELIVER_POLICY` (`new` default), `COMPUTE_CONSUMER_RESET` (force skip backlog) +- Rolling stats: `REDIS_URL`, `ROLLING_WINDOW_SIZE`, `ROLLING_TTL_SEC` +- Classifier tuning: `CLASSIFIER_SWEEP_MIN_PREMIUM_Z`, `CLASSIFIER_SPIKE_MIN_PREMIUM_Z`, `CLASSIFIER_SPIKE_MIN_SIZE_Z`, `CLASSIFIER_Z_MIN_SAMPLES` Testing mode (throttles ingest to reduce CPU): - `TESTING_MODE=true` enables throttling diff --git a/bun.lock b/bun.lock index 3123f18..4c233bc 100644 --- a/bun.lock +++ b/bun.lock @@ -73,6 +73,7 @@ "@islandflow/observability": "workspace:*", "@islandflow/storage": "workspace:*", "@islandflow/types": "workspace:*", + "redis": "^5.10.0", "zod": "^3.23.8", }, }, @@ -168,6 +169,16 @@ "@next/swc-win32-x64-msvc": ["@next/swc-win32-x64-msvc@14.2.33", "", { "os": "win32", "cpu": "x64" }, "sha512-nOjfZMy8B94MdisuzZo9/57xuFVLHJaDj5e/xrduJp9CV2/HrfxTRH2fbyLe+K9QT41WBLUd4iXX3R7jBp0EUg=="], + "@redis/bloom": ["@redis/bloom@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-doIF37ob+l47n0rkpRNgU8n4iacBlKM9xLiP1LtTZTvz8TloJB8qx/MgvhMhKdYG+CvCY2aPBnN2706izFn/4A=="], + + "@redis/client": ["@redis/client@5.10.0", "", { "dependencies": { "cluster-key-slot": "1.1.2" } }, "sha512-JXmM4XCoso6C75Mr3lhKA3eNxSzkYi3nCzxDIKY+YOszYsJjuKbFgVtguVPbLMOttN4iu2fXoc2BGhdnYhIOxA=="], + + "@redis/json": ["@redis/json@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-B2G8XlOmTPUuZtD44EMGbtoepQG34RCDXLZbjrtON1Djet0t5Ri7/YPXvL9aomXqP8lLTreaprtyLKF4tmXEEA=="], + + "@redis/search": ["@redis/search@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-3SVcPswoSfp2HnmWbAGUzlbUPn7fOohVu2weUQ0S+EMiQi8jwjL+aN2p6V3TI65eNfVsJ8vyPvqWklm6H6esmg=="], + + "@redis/time-series": ["@redis/time-series@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-cPkpddXH5kc/SdRhF0YG0qtjL+noqFT0AcHbQ6axhsPsO7iqPi1cjxgdkE9TNeKiBUUdCaU1DbqkR/LzbzPBhg=="], + "@swc/counter": ["@swc/counter@0.1.3", "", {}, "sha512-e2BR4lsJkkRlKZ/qCHPw9ZaSxc0MVUd7gtbtaB7aMvHeJVYe8sOB8DBZkP2DtISHGSku9sCK6T6cnY0CtXrOCQ=="], "@swc/helpers": ["@swc/helpers@0.5.5", "", { "dependencies": { "@swc/counter": "^0.1.3", "tslib": "^2.4.0" } }, "sha512-KGYxvIOXcceOAbEk4bi/dVLEK9z8sZ0uBB3Il5b1rhfClSpcX0yfRO0KmTkqR2cnQDymwLB+25ZyMzICg/cm/A=="], @@ -184,6 +195,8 @@ "client-only": ["client-only@0.0.1", "", {}, "sha512-IV3Ou0jSMzZrd3pZ48nLkT9DA7Ag1pnPzaiQhpW7c3RbcqqzvzzVu+L8gfqMp/8IM2MQtSiqaCxrrcfu8I8rMA=="], + "cluster-key-slot": ["cluster-key-slot@1.1.2", "", {}, "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA=="], + "csstype": ["csstype@3.2.3", "", {}, "sha512-z1HGKcYy2xA8AGQfwrn0PAy+PB7X/GSj3UVJW9qKyn43xWa+gl5nXmU4qqLMRzWVLFC8KusUX8T/0kCiOYpAIQ=="], "graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="], @@ -208,6 +221,8 @@ "react-dom": ["react-dom@18.3.1", "", { "dependencies": { "loose-envify": "^1.1.0", "scheduler": "^0.23.2" }, "peerDependencies": { "react": "^18.3.1" } }, "sha512-5m4nQKp+rZRb09LNH59GM4BxTh9251/ylbKIbpe7TpGxfJ+9kv6BLkLBXIjjspbgbnIBNqlI23tRnTWT0snUIw=="], + "redis": ["redis@5.10.0", "", { "dependencies": { "@redis/bloom": "5.10.0", "@redis/client": "5.10.0", "@redis/json": "5.10.0", "@redis/search": "5.10.0", "@redis/time-series": "5.10.0" } }, "sha512-0/Y+7IEiTgVGPrLFKy8oAEArSyEJkU0zvgV5xyi9NzNQ+SLZmyFbUsWIbgPcd4UdUh00opXGKlXJwMmsis5Byw=="], + "scheduler": ["scheduler@0.23.2", "", { "dependencies": { "loose-envify": "^1.1.0" } }, "sha512-UOShsPwz7NrMUqhR6t0hWjFduvOzbtv7toDH1/hIrfRNIDBnnBWd0CwJTGvTpngVlmwGCdP9/Zl/tVrDqcuYzQ=="], "source-map-js": ["source-map-js@1.2.1", "", {}, "sha512-UXWMKhLOwVKb728IUtQPXxfYU+usdybtUrK/8uGE8CQMvrhOpwvzDBwj0QhSL7MQc7vIsISBG8VQ8+IDQxpfQA=="], diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 4c4ee37..59495cb 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -255,40 +255,78 @@ const run = async () => { await ensureAlertsTable(clickhouse); }); - const optionSubscription = await subscribeJson( - js, + const subscribeWithReset = async ( + subject: string, + stream: string, + durableName: string + ) => { + const opts = buildDurableConsumer(durableName); + try { + return await subscribeJson(js, subject, opts); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const shouldReset = + message.includes("duplicate subscription") || + message.includes("durable requires") || + message.includes("subject does not match consumer"); + + if (!shouldReset) { + throw error; + } + + logger.warn("resetting jetstream consumer", { durable: durableName, error: message }); + + try { + await jsm.consumers.delete(stream, durableName); + } catch (deleteError) { + const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError); + if (!deleteMessage.includes("not found")) { + logger.warn("failed to delete jetstream consumer", { + durable: durableName, + error: deleteMessage + }); + } + } + + const resetOpts = buildDurableConsumer(durableName); + return await subscribeJson(js, subject, resetOpts); + } + }; + + const optionSubscription = await subscribeWithReset( SUBJECT_OPTION_PRINTS, - buildDurableConsumer("api-option-prints") + STREAM_OPTION_PRINTS, + "api-option-prints" ); - const optionNbboSubscription = await subscribeJson( - js, + const optionNbboSubscription = await subscribeWithReset( SUBJECT_OPTION_NBBO, - buildDurableConsumer("api-option-nbbo") + STREAM_OPTION_NBBO, + "api-option-nbbo" ); - const equitySubscription = await subscribeJson( - js, + const equitySubscription = await subscribeWithReset( SUBJECT_EQUITY_PRINTS, - buildDurableConsumer("api-equity-prints") + STREAM_EQUITY_PRINTS, + "api-equity-prints" ); - const flowSubscription = await subscribeJson( - js, + const flowSubscription = await subscribeWithReset( SUBJECT_FLOW_PACKETS, - buildDurableConsumer("api-flow-packets") + STREAM_FLOW_PACKETS, + "api-flow-packets" ); - const classifierHitSubscription = await subscribeJson( - js, + const classifierHitSubscription = await subscribeWithReset( SUBJECT_CLASSIFIER_HITS, - buildDurableConsumer("api-classifier-hits") + STREAM_CLASSIFIER_HITS, + "api-classifier-hits" ); - const alertSubscription = await subscribeJson( - js, + const alertSubscription = await subscribeWithReset( SUBJECT_ALERTS, - buildDurableConsumer("api-alerts") + STREAM_ALERTS, + "api-alerts" ); const pumpOptions = async () => { diff --git a/services/compute/package.json b/services/compute/package.json index d148767..7386064 100644 --- a/services/compute/package.json +++ b/services/compute/package.json @@ -11,6 +11,7 @@ "@islandflow/observability": "workspace:*", "@islandflow/storage": "workspace:*", "@islandflow/types": "workspace:*", + "redis": "^5.10.0", "zod": "^3.23.8" } } diff --git a/services/compute/src/classifiers.ts b/services/compute/src/classifiers.ts index 2fd9e56..4d188da 100644 --- a/services/compute/src/classifiers.ts +++ b/services/compute/src/classifiers.ts @@ -10,8 +10,12 @@ type ParsedContract = { export type ClassifierConfig = { sweepMinPremium: number; sweepMinCount: number; + sweepMinPremiumZ: number; spikeMinPremium: number; spikeMinSize: number; + spikeMinPremiumZ: number; + spikeMinSizeZ: number; + zMinSamples: number; }; const clamp = (value: number, min = 0, max = 1): number => { @@ -120,8 +124,14 @@ const buildSweepHit = ( const firstPrice = getNumberFeature(packet, "first_price"); const lastPrice = getNumberFeature(packet, "last_price"); const windowMs = getNumberFeature(packet, "window_ms"); + const premiumZ = getNumberFeature(packet, "total_premium_z"); + const premiumBaseline = getNumberFeature(packet, "total_premium_baseline_n"); - if (count < config.sweepMinCount || totalPremium < config.sweepMinPremium) { + const baselineReady = premiumBaseline >= config.zMinSamples; + const passesAbsolute = totalPremium >= config.sweepMinPremium; + const passesZ = baselineReady && premiumZ >= config.sweepMinPremiumZ; + + if (count < config.sweepMinCount || (!passesAbsolute && !passesZ)) { return null; } @@ -138,9 +148,19 @@ const buildSweepHit = ( if (totalPremium >= config.sweepMinPremium * 2) { confidence += 0.15; } + if (passesZ) { + confidence += 0.1; + if (premiumZ >= config.sweepMinPremiumZ + 1) { + confidence += 0.05; + } + } confidence = clamp(confidence, 0, 0.95); + const baselineNote = baselineReady + ? `Baseline premium z-score ${premiumZ.toFixed(2)} over ${Math.round(premiumBaseline)} samples.` + : "Baseline premium z-score unavailable."; + return { classifier_id: direction === "bullish" ? "large_bullish_call_sweep" : "large_bearish_put_sweep", confidence, @@ -148,7 +168,8 @@ const buildSweepHit = ( explanations: [ `Likely ${direction === "bullish" ? "call" : "put"} sweep: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`, `Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts; price ${priceTrend}.`, - `Thresholds: >=${config.sweepMinCount} prints and >=${formatUsd(config.sweepMinPremium)} premium.` + `Thresholds: >=${config.sweepMinCount} prints and >=${formatUsd(config.sweepMinPremium)} premium or z>=${config.sweepMinPremiumZ.toFixed(1)}.`, + baselineNote ] }; }; @@ -158,8 +179,19 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier const totalPremium = getNumberFeature(packet, "total_premium"); const totalSize = getNumberFeature(packet, "total_size"); const windowMs = getNumberFeature(packet, "window_ms"); + const premiumZ = getNumberFeature(packet, "total_premium_z"); + const sizeZ = getNumberFeature(packet, "total_size_z"); + const premiumBaseline = getNumberFeature(packet, "total_premium_baseline_n"); + const sizeBaseline = getNumberFeature(packet, "total_size_baseline_n"); - if (totalSize < config.spikeMinSize || totalPremium < config.spikeMinPremium) { + const premiumBaselineReady = premiumBaseline >= config.zMinSamples; + const sizeBaselineReady = sizeBaseline >= config.zMinSamples; + const passesAbsolute = totalSize >= config.spikeMinSize && totalPremium >= config.spikeMinPremium; + const passesZ = + (premiumBaselineReady && premiumZ >= config.spikeMinPremiumZ) || + (sizeBaselineReady && sizeZ >= config.spikeMinSizeZ); + + if (!passesAbsolute && !passesZ) { return null; } @@ -173,9 +205,20 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier if (count >= 3) { confidence += 0.1; } + if (passesZ) { + confidence += 0.1; + if (premiumZ >= config.spikeMinPremiumZ + 1 || sizeZ >= config.spikeMinSizeZ + 1) { + confidence += 0.05; + } + } confidence = clamp(confidence, 0, 0.9); + const baselineNote = + premiumBaselineReady || sizeBaselineReady + ? `Baseline z-scores: premium ${premiumZ.toFixed(2)}, size ${sizeZ.toFixed(2)}.` + : "Baseline z-scores unavailable."; + return { classifier_id: "unusual_contract_spike", confidence, @@ -183,7 +226,8 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier explanations: [ `Unusual contract spike: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`, `Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts.`, - `Thresholds: >=${config.spikeMinSize} contracts and >=${formatUsd(config.spikeMinPremium)} premium.` + `Thresholds: >=${config.spikeMinSize} contracts and >=${formatUsd(config.spikeMinPremium)} premium or z>=${config.spikeMinPremiumZ.toFixed(1)}.`, + baselineNote ] }; }; diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 615edbc..a38709b 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -40,6 +40,7 @@ import { } from "@islandflow/types"; import { z } from "zod"; import { evaluateClassifiers, type ClassifierConfig } from "./classifiers"; +import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats"; const service = "compute"; const logger = createLogger({ service }); @@ -48,7 +49,10 @@ const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), + REDIS_URL: z.string().default("redis://localhost:6379"), CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500), + ROLLING_WINDOW_SIZE: z.coerce.number().int().positive().default(50), + ROLLING_TTL_SEC: z.coerce.number().int().nonnegative().default(86400), COMPUTE_DELIVER_POLICY: z.enum(["new", "all", "last", "last_per_subject"]).default("new"), COMPUTE_CONSUMER_RESET: z .preprocess((value) => { @@ -67,8 +71,12 @@ const envSchema = z.object({ NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000), CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000), CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3), + CLASSIFIER_SWEEP_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2), CLASSIFIER_SPIKE_MIN_PREMIUM: z.coerce.number().positive().default(20_000), - CLASSIFIER_SPIKE_MIN_SIZE: z.coerce.number().int().positive().default(400) + CLASSIFIER_SPIKE_MIN_SIZE: z.coerce.number().int().positive().default(400), + CLASSIFIER_SPIKE_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2.5), + CLASSIFIER_SPIKE_MIN_SIZE_Z: z.coerce.number().nonnegative().default(2), + CLASSIFIER_Z_MIN_SAMPLES: z.coerce.number().int().nonnegative().default(12) }); const env = readEnv(envSchema); @@ -76,8 +84,12 @@ const env = readEnv(envSchema); const classifierConfig: ClassifierConfig = { sweepMinPremium: env.CLASSIFIER_SWEEP_MIN_PREMIUM, sweepMinCount: env.CLASSIFIER_SWEEP_MIN_COUNT, + sweepMinPremiumZ: env.CLASSIFIER_SWEEP_MIN_PREMIUM_Z, spikeMinPremium: env.CLASSIFIER_SPIKE_MIN_PREMIUM, - spikeMinSize: env.CLASSIFIER_SPIKE_MIN_SIZE + spikeMinSize: env.CLASSIFIER_SPIKE_MIN_SIZE, + spikeMinPremiumZ: env.CLASSIFIER_SPIKE_MIN_PREMIUM_Z, + spikeMinSizeZ: env.CLASSIFIER_SPIKE_MIN_SIZE_Z, + zMinSamples: env.CLASSIFIER_Z_MIN_SAMPLES }; const retry = async ( @@ -107,6 +119,13 @@ const retry = async ( throw lastError ?? new Error(`${label} failed after retries`); }; +const roundTo = (value: number, digits = 4): number => { + if (!Number.isFinite(value)) { + return 0; + } + return Number(value.toFixed(digits)); +}; + type ClusterState = { contractId: string; startTs: number; @@ -124,6 +143,10 @@ type ClusterState = { const clusters = new Map(); const nbboCache = new Map(); +const rollingKey = (metric: string, contractId: string): string => { + return `rolling:${metric}:${contractId}`; +}; + const applyDeliverPolicy = ( opts: ReturnType, policy: typeof env.COMPUTE_DELIVER_POLICY @@ -203,16 +226,20 @@ const selectNbbo = (contractId: string, ts: number): NbboJoin => { const flushCluster = async ( clickhouse: ReturnType, js: Awaited>["js"], + redis: ReturnType, + rollingConfig: RollingStatsConfig, cluster: ClusterState ): Promise => { const joinQuality: Record = {}; const nbboJoin = selectNbbo(cluster.contractId, cluster.endTs); + const totalPremium = roundTo(cluster.totalPremium); + const features: Record = { option_contract_id: cluster.contractId, count: cluster.members.length, total_size: cluster.totalSize, - total_premium: Number(cluster.totalPremium.toFixed(4)), + total_premium: totalPremium, first_price: cluster.firstPrice, last_price: cluster.lastPrice, start_ts: cluster.startTs, @@ -220,6 +247,34 @@ const flushCluster = async ( window_ms: env.CLUSTER_WINDOW_MS }; + const addRollingSnapshot = async ( + metric: string, + value: number, + prefix: string + ): Promise => { + try { + const snapshot = await updateRollingStats( + redis, + rollingKey(metric, cluster.contractId), + value, + rollingConfig + ); + features[`${prefix}_mean`] = roundTo(snapshot.mean); + features[`${prefix}_std`] = roundTo(snapshot.stddev); + features[`${prefix}_z`] = roundTo(snapshot.zscore); + features[`${prefix}_baseline_n`] = snapshot.baselineCount; + } catch (error) { + logger.warn("rolling stats update failed", { + metric, + contract: cluster.contractId, + error: error instanceof Error ? error.message : String(error) + }); + } + }; + + await addRollingSnapshot("premium", totalPremium, "total_premium"); + await addRollingSnapshot("size", cluster.totalSize, "total_size"); + if (!nbboJoin.nbbo) { joinQuality.nbbo_missing = 1; } else { @@ -228,12 +283,14 @@ const flushCluster = async ( joinQuality.nbbo_stale = 1; } else { const mid = (nbboJoin.nbbo.bid + nbboJoin.nbbo.ask) / 2; + const spread = nbboJoin.nbbo.ask - nbboJoin.nbbo.bid; features.nbbo_bid = nbboJoin.nbbo.bid; features.nbbo_ask = nbboJoin.nbbo.ask; - features.nbbo_mid = Number(mid.toFixed(4)); - features.nbbo_spread = Number((nbboJoin.nbbo.ask - nbboJoin.nbbo.bid).toFixed(4)); + features.nbbo_mid = roundTo(mid); + features.nbbo_spread = roundTo(spread); features.nbbo_bid_size = nbboJoin.nbbo.bidSize; features.nbbo_ask_size = nbboJoin.nbbo.askSize; + await addRollingSnapshot("spread", roundTo(spread), "nbbo_spread"); } } @@ -338,6 +395,8 @@ const emitClassifiers = async ( const flushEligibleClusters = async ( clickhouse: ReturnType, js: Awaited>["js"], + redis: ReturnType, + rollingConfig: RollingStatsConfig, currentTs: number, skipContractId: string ): Promise => { @@ -348,7 +407,7 @@ const flushEligibleClusters = async ( if (currentTs - cluster.endTs > env.CLUSTER_WINDOW_MS) { clusters.delete(contractId); - await flushCluster(clickhouse, js, cluster); + await flushCluster(clickhouse, js, redis, rollingConfig, cluster); } } }; @@ -434,6 +493,20 @@ const run = async () => { database: env.CLICKHOUSE_DATABASE }); + const redis = createRedisClient(env.REDIS_URL); + redis.on("error", (error) => { + logger.warn("redis client error", { error: error instanceof Error ? error.message : String(error) }); + }); + + await retry("redis connect", 20, 500, async () => { + await redis.connect(); + }); + + const rollingConfig: RollingStatsConfig = { + windowSize: env.ROLLING_WINDOW_SIZE, + ttlSeconds: env.ROLLING_TTL_SEC + }; + await retry("clickhouse table init", 20, 500, async () => { await ensureFlowPacketsTable(clickhouse); await ensureClassifierHitsTable(clickhouse); @@ -594,12 +667,13 @@ const run = async () => { logger.info("service stopping", { signal }); for (const cluster of clusters.values()) { - await flushCluster(clickhouse, js, cluster); + await flushCluster(clickhouse, js, redis, rollingConfig, cluster); } clusters.clear(); await nc.drain(); await clickhouse.close(); + await redis.quit(); process.exit(0); }; @@ -609,7 +683,14 @@ const run = async () => { for await (const msg of subscription.messages) { try { const print = OptionPrintSchema.parse(subscription.decode(msg)); - await flushEligibleClusters(clickhouse, js, print.ts, print.option_contract_id); + await flushEligibleClusters( + clickhouse, + js, + redis, + rollingConfig, + print.ts, + print.option_contract_id + ); const existing = clusters.get(print.option_contract_id); if (!existing) { @@ -618,7 +699,7 @@ const run = async () => { updateCluster(existing, print); } else { clusters.delete(print.option_contract_id); - await flushCluster(clickhouse, js, existing); + await flushCluster(clickhouse, js, redis, rollingConfig, existing); clusters.set(print.option_contract_id, buildCluster(print)); } diff --git a/services/compute/src/rolling-stats.ts b/services/compute/src/rolling-stats.ts new file mode 100644 index 0000000..63c6caa --- /dev/null +++ b/services/compute/src/rolling-stats.ts @@ -0,0 +1,74 @@ +import { createClient } from "redis"; + +export type RollingStatsConfig = { + windowSize: number; + ttlSeconds: number; +}; + +export type RollingSnapshot = { + baselineCount: number; + mean: number; + stddev: number; + zscore: number; +}; + +const toNumbers = (values: string[]): number[] => { + return values + .map((value) => Number(value)) + .filter((value) => Number.isFinite(value)); +}; + +export const computeStats = (values: number[]): { mean: number; stddev: number; count: number } => { + const count = values.length; + if (count === 0) { + return { mean: 0, stddev: 0, count: 0 }; + } + + const mean = values.reduce((sum, value) => sum + value, 0) / count; + const variance = + values.reduce((sum, value) => { + const delta = value - mean; + return sum + delta * delta; + }, 0) / count; + + return { mean, stddev: Math.sqrt(variance), count }; +}; + +export const computeSnapshot = (baseline: number[], value: number): RollingSnapshot => { + const stats = computeStats(baseline); + const zscore = stats.stddev === 0 ? 0 : (value - stats.mean) / stats.stddev; + return { + baselineCount: stats.count, + mean: stats.mean, + stddev: stats.stddev, + zscore + }; +}; + +export const createRedisClient = (url: string) => { + return createClient({ url }); +}; + +export const updateRollingStats = async ( + client: ReturnType, + key: string, + value: number, + config: RollingStatsConfig +): Promise => { + const limit = Math.max(0, config.windowSize - 1); + const existing = await client.lRange(key, 0, limit); + const baseline = toNumbers(existing); + const snapshot = computeSnapshot(baseline, value); + + const multi = client.multi(); + multi.lPush(key, value.toString()); + if (config.windowSize > 0) { + multi.lTrim(key, 0, config.windowSize - 1); + } + if (config.ttlSeconds > 0) { + multi.expire(key, config.ttlSeconds); + } + await multi.exec(); + + return snapshot; +}; diff --git a/services/compute/tests/classifiers.test.ts b/services/compute/tests/classifiers.test.ts new file mode 100644 index 0000000..12dfd11 --- /dev/null +++ b/services/compute/tests/classifiers.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, test } from "bun:test"; +import type { FlowPacket } from "@islandflow/types"; +import { evaluateClassifiers, type ClassifierConfig } from "../src/classifiers"; + +const baseConfig: ClassifierConfig = { + sweepMinPremium: 40_000, + sweepMinCount: 3, + sweepMinPremiumZ: 2, + spikeMinPremium: 20_000, + spikeMinSize: 400, + spikeMinPremiumZ: 2.5, + spikeMinSizeZ: 2, + zMinSamples: 12 +}; + +const buildPacket = ( + overrides: Record +): FlowPacket => { + return { + source_ts: 1, + ingest_ts: 1, + seq: 1, + trace_id: "trace", + id: "packet", + members: ["m1"], + features: { + option_contract_id: "SPY-2025-01-17-450-C", + count: 3, + total_premium: 1000, + total_size: 20, + first_price: 1, + last_price: 1.01, + window_ms: 500, + ...overrides + }, + join_quality: {} + }; +}; + +describe("classifier z-score behavior", () => { + test("spike hit triggers on z-score even when absolute thresholds fail", () => { + const packet = buildPacket({ + total_premium_z: 3.2, + total_premium_baseline_n: 20, + total_size_z: 0.4, + total_size_baseline_n: 20 + }); + const hits = evaluateClassifiers(packet, baseConfig); + expect(hits.some((hit) => hit.classifier_id === "unusual_contract_spike")).toBe(true); + }); + + test("sweep hit triggers on premium z-score when baseline is ready", () => { + const packet = buildPacket({ + total_premium_z: 2.4, + total_premium_baseline_n: 20 + }); + const hits = evaluateClassifiers(packet, baseConfig); + expect(hits.some((hit) => hit.classifier_id === "large_bullish_call_sweep")).toBe(true); + }); + + test("sweep hit does not trigger when baseline is insufficient", () => { + const packet = buildPacket({ + total_premium_z: 3, + total_premium_baseline_n: 4 + }); + const hits = evaluateClassifiers(packet, baseConfig); + expect(hits.some((hit) => hit.classifier_id === "large_bullish_call_sweep")).toBe(false); + }); +}); diff --git a/services/compute/tests/rolling-stats.test.ts b/services/compute/tests/rolling-stats.test.ts new file mode 100644 index 0000000..555d77c --- /dev/null +++ b/services/compute/tests/rolling-stats.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, test } from "bun:test"; +import { computeSnapshot, computeStats } from "../src/rolling-stats"; + +describe("rolling stats helpers", () => { + test("computeStats handles empty baseline", () => { + const stats = computeStats([]); + expect(stats.count).toBe(0); + expect(stats.mean).toBe(0); + expect(stats.stddev).toBe(0); + }); + + test("computeStats calculates mean and stddev", () => { + const stats = computeStats([10, 12, 14]); + expect(stats.count).toBe(3); + expect(stats.mean).toBe(12); + expect(stats.stddev).toBeCloseTo(1.633, 3); + }); + + test("computeSnapshot calculates z-score against baseline", () => { + const snapshot = computeSnapshot([10, 12, 14], 15); + expect(snapshot.baselineCount).toBe(3); + expect(snapshot.zscore).toBeCloseTo(1.84, 2); + }); +});