Added per‑print NBBO placement tracking into clustering, exposed aggressor mix on

FlowPackets, and used it to adjust sweep/spike confidence. The Flow Packets UI now
  shows an aggressor mix pill with coverage and inside ratio.
This commit is contained in:
dirtydishes 2025-12-30 17:31:37 -05:00
parent 0b0ffa651e
commit 69758d28d9
7 changed files with 247 additions and 12 deletions

View file

@ -10,6 +10,8 @@ export type ClassifierConfig = {
spikeMinPremiumZ: number;
spikeMinSizeZ: number;
zMinSamples: number;
minNbboCoverage: number;
minAggressorRatio: number;
};
const clamp = (value: number, min = 0, max = 1): number => {
@ -31,6 +33,34 @@ const getNumberFeature = (packet: FlowPacket, key: string): number => {
return typeof value === "number" && Number.isFinite(value) ? value : 0;
};
const formatPct = (value: number): string => `${Math.round(value * 100)}%`;
const applyAggressorAdjustment = (
confidence: number,
coverage: number,
aggressiveRatio: number,
config: ClassifierConfig
): { confidence: number; note: string } => {
if (!Number.isFinite(coverage) || coverage <= 0) {
return { confidence, note: "Aggressor mix unavailable (no NBBO coverage)." };
}
let adjusted = confidence;
if (coverage >= config.minNbboCoverage) {
if (aggressiveRatio >= config.minAggressorRatio) {
adjusted += 0.05;
} else {
adjusted -= 0.1;
}
}
const note = `Aggressor mix ${formatPct(aggressiveRatio)} aggressive, NBBO coverage ${formatPct(
coverage
)}.`;
return { confidence: adjusted, note };
};
const buildSweepHit = (
packet: FlowPacket,
contract: ParsedContract,
@ -45,6 +75,10 @@ const buildSweepHit = (
const windowMs = getNumberFeature(packet, "window_ms");
const premiumZ = getNumberFeature(packet, "total_premium_z");
const premiumBaseline = getNumberFeature(packet, "total_premium_baseline_n");
const coverage = getNumberFeature(packet, "nbbo_coverage_ratio");
const aggressiveBuyRatio = getNumberFeature(packet, "nbbo_aggressive_buy_ratio");
const aggressiveSellRatio = getNumberFeature(packet, "nbbo_aggressive_sell_ratio");
const aggressiveRatio = Math.max(aggressiveBuyRatio, aggressiveSellRatio);
const baselineReady = premiumBaseline >= config.zMinSamples;
const passesAbsolute = totalPremium >= config.sweepMinPremium;
@ -74,7 +108,8 @@ const buildSweepHit = (
}
}
confidence = clamp(confidence, 0, 0.95);
const aggressor = applyAggressorAdjustment(confidence, coverage, aggressiveRatio, config);
confidence = clamp(aggressor.confidence, 0, 0.95);
const baselineNote = baselineReady
? `Baseline premium z-score ${premiumZ.toFixed(2)} over ${Math.round(premiumBaseline)} samples.`
@ -88,7 +123,8 @@ const buildSweepHit = (
`Likely ${direction === "bullish" ? "call" : "put"} sweep: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`,
`Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts; price ${priceTrend}.`,
`Thresholds: >=${config.sweepMinCount} prints and >=${formatUsd(config.sweepMinPremium)} premium or z>=${config.sweepMinPremiumZ.toFixed(1)}.`,
baselineNote
baselineNote,
aggressor.note
]
};
};
@ -102,6 +138,10 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
const sizeZ = getNumberFeature(packet, "total_size_z");
const premiumBaseline = getNumberFeature(packet, "total_premium_baseline_n");
const sizeBaseline = getNumberFeature(packet, "total_size_baseline_n");
const coverage = getNumberFeature(packet, "nbbo_coverage_ratio");
const aggressiveBuyRatio = getNumberFeature(packet, "nbbo_aggressive_buy_ratio");
const aggressiveSellRatio = getNumberFeature(packet, "nbbo_aggressive_sell_ratio");
const aggressiveRatio = Math.max(aggressiveBuyRatio, aggressiveSellRatio);
const premiumBaselineReady = premiumBaseline >= config.zMinSamples;
const sizeBaselineReady = sizeBaseline >= config.zMinSamples;
@ -131,7 +171,8 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
}
}
confidence = clamp(confidence, 0, 0.9);
const aggressor = applyAggressorAdjustment(confidence, coverage, aggressiveRatio, config);
confidence = clamp(aggressor.confidence, 0, 0.9);
const baselineNote =
premiumBaselineReady || sizeBaselineReady
@ -146,7 +187,8 @@ const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): Classifier
`Unusual contract spike: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`,
`Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts.`,
`Thresholds: >=${config.spikeMinSize} contracts and >=${formatUsd(config.spikeMinPremium)} premium or z>=${config.spikeMinPremiumZ.toFixed(1)}.`,
baselineNote
baselineNote,
aggressor.note
]
};
};

View file

@ -78,7 +78,9 @@ const envSchema = z.object({
CLASSIFIER_SPIKE_MIN_SIZE: z.coerce.number().int().positive().default(400),
CLASSIFIER_SPIKE_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2.5),
CLASSIFIER_SPIKE_MIN_SIZE_Z: z.coerce.number().nonnegative().default(2),
CLASSIFIER_Z_MIN_SAMPLES: z.coerce.number().int().nonnegative().default(12)
CLASSIFIER_Z_MIN_SAMPLES: z.coerce.number().int().nonnegative().default(12),
CLASSIFIER_MIN_NBBO_COVERAGE: z.coerce.number().min(0).max(1).default(0.5),
CLASSIFIER_MIN_AGGRESSOR_RATIO: z.coerce.number().min(0).max(1).default(0.55)
});
const env = readEnv(envSchema);
@ -91,7 +93,9 @@ const classifierConfig: ClassifierConfig = {
spikeMinSize: env.CLASSIFIER_SPIKE_MIN_SIZE,
spikeMinPremiumZ: env.CLASSIFIER_SPIKE_MIN_PREMIUM_Z,
spikeMinSizeZ: env.CLASSIFIER_SPIKE_MIN_SIZE_Z,
zMinSamples: env.CLASSIFIER_Z_MIN_SAMPLES
zMinSamples: env.CLASSIFIER_Z_MIN_SAMPLES,
minNbboCoverage: env.CLASSIFIER_MIN_NBBO_COVERAGE,
minAggressorRatio: env.CLASSIFIER_MIN_AGGRESSOR_RATIO
};
const retry = async <T>(
@ -128,6 +132,18 @@ const roundTo = (value: number, digits = 4): number => {
return Number(value.toFixed(digits));
};
type NbboPlacement = "AA" | "A" | "B" | "BB" | "MID" | "MISSING" | "STALE";
type NbboPlacementCounts = {
aa: number;
a: number;
b: number;
bb: number;
mid: number;
missing: number;
stale: number;
};
type ClusterState = {
contractId: string;
startTs: number;
@ -140,6 +156,7 @@ type ClusterState = {
totalPremium: number;
firstPrice: number;
lastPrice: number;
placements: NbboPlacementCounts;
};
const clusters = new Map<string, ClusterState>();
@ -152,6 +169,43 @@ const rollingKey = (metric: string, contractId: string): string => {
return `rolling:${metric}:${contractId}`;
};
const createPlacementCounts = (): NbboPlacementCounts => ({
aa: 0,
a: 0,
b: 0,
bb: 0,
mid: 0,
missing: 0,
stale: 0
});
const recordPlacement = (counts: NbboPlacementCounts, placement: NbboPlacement): void => {
switch (placement) {
case "AA":
counts.aa += 1;
break;
case "A":
counts.a += 1;
break;
case "B":
counts.b += 1;
break;
case "BB":
counts.bb += 1;
break;
case "MID":
counts.mid += 1;
break;
case "STALE":
counts.stale += 1;
break;
case "MISSING":
default:
counts.missing += 1;
break;
}
};
const buildLegFromCluster = (cluster: ClusterState): ContractLeg | null => {
const parsed = parseContractId(cluster.contractId);
if (!parsed) {
@ -237,6 +291,8 @@ const applyDeliverPolicy = (
};
const buildCluster = (print: OptionPrint): ClusterState => {
const placements = createPlacementCounts();
recordPlacement(placements, classifyPlacement(print.price, selectNbbo(print.option_contract_id, print.ts)));
return {
contractId: print.option_contract_id,
startTs: print.ts,
@ -248,7 +304,8 @@ const buildCluster = (print: OptionPrint): ClusterState => {
totalSize: print.size,
totalPremium: print.price * print.size,
firstPrice: print.price,
lastPrice: print.price
lastPrice: print.price,
placements
};
};
@ -260,6 +317,10 @@ const updateCluster = (cluster: ClusterState, print: OptionPrint): ClusterState
cluster.totalSize += print.size;
cluster.totalPremium += print.price * print.size;
cluster.lastPrice = print.price;
recordPlacement(
cluster.placements,
classifyPlacement(print.price, selectNbbo(print.option_contract_id, print.ts))
);
return cluster;
};
@ -291,6 +352,42 @@ const selectNbbo = (contractId: string, ts: number): NbboJoin => {
return { nbbo, ageMs, stale };
};
const classifyPlacement = (price: number, join: NbboJoin): NbboPlacement => {
if (!Number.isFinite(price)) {
return "MISSING";
}
if (!join.nbbo) {
return "MISSING";
}
if (join.stale) {
return "STALE";
}
const bid = join.nbbo.bid;
const ask = join.nbbo.ask;
if (!Number.isFinite(bid) || !Number.isFinite(ask) || ask <= 0) {
return "MISSING";
}
const spread = Math.max(0, ask - bid);
const epsilon = Math.max(0.01, spread * 0.05);
if (price > ask + epsilon) {
return "AA";
}
if (price >= ask - epsilon) {
return "A";
}
if (price < bid - epsilon) {
return "BB";
}
if (price <= bid + epsilon) {
return "B";
}
return "MID";
};
const flushCluster = async (
clickhouse: ReturnType<typeof createClickHouseClient>,
js: Awaited<ReturnType<typeof connectJetStreamWithRetry>>["js"],
@ -315,6 +412,37 @@ const flushCluster = async (
window_ms: env.CLUSTER_WINDOW_MS
};
const placementTotal =
cluster.placements.aa +
cluster.placements.a +
cluster.placements.b +
cluster.placements.bb +
cluster.placements.mid;
const aggressiveTotal =
cluster.placements.aa + cluster.placements.a + cluster.placements.b + cluster.placements.bb;
const aggressiveBuy = cluster.placements.aa + cluster.placements.a;
const aggressiveSell = cluster.placements.bb + cluster.placements.b;
const coverageRatio = cluster.members.length > 0 ? placementTotal / cluster.members.length : 0;
const aggressiveBuyRatio = aggressiveTotal > 0 ? aggressiveBuy / aggressiveTotal : 0;
const aggressiveSellRatio = aggressiveTotal > 0 ? aggressiveSell / aggressiveTotal : 0;
const insideRatio = placementTotal > 0 ? cluster.placements.mid / placementTotal : 0;
const aggressiveRatio = placementTotal > 0 ? aggressiveTotal / placementTotal : 0;
features.nbbo_aa_count = cluster.placements.aa;
features.nbbo_a_count = cluster.placements.a;
features.nbbo_b_count = cluster.placements.b;
features.nbbo_bb_count = cluster.placements.bb;
features.nbbo_mid_count = cluster.placements.mid;
features.nbbo_missing_count = cluster.placements.missing;
features.nbbo_stale_count = cluster.placements.stale;
features.nbbo_coverage_ratio = roundTo(coverageRatio);
features.nbbo_aggressive_buy_ratio = roundTo(aggressiveBuyRatio);
features.nbbo_aggressive_sell_ratio = roundTo(aggressiveSellRatio);
features.nbbo_inside_ratio = roundTo(insideRatio);
features.nbbo_aggressive_ratio = roundTo(aggressiveRatio);
joinQuality.nbbo_coverage_ratio = roundTo(coverageRatio);
const addRollingSnapshot = async (
metric: string,
value: number,