Add Redis rolling stats for flow packets and z-score driven classifiers

This commit is contained in:
dirtydishes 2025-12-30 13:24:48 -05:00
parent fc7065792f
commit 163ab1039e
10 changed files with 389 additions and 32 deletions

View file

@ -10,8 +10,12 @@ type ParsedContract = {
export type ClassifierConfig = {
sweepMinPremium: number;
sweepMinCount: number;
sweepMinPremiumZ: number;
spikeMinPremium: number;
spikeMinSize: number;
spikeMinPremiumZ: number;
spikeMinSizeZ: number;
zMinSamples: number;
};
const clamp = (value: number, min = 0, max = 1): number => {
@ -120,8 +124,14 @@ const buildSweepHit = (
const firstPrice = getNumberFeature(packet, "first_price");
const lastPrice = getNumberFeature(packet, "last_price");
const windowMs = getNumberFeature(packet, "window_ms");
const premiumZ = getNumberFeature(packet, "total_premium_z");
const premiumBaseline = getNumberFeature(packet, "total_premium_baseline_n");
if (count < config.sweepMinCount || totalPremium < config.sweepMinPremium) {
const baselineReady = premiumBaseline >= config.zMinSamples;
const passesAbsolute = totalPremium >= config.sweepMinPremium;
const passesZ = baselineReady && premiumZ >= config.sweepMinPremiumZ;
if (count < config.sweepMinCount || (!passesAbsolute && !passesZ)) {
return null;
}
@ -138,9 +148,19 @@ const buildSweepHit = (
if (totalPremium >= config.sweepMinPremium * 2) {
confidence += 0.15;
}
if (passesZ) {
confidence += 0.1;
if (premiumZ >= config.sweepMinPremiumZ + 1) {
confidence += 0.05;
}
}
confidence = clamp(confidence, 0, 0.95);
const baselineNote = baselineReady
? `Baseline premium z-score ${premiumZ.toFixed(2)} over ${Math.round(premiumBaseline)} samples.`
: "Baseline premium z-score unavailable.";
return {
classifier_id: direction === "bullish" ? "large_bullish_call_sweep" : "large_bearish_put_sweep",
confidence,
@ -148,7 +168,8 @@ const buildSweepHit = (
explanations: [
`Likely ${direction === "bullish" ? "call" : "put"} sweep: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`,
`Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts; price ${priceTrend}.`,
`Thresholds: >=${config.sweepMinCount} prints and >=${formatUsd(config.sweepMinPremium)} premium.`
`Thresholds: >=${config.sweepMinCount} prints and >=${formatUsd(config.sweepMinPremium)} premium or z>=${config.sweepMinPremiumZ.toFixed(1)}.`,
baselineNote
]
};
};
@ -158,8 +179,19 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
const totalPremium = getNumberFeature(packet, "total_premium");
const totalSize = getNumberFeature(packet, "total_size");
const windowMs = getNumberFeature(packet, "window_ms");
const premiumZ = getNumberFeature(packet, "total_premium_z");
const sizeZ = getNumberFeature(packet, "total_size_z");
const premiumBaseline = getNumberFeature(packet, "total_premium_baseline_n");
const sizeBaseline = getNumberFeature(packet, "total_size_baseline_n");
if (totalSize < config.spikeMinSize || totalPremium < config.spikeMinPremium) {
const premiumBaselineReady = premiumBaseline >= config.zMinSamples;
const sizeBaselineReady = sizeBaseline >= config.zMinSamples;
const passesAbsolute = totalSize >= config.spikeMinSize && totalPremium >= config.spikeMinPremium;
const passesZ =
(premiumBaselineReady && premiumZ >= config.spikeMinPremiumZ) ||
(sizeBaselineReady && sizeZ >= config.spikeMinSizeZ);
if (!passesAbsolute && !passesZ) {
return null;
}
@ -173,9 +205,20 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
if (count >= 3) {
confidence += 0.1;
}
if (passesZ) {
confidence += 0.1;
if (premiumZ >= config.spikeMinPremiumZ + 1 || sizeZ >= config.spikeMinSizeZ + 1) {
confidence += 0.05;
}
}
confidence = clamp(confidence, 0, 0.9);
const baselineNote =
premiumBaselineReady || sizeBaselineReady
? `Baseline z-scores: premium ${premiumZ.toFixed(2)}, size ${sizeZ.toFixed(2)}.`
: "Baseline z-scores unavailable.";
return {
classifier_id: "unusual_contract_spike",
confidence,
@ -183,7 +226,8 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
explanations: [
`Unusual contract spike: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`,
`Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts.`,
`Thresholds: >=${config.spikeMinSize} contracts and >=${formatUsd(config.spikeMinPremium)} premium.`
`Thresholds: >=${config.spikeMinSize} contracts and >=${formatUsd(config.spikeMinPremium)} premium or z>=${config.spikeMinPremiumZ.toFixed(1)}.`,
baselineNote
]
};
};

View file

@ -40,6 +40,7 @@ import {
} from "@islandflow/types";
import { z } from "zod";
import { evaluateClassifiers, type ClassifierConfig } from "./classifiers";
import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats";
const service = "compute";
const logger = createLogger({ service });
@ -48,7 +49,10 @@ const envSchema = z.object({
NATS_URL: z.string().default("nats://localhost:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"),
REDIS_URL: z.string().default("redis://localhost:6379"),
CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500),
ROLLING_WINDOW_SIZE: z.coerce.number().int().positive().default(50),
ROLLING_TTL_SEC: z.coerce.number().int().nonnegative().default(86400),
COMPUTE_DELIVER_POLICY: z.enum(["new", "all", "last", "last_per_subject"]).default("new"),
COMPUTE_CONSUMER_RESET: z
.preprocess((value) => {
@ -67,8 +71,12 @@ const envSchema = z.object({
NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000),
CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000),
CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3),
CLASSIFIER_SWEEP_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2),
CLASSIFIER_SPIKE_MIN_PREMIUM: z.coerce.number().positive().default(20_000),
CLASSIFIER_SPIKE_MIN_SIZE: z.coerce.number().int().positive().default(400)
CLASSIFIER_SPIKE_MIN_SIZE: z.coerce.number().int().positive().default(400),
CLASSIFIER_SPIKE_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2.5),
CLASSIFIER_SPIKE_MIN_SIZE_Z: z.coerce.number().nonnegative().default(2),
CLASSIFIER_Z_MIN_SAMPLES: z.coerce.number().int().nonnegative().default(12)
});
const env = readEnv(envSchema);
@ -76,8 +84,12 @@ const env = readEnv(envSchema);
const classifierConfig: ClassifierConfig = {
sweepMinPremium: env.CLASSIFIER_SWEEP_MIN_PREMIUM,
sweepMinCount: env.CLASSIFIER_SWEEP_MIN_COUNT,
sweepMinPremiumZ: env.CLASSIFIER_SWEEP_MIN_PREMIUM_Z,
spikeMinPremium: env.CLASSIFIER_SPIKE_MIN_PREMIUM,
spikeMinSize: env.CLASSIFIER_SPIKE_MIN_SIZE
spikeMinSize: env.CLASSIFIER_SPIKE_MIN_SIZE,
spikeMinPremiumZ: env.CLASSIFIER_SPIKE_MIN_PREMIUM_Z,
spikeMinSizeZ: env.CLASSIFIER_SPIKE_MIN_SIZE_Z,
zMinSamples: env.CLASSIFIER_Z_MIN_SAMPLES
};
const retry = async <T>(
@ -107,6 +119,13 @@ const retry = async <T>(
throw lastError ?? new Error(`${label} failed after retries`);
};
const roundTo = (value: number, digits = 4): number => {
if (!Number.isFinite(value)) {
return 0;
}
return Number(value.toFixed(digits));
};
type ClusterState = {
contractId: string;
startTs: number;
@ -124,6 +143,10 @@ type ClusterState = {
const clusters = new Map<string, ClusterState>();
const nbboCache = new Map<string, OptionNBBO>();
const rollingKey = (metric: string, contractId: string): string => {
return `rolling:${metric}:${contractId}`;
};
const applyDeliverPolicy = (
opts: ReturnType<typeof buildDurableConsumer>,
policy: typeof env.COMPUTE_DELIVER_POLICY
@ -203,16 +226,20 @@ const selectNbbo = (contractId: string, ts: number): NbboJoin => {
const flushCluster = async (
clickhouse: ReturnType<typeof createClickHouseClient>,
js: Awaited<ReturnType<typeof connectJetStreamWithRetry>>["js"],
redis: ReturnType<typeof createRedisClient>,
rollingConfig: RollingStatsConfig,
cluster: ClusterState
): Promise<void> => {
const joinQuality: Record<string, number> = {};
const nbboJoin = selectNbbo(cluster.contractId, cluster.endTs);
const totalPremium = roundTo(cluster.totalPremium);
const features: Record<string, string | number | boolean> = {
option_contract_id: cluster.contractId,
count: cluster.members.length,
total_size: cluster.totalSize,
total_premium: Number(cluster.totalPremium.toFixed(4)),
total_premium: totalPremium,
first_price: cluster.firstPrice,
last_price: cluster.lastPrice,
start_ts: cluster.startTs,
@ -220,6 +247,34 @@ const flushCluster = async (
window_ms: env.CLUSTER_WINDOW_MS
};
const addRollingSnapshot = async (
metric: string,
value: number,
prefix: string
): Promise<void> => {
try {
const snapshot = await updateRollingStats(
redis,
rollingKey(metric, cluster.contractId),
value,
rollingConfig
);
features[`${prefix}_mean`] = roundTo(snapshot.mean);
features[`${prefix}_std`] = roundTo(snapshot.stddev);
features[`${prefix}_z`] = roundTo(snapshot.zscore);
features[`${prefix}_baseline_n`] = snapshot.baselineCount;
} catch (error) {
logger.warn("rolling stats update failed", {
metric,
contract: cluster.contractId,
error: error instanceof Error ? error.message : String(error)
});
}
};
await addRollingSnapshot("premium", totalPremium, "total_premium");
await addRollingSnapshot("size", cluster.totalSize, "total_size");
if (!nbboJoin.nbbo) {
joinQuality.nbbo_missing = 1;
} else {
@ -228,12 +283,14 @@ const flushCluster = async (
joinQuality.nbbo_stale = 1;
} else {
const mid = (nbboJoin.nbbo.bid + nbboJoin.nbbo.ask) / 2;
const spread = nbboJoin.nbbo.ask - nbboJoin.nbbo.bid;
features.nbbo_bid = nbboJoin.nbbo.bid;
features.nbbo_ask = nbboJoin.nbbo.ask;
features.nbbo_mid = Number(mid.toFixed(4));
features.nbbo_spread = Number((nbboJoin.nbbo.ask - nbboJoin.nbbo.bid).toFixed(4));
features.nbbo_mid = roundTo(mid);
features.nbbo_spread = roundTo(spread);
features.nbbo_bid_size = nbboJoin.nbbo.bidSize;
features.nbbo_ask_size = nbboJoin.nbbo.askSize;
await addRollingSnapshot("spread", roundTo(spread), "nbbo_spread");
}
}
@ -338,6 +395,8 @@ const emitClassifiers = async (
const flushEligibleClusters = async (
clickhouse: ReturnType<typeof createClickHouseClient>,
js: Awaited<ReturnType<typeof connectJetStreamWithRetry>>["js"],
redis: ReturnType<typeof createRedisClient>,
rollingConfig: RollingStatsConfig,
currentTs: number,
skipContractId: string
): Promise<void> => {
@ -348,7 +407,7 @@ const flushEligibleClusters = async (
if (currentTs - cluster.endTs > env.CLUSTER_WINDOW_MS) {
clusters.delete(contractId);
await flushCluster(clickhouse, js, cluster);
await flushCluster(clickhouse, js, redis, rollingConfig, cluster);
}
}
};
@ -434,6 +493,20 @@ const run = async () => {
database: env.CLICKHOUSE_DATABASE
});
const redis = createRedisClient(env.REDIS_URL);
redis.on("error", (error) => {
logger.warn("redis client error", { error: error instanceof Error ? error.message : String(error) });
});
await retry("redis connect", 20, 500, async () => {
await redis.connect();
});
const rollingConfig: RollingStatsConfig = {
windowSize: env.ROLLING_WINDOW_SIZE,
ttlSeconds: env.ROLLING_TTL_SEC
};
await retry("clickhouse table init", 20, 500, async () => {
await ensureFlowPacketsTable(clickhouse);
await ensureClassifierHitsTable(clickhouse);
@ -594,12 +667,13 @@ const run = async () => {
logger.info("service stopping", { signal });
for (const cluster of clusters.values()) {
await flushCluster(clickhouse, js, cluster);
await flushCluster(clickhouse, js, redis, rollingConfig, cluster);
}
clusters.clear();
await nc.drain();
await clickhouse.close();
await redis.quit();
process.exit(0);
};
@ -609,7 +683,14 @@ const run = async () => {
for await (const msg of subscription.messages) {
try {
const print = OptionPrintSchema.parse(subscription.decode(msg));
await flushEligibleClusters(clickhouse, js, print.ts, print.option_contract_id);
await flushEligibleClusters(
clickhouse,
js,
redis,
rollingConfig,
print.ts,
print.option_contract_id
);
const existing = clusters.get(print.option_contract_id);
if (!existing) {
@ -618,7 +699,7 @@ const run = async () => {
updateCluster(existing, print);
} else {
clusters.delete(print.option_contract_id);
await flushCluster(clickhouse, js, existing);
await flushCluster(clickhouse, js, redis, rollingConfig, existing);
clusters.set(print.option_contract_id, buildCluster(print));
}

View file

@ -0,0 +1,74 @@
import { createClient } from "redis";
export type RollingStatsConfig = {
windowSize: number;
ttlSeconds: number;
};
export type RollingSnapshot = {
baselineCount: number;
mean: number;
stddev: number;
zscore: number;
};
const toNumbers = (values: string[]): number[] => {
return values
.map((value) => Number(value))
.filter((value) => Number.isFinite(value));
};
export const computeStats = (values: number[]): { mean: number; stddev: number; count: number } => {
const count = values.length;
if (count === 0) {
return { mean: 0, stddev: 0, count: 0 };
}
const mean = values.reduce((sum, value) => sum + value, 0) / count;
const variance =
values.reduce((sum, value) => {
const delta = value - mean;
return sum + delta * delta;
}, 0) / count;
return { mean, stddev: Math.sqrt(variance), count };
};
export const computeSnapshot = (baseline: number[], value: number): RollingSnapshot => {
const stats = computeStats(baseline);
const zscore = stats.stddev === 0 ? 0 : (value - stats.mean) / stats.stddev;
return {
baselineCount: stats.count,
mean: stats.mean,
stddev: stats.stddev,
zscore
};
};
export const createRedisClient = (url: string) => {
return createClient({ url });
};
export const updateRollingStats = async (
client: ReturnType<typeof createClient>,
key: string,
value: number,
config: RollingStatsConfig
): Promise<RollingSnapshot> => {
const limit = Math.max(0, config.windowSize - 1);
const existing = await client.lRange(key, 0, limit);
const baseline = toNumbers(existing);
const snapshot = computeSnapshot(baseline, value);
const multi = client.multi();
multi.lPush(key, value.toString());
if (config.windowSize > 0) {
multi.lTrim(key, 0, config.windowSize - 1);
}
if (config.ttlSeconds > 0) {
multi.expire(key, config.ttlSeconds);
}
await multi.exec();
return snapshot;
};