diff --git a/.env.example b/.env.example
index 0feb4d5..a7183d0 100644
--- a/.env.example
+++ b/.env.example
@@ -65,3 +65,5 @@ CLASSIFIER_SPIKE_MIN_SIZE=400
CLASSIFIER_SPIKE_MIN_PREMIUM_Z=2.5
CLASSIFIER_SPIKE_MIN_SIZE_Z=2
CLASSIFIER_Z_MIN_SAMPLES=12
+CLASSIFIER_MIN_NBBO_COVERAGE=0.5
+CLASSIFIER_MIN_AGGRESSOR_RATIO=0.55
diff --git a/README.md b/README.md
index 766ef7f..7d5cabc 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,7 @@ Done now (in repo):
- Deterministic option FlowPacket clustering (time window) + persistence
- Rolling stats in Redis (premium/size/spread) with z-score features on FlowPackets
- FlowPacket structure tags (vertical/ladder/straddle/strangle) for multi-leg bursts
+- Aggressor mix features (NBBO placement ratios) on FlowPackets
- Rule-first classifiers + alert scoring with ClickHouse persistence + WS/REST endpoints
- API: REST for prints/flow packets/classifier hits/alerts, WS for live options/equities/flow/alerts/hits, replay endpoints
- UI: live tapes for options/equities/flow + replay toggle + pause controls + replay time/completion
@@ -47,6 +48,7 @@ Not started:
- Deterministic option FlowPacket clustering (time-window)
- Rolling stats baselines in Redis with z-score features on FlowPackets
- Basic multi-leg structure tagging on FlowPackets
+- Aggressor mix features from NBBO placement on FlowPackets
- Classifiers + alert scoring (rule-first) with WS/REST endpoints
- API gateway with REST, WS, and replay endpoints
- UI tapes for options/equities/flow packets + alerts/hits with live/replay toggle and pause controls
@@ -110,6 +112,7 @@ Adapter selection (env):
- Compute: `COMPUTE_DELIVER_POLICY` (`new` default), `COMPUTE_CONSUMER_RESET` (force skip backlog)
- Rolling stats: `REDIS_URL`, `ROLLING_WINDOW_SIZE`, `ROLLING_TTL_SEC`
- Classifier tuning: `CLASSIFIER_SWEEP_MIN_PREMIUM_Z`, `CLASSIFIER_SPIKE_MIN_PREMIUM_Z`, `CLASSIFIER_SPIKE_MIN_SIZE_Z`, `CLASSIFIER_Z_MIN_SAMPLES`
+- Aggressor gating: `CLASSIFIER_MIN_NBBO_COVERAGE`, `CLASSIFIER_MIN_AGGRESSOR_RATIO`
Testing mode (throttles ingest to reduce CPU):
- `TESTING_MODE=true` enables throttling
diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css
index 9489ab1..b09ce01 100644
--- a/apps/web/app/globals.css
+++ b/apps/web/app/globals.css
@@ -432,6 +432,12 @@ h1 {
background: rgba(39, 84, 138, 0.12);
}
+.aggressor-tag {
+ border-color: rgba(93, 70, 144, 0.45);
+ color: #5d4690;
+ background: rgba(93, 70, 144, 0.12);
+}
+
.nbbo-meta {
font-size: 0.72rem;
color: #6f5b39;
diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx
index df874a7..3b398dd 100644
--- a/apps/web/app/page.tsx
+++ b/apps/web/app/page.tsx
@@ -179,6 +179,8 @@ const formatTime = (ts: number): string => {
const formatConfidence = (value: number): string => `${Math.round(value * 100)}%`;
+const formatPct = (value: number): string => `${Math.round(value * 100)}%`;
+
const formatUsd = (value: number): string => {
if (!Number.isFinite(value)) {
return "0.00";
@@ -1790,10 +1792,20 @@ export default function HomePage() {
const structureRights =
typeof features.structure_rights === "string" ? features.structure_rights : "";
const structureStrikes = parseNumber(features.structure_strikes, 0);
- const nbboBid = parseNumber(features.nbbo_bid, Number.NaN);
- const nbboAsk = parseNumber(features.nbbo_ask, Number.NaN);
- const nbboMid = parseNumber(features.nbbo_mid, Number.NaN);
- const nbboSpread = parseNumber(features.nbbo_spread, Number.NaN);
+ const nbboBid = parseNumber(features.nbbo_bid, Number.NaN);
+ const nbboAsk = parseNumber(features.nbbo_ask, Number.NaN);
+ const nbboMid = parseNumber(features.nbbo_mid, Number.NaN);
+ const nbboSpread = parseNumber(features.nbbo_spread, Number.NaN);
+ const aggressiveBuyRatio = parseNumber(
+ features.nbbo_aggressive_buy_ratio,
+ Number.NaN
+ );
+ const aggressiveSellRatio = parseNumber(
+ features.nbbo_aggressive_sell_ratio,
+ Number.NaN
+ );
+ const aggressiveCoverage = parseNumber(features.nbbo_coverage_ratio, Number.NaN);
+ const insideRatio = parseNumber(features.nbbo_inside_ratio, Number.NaN);
const nbboAge = parseNumber(packet.join_quality.nbbo_age_ms, Number.NaN);
const nbboStale = parseNumber(packet.join_quality.nbbo_stale, 0) > 0;
const nbboMissing = parseNumber(packet.join_quality.nbbo_missing, 0) > 0;
@@ -1818,6 +1830,15 @@ export default function HomePage() {
{structureStrikes > 0 ? ` ${structureStrikes}K` : ""}
) : null}
+ {Number.isFinite(aggressiveCoverage) && aggressiveCoverage > 0 ? (
+
+ Agg {formatPct(aggressiveBuyRatio)} / {formatPct(aggressiveSellRatio)}
+ {Number.isFinite(insideRatio) && insideRatio > 0
+ ? ` · In ${formatPct(insideRatio)}`
+ : ""}
+ {` · ${formatPct(aggressiveCoverage)} cov`}
+
+ ) : null}
{Number.isFinite(nbboBid) && Number.isFinite(nbboAsk) ? (
NBBO ${formatPrice(nbboBid)} x ${formatPrice(nbboAsk)}
diff --git a/services/compute/src/classifiers.ts b/services/compute/src/classifiers.ts
index 7ab61e1..5c9bbff 100644
--- a/services/compute/src/classifiers.ts
+++ b/services/compute/src/classifiers.ts
@@ -10,6 +10,8 @@ export type ClassifierConfig = {
spikeMinPremiumZ: number;
spikeMinSizeZ: number;
zMinSamples: number;
+ minNbboCoverage: number;
+ minAggressorRatio: number;
};
const clamp = (value: number, min = 0, max = 1): number => {
@@ -31,6 +33,34 @@ const getNumberFeature = (packet: FlowPacket, key: string): number => {
return typeof value === "number" && Number.isFinite(value) ? value : 0;
};
+const formatPct = (value: number): string => `${Math.round(value * 100)}%`;
+
+const applyAggressorAdjustment = (
+ confidence: number,
+ coverage: number,
+ aggressiveRatio: number,
+ config: ClassifierConfig
+): { confidence: number; note: string } => {
+ if (!Number.isFinite(coverage) || coverage <= 0) {
+ return { confidence, note: "Aggressor mix unavailable (no NBBO coverage)." };
+ }
+
+ let adjusted = confidence;
+ if (coverage >= config.minNbboCoverage) {
+ if (aggressiveRatio >= config.minAggressorRatio) {
+ adjusted += 0.05;
+ } else {
+ adjusted -= 0.1;
+ }
+ }
+
+ const note = `Aggressor mix ${formatPct(aggressiveRatio)} aggressive, NBBO coverage ${formatPct(
+ coverage
+ )}.`;
+
+ return { confidence: adjusted, note };
+};
+
const buildSweepHit = (
packet: FlowPacket,
contract: ParsedContract,
@@ -45,6 +75,10 @@ const buildSweepHit = (
const windowMs = getNumberFeature(packet, "window_ms");
const premiumZ = getNumberFeature(packet, "total_premium_z");
const premiumBaseline = getNumberFeature(packet, "total_premium_baseline_n");
+ const coverage = getNumberFeature(packet, "nbbo_coverage_ratio");
+ const aggressiveBuyRatio = getNumberFeature(packet, "nbbo_aggressive_buy_ratio");
+ const aggressiveSellRatio = getNumberFeature(packet, "nbbo_aggressive_sell_ratio");
+ const aggressiveRatio = Math.max(aggressiveBuyRatio, aggressiveSellRatio);
const baselineReady = premiumBaseline >= config.zMinSamples;
const passesAbsolute = totalPremium >= config.sweepMinPremium;
@@ -74,7 +108,8 @@ const buildSweepHit = (
}
}
- confidence = clamp(confidence, 0, 0.95);
+ const aggressor = applyAggressorAdjustment(confidence, coverage, aggressiveRatio, config);
+ confidence = clamp(aggressor.confidence, 0, 0.95);
const baselineNote = baselineReady
? `Baseline premium z-score ${premiumZ.toFixed(2)} over ${Math.round(premiumBaseline)} samples.`
@@ -88,7 +123,8 @@ const buildSweepHit = (
`Likely ${direction === "bullish" ? "call" : "put"} sweep: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`,
`Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts; price ${priceTrend}.`,
`Thresholds: >=${config.sweepMinCount} prints and >=${formatUsd(config.sweepMinPremium)} premium or z>=${config.sweepMinPremiumZ.toFixed(1)}.`,
- baselineNote
+ baselineNote,
+ aggressor.note
]
};
};
@@ -102,6 +138,10 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
const sizeZ = getNumberFeature(packet, "total_size_z");
const premiumBaseline = getNumberFeature(packet, "total_premium_baseline_n");
const sizeBaseline = getNumberFeature(packet, "total_size_baseline_n");
+ const coverage = getNumberFeature(packet, "nbbo_coverage_ratio");
+ const aggressiveBuyRatio = getNumberFeature(packet, "nbbo_aggressive_buy_ratio");
+ const aggressiveSellRatio = getNumberFeature(packet, "nbbo_aggressive_sell_ratio");
+ const aggressiveRatio = Math.max(aggressiveBuyRatio, aggressiveSellRatio);
const premiumBaselineReady = premiumBaseline >= config.zMinSamples;
const sizeBaselineReady = sizeBaseline >= config.zMinSamples;
@@ -131,7 +171,8 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
}
}
- confidence = clamp(confidence, 0, 0.9);
+ const aggressor = applyAggressorAdjustment(confidence, coverage, aggressiveRatio, config);
+ confidence = clamp(aggressor.confidence, 0, 0.9);
const baselineNote =
premiumBaselineReady || sizeBaselineReady
@@ -146,7 +187,8 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
`Unusual contract spike: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`,
`Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts.`,
`Thresholds: >=${config.spikeMinSize} contracts and >=${formatUsd(config.spikeMinPremium)} premium or z>=${config.spikeMinPremiumZ.toFixed(1)}.`,
- baselineNote
+ baselineNote,
+ aggressor.note
]
};
};
diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts
index fc53e5d..8a67226 100644
--- a/services/compute/src/index.ts
+++ b/services/compute/src/index.ts
@@ -78,7 +78,9 @@ const envSchema = z.object({
CLASSIFIER_SPIKE_MIN_SIZE: z.coerce.number().int().positive().default(400),
CLASSIFIER_SPIKE_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2.5),
CLASSIFIER_SPIKE_MIN_SIZE_Z: z.coerce.number().nonnegative().default(2),
- CLASSIFIER_Z_MIN_SAMPLES: z.coerce.number().int().nonnegative().default(12)
+ CLASSIFIER_Z_MIN_SAMPLES: z.coerce.number().int().nonnegative().default(12),
+ CLASSIFIER_MIN_NBBO_COVERAGE: z.coerce.number().min(0).max(1).default(0.5),
+ CLASSIFIER_MIN_AGGRESSOR_RATIO: z.coerce.number().min(0).max(1).default(0.55)
});
const env = readEnv(envSchema);
@@ -91,7 +93,9 @@ const classifierConfig: ClassifierConfig = {
spikeMinSize: env.CLASSIFIER_SPIKE_MIN_SIZE,
spikeMinPremiumZ: env.CLASSIFIER_SPIKE_MIN_PREMIUM_Z,
spikeMinSizeZ: env.CLASSIFIER_SPIKE_MIN_SIZE_Z,
- zMinSamples: env.CLASSIFIER_Z_MIN_SAMPLES
+ zMinSamples: env.CLASSIFIER_Z_MIN_SAMPLES,
+ minNbboCoverage: env.CLASSIFIER_MIN_NBBO_COVERAGE,
+ minAggressorRatio: env.CLASSIFIER_MIN_AGGRESSOR_RATIO
};
const retry = async (
@@ -128,6 +132,18 @@ const roundTo = (value: number, digits = 4): number => {
return Number(value.toFixed(digits));
};
+type NbboPlacement = "AA" | "A" | "B" | "BB" | "MID" | "MISSING" | "STALE";
+
+type NbboPlacementCounts = {
+ aa: number;
+ a: number;
+ b: number;
+ bb: number;
+ mid: number;
+ missing: number;
+ stale: number;
+};
+
type ClusterState = {
contractId: string;
startTs: number;
@@ -140,6 +156,7 @@ type ClusterState = {
totalPremium: number;
firstPrice: number;
lastPrice: number;
+ placements: NbboPlacementCounts;
};
const clusters = new Map();
@@ -152,6 +169,43 @@ const rollingKey = (metric: string, contractId: string): string => {
return `rolling:${metric}:${contractId}`;
};
+const createPlacementCounts = (): NbboPlacementCounts => ({
+ aa: 0,
+ a: 0,
+ b: 0,
+ bb: 0,
+ mid: 0,
+ missing: 0,
+ stale: 0
+});
+
+const recordPlacement = (counts: NbboPlacementCounts, placement: NbboPlacement): void => {
+ switch (placement) {
+ case "AA":
+ counts.aa += 1;
+ break;
+ case "A":
+ counts.a += 1;
+ break;
+ case "B":
+ counts.b += 1;
+ break;
+ case "BB":
+ counts.bb += 1;
+ break;
+ case "MID":
+ counts.mid += 1;
+ break;
+ case "STALE":
+ counts.stale += 1;
+ break;
+ case "MISSING":
+ default:
+ counts.missing += 1;
+ break;
+ }
+};
+
const buildLegFromCluster = (cluster: ClusterState): ContractLeg | null => {
const parsed = parseContractId(cluster.contractId);
if (!parsed) {
@@ -237,6 +291,8 @@ const applyDeliverPolicy = (
};
const buildCluster = (print: OptionPrint): ClusterState => {
+ const placements = createPlacementCounts();
+ recordPlacement(placements, classifyPlacement(print.price, selectNbbo(print.option_contract_id, print.ts)));
return {
contractId: print.option_contract_id,
startTs: print.ts,
@@ -248,7 +304,8 @@ const buildCluster = (print: OptionPrint): ClusterState => {
totalSize: print.size,
totalPremium: print.price * print.size,
firstPrice: print.price,
- lastPrice: print.price
+ lastPrice: print.price,
+ placements
};
};
@@ -260,6 +317,10 @@ const updateCluster = (cluster: ClusterState, print: OptionPrint): ClusterState
cluster.totalSize += print.size;
cluster.totalPremium += print.price * print.size;
cluster.lastPrice = print.price;
+ recordPlacement(
+ cluster.placements,
+ classifyPlacement(print.price, selectNbbo(print.option_contract_id, print.ts))
+ );
return cluster;
};
@@ -291,6 +352,42 @@ const selectNbbo = (contractId: string, ts: number): NbboJoin => {
return { nbbo, ageMs, stale };
};
+const classifyPlacement = (price: number, join: NbboJoin): NbboPlacement => {
+ if (!Number.isFinite(price)) {
+ return "MISSING";
+ }
+ if (!join.nbbo) {
+ return "MISSING";
+ }
+ if (join.stale) {
+ return "STALE";
+ }
+
+ const bid = join.nbbo.bid;
+ const ask = join.nbbo.ask;
+ if (!Number.isFinite(bid) || !Number.isFinite(ask) || ask <= 0) {
+ return "MISSING";
+ }
+
+ const spread = Math.max(0, ask - bid);
+ const epsilon = Math.max(0.01, spread * 0.05);
+
+ if (price > ask + epsilon) {
+ return "AA";
+ }
+ if (price >= ask - epsilon) {
+ return "A";
+ }
+ if (price < bid - epsilon) {
+ return "BB";
+ }
+ if (price <= bid + epsilon) {
+ return "B";
+ }
+
+ return "MID";
+};
+
const flushCluster = async (
clickhouse: ReturnType,
js: Awaited>["js"],
@@ -315,6 +412,37 @@ const flushCluster = async (
window_ms: env.CLUSTER_WINDOW_MS
};
+ const placementTotal =
+ cluster.placements.aa +
+ cluster.placements.a +
+ cluster.placements.b +
+ cluster.placements.bb +
+ cluster.placements.mid;
+ const aggressiveTotal =
+ cluster.placements.aa + cluster.placements.a + cluster.placements.b + cluster.placements.bb;
+ const aggressiveBuy = cluster.placements.aa + cluster.placements.a;
+ const aggressiveSell = cluster.placements.bb + cluster.placements.b;
+ const coverageRatio = cluster.members.length > 0 ? placementTotal / cluster.members.length : 0;
+ const aggressiveBuyRatio = aggressiveTotal > 0 ? aggressiveBuy / aggressiveTotal : 0;
+ const aggressiveSellRatio = aggressiveTotal > 0 ? aggressiveSell / aggressiveTotal : 0;
+ const insideRatio = placementTotal > 0 ? cluster.placements.mid / placementTotal : 0;
+ const aggressiveRatio = placementTotal > 0 ? aggressiveTotal / placementTotal : 0;
+
+ features.nbbo_aa_count = cluster.placements.aa;
+ features.nbbo_a_count = cluster.placements.a;
+ features.nbbo_b_count = cluster.placements.b;
+ features.nbbo_bb_count = cluster.placements.bb;
+ features.nbbo_mid_count = cluster.placements.mid;
+ features.nbbo_missing_count = cluster.placements.missing;
+ features.nbbo_stale_count = cluster.placements.stale;
+ features.nbbo_coverage_ratio = roundTo(coverageRatio);
+ features.nbbo_aggressive_buy_ratio = roundTo(aggressiveBuyRatio);
+ features.nbbo_aggressive_sell_ratio = roundTo(aggressiveSellRatio);
+ features.nbbo_inside_ratio = roundTo(insideRatio);
+ features.nbbo_aggressive_ratio = roundTo(aggressiveRatio);
+
+ joinQuality.nbbo_coverage_ratio = roundTo(coverageRatio);
+
const addRollingSnapshot = async (
metric: string,
value: number,
diff --git a/services/compute/tests/classifiers.test.ts b/services/compute/tests/classifiers.test.ts
index 12dfd11..8903963 100644
--- a/services/compute/tests/classifiers.test.ts
+++ b/services/compute/tests/classifiers.test.ts
@@ -10,7 +10,9 @@ const baseConfig: ClassifierConfig = {
spikeMinSize: 400,
spikeMinPremiumZ: 2.5,
spikeMinSizeZ: 2,
- zMinSamples: 12
+ zMinSamples: 12,
+ minNbboCoverage: 0.5,
+ minAggressorRatio: 0.55
};
const buildPacket = (
@@ -66,4 +68,35 @@ describe("classifier z-score behavior", () => {
const hits = evaluateClassifiers(packet, baseConfig);
expect(hits.some((hit) => hit.classifier_id === "large_bullish_call_sweep")).toBe(false);
});
+
+ test("aggressor mix adjusts sweep confidence", () => {
+ const basePacket = {
+ total_premium: 120_000,
+ total_size: 900,
+ count: 4,
+ nbbo_coverage_ratio: 0.8
+ };
+
+ const lowAgg = buildPacket({
+ ...basePacket,
+ nbbo_aggressive_buy_ratio: 0.2,
+ nbbo_aggressive_sell_ratio: 0.2
+ });
+ const highAgg = buildPacket({
+ ...basePacket,
+ nbbo_aggressive_buy_ratio: 0.7,
+ nbbo_aggressive_sell_ratio: 0.3
+ });
+
+ const lowHit = evaluateClassifiers(lowAgg, baseConfig).find(
+ (hit) => hit.classifier_id === "large_bullish_call_sweep"
+ );
+ const highHit = evaluateClassifiers(highAgg, baseConfig).find(
+ (hit) => hit.classifier_id === "large_bullish_call_sweep"
+ );
+
+ expect(lowHit).toBeTruthy();
+ expect(highHit).toBeTruthy();
+ expect((highHit?.confidence ?? 0)).toBeGreaterThan(lowHit?.confidence ?? 0);
+ });
});