From a82db56ab607ab3b9164714a86d956a64ffea0c6 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Wed, 28 Jan 2026 21:04:24 -0500 Subject: [PATCH] Detect option rolls and emit roll classifier --- services/compute/src/classifiers.ts | 103 +++++++++++++++ services/compute/src/index.ts | 61 ++++++++- services/compute/src/structure-packets.ts | 124 +++++++++++++++++- services/compute/src/structures.ts | 23 ++-- services/compute/tests/classifiers.test.ts | 26 ++++ .../compute/tests/structure-packets.test.ts | 41 +++++- services/compute/tests/structures.test.ts | 16 +++ 7 files changed, 378 insertions(+), 16 deletions(-) diff --git a/services/compute/src/classifiers.ts b/services/compute/src/classifiers.ts index 7a2f4a9..fc86e52 100644 --- a/services/compute/src/classifiers.ts +++ b/services/compute/src/classifiers.ts @@ -636,6 +636,104 @@ const buildLadderHit = ( }; }; +const buildRollHit = (packet: FlowPacket, config: ClassifierConfig): ClassifierHit | null => { + const structureType = getStringFeature(packet, "structure_type"); + if (structureType !== "roll") { + return null; + } + + const structureRights = getStringFeature(packet, "structure_rights"); + if (structureRights !== "C" && structureRights !== "P") { + return null; + } + + const activity = getLargeActivity(packet, config); + const qualifies = activity.totalPremium >= config.spikeMinPremium || activity.totalSize >= config.spikeMinSize; + if (!qualifies) { + return null; + } + + const { coverage, aggressiveBuyRatio, aggressiveSellRatio, aggressiveRatio } = + getAggressorContext(packet); + const fromExpiry = getStringFeature(packet, "roll_from_expiry") || ""; + const toExpiry = getStringFeature(packet, "roll_to_expiry") || ""; + + const getOptionalNumber = (key: string): number | null => { + const value = packet.features[key]; + return typeof value === "number" && Number.isFinite(value) ? value : null; + }; + + const fromStrike = getOptionalNumber("roll_from_strike"); + const toStrike = getOptionalNumber("roll_to_strike"); + const strikeDelta = getOptionalNumber("roll_strike_delta") ?? 0; + const expiryDaysDelta = getOptionalNumber("roll_expiry_days_delta"); + + const hasStrikePair = fromStrike !== null && toStrike !== null; + const hasExpiryPair = Boolean(fromExpiry) && Boolean(toExpiry); + + let rollFlavor = "roll out"; + if (hasStrikePair) { + if (strikeDelta > 0.0001) { + rollFlavor = "roll out and up"; + } else if (strikeDelta < -0.0001) { + rollFlavor = "roll out and down"; + } + } + + let direction: "bullish" | "bearish" | "neutral" = "neutral"; + if (hasStrikePair) { + if (structureRights === "C") { + direction = strikeDelta > 0.0001 ? "bullish" : strikeDelta < -0.0001 ? "bearish" : "neutral"; + } else { + direction = strikeDelta > 0.0001 ? "bearish" : strikeDelta < -0.0001 ? "bullish" : "neutral"; + } + } + + let confidence = 0.5; + if (activity.totalPremium >= config.spikeMinPremium * 2) { + confidence += 0.1; + } + if (activity.totalSize >= config.spikeMinSize * 2) { + confidence += 0.05; + } + if (hasStrikePair && Math.abs(strikeDelta) > 0.0001) { + confidence += 0.05; + } + if (hasExpiryPair && expiryDaysDelta !== null && expiryDaysDelta >= 7) { + confidence += 0.05; + } + + const aggressor = applyAggressorAdjustment(confidence, coverage, aggressiveRatio, config); + confidence = clamp(aggressor.confidence, 0, 0.85); + + const expiryNote = hasExpiryPair + ? `Expiries: ${fromExpiry} -> ${toExpiry}${ + expiryDaysDelta !== null && expiryDaysDelta !== 0 ? ` (${Math.round(expiryDaysDelta)}d)` : "" + }.` + : "Expiry pairing unavailable."; + const strikeNote = hasStrikePair + ? `Strikes: ${fromStrike} -> ${toStrike} (delta ${strikeDelta}).` + : "Strike pairing unavailable."; + const skewNote = `Aggressor skew: buy ${formatPct(aggressiveBuyRatio)}, sell ${formatPct( + aggressiveSellRatio + )}.`; + + return { + classifier_id: "roll_up_down_out", + confidence, + direction, + explanations: [ + `Consistent with ${rollFlavor}: ${activity.count} prints in ${Math.round(activity.windowMs)}ms for ${packet.features.underlying_id ?? packet.id}.`, + expiryNote, + strikeNote, + `Premium ${formatUsd(activity.totalPremium)} across ${Math.round(activity.totalSize)} contracts.`, + `Thresholds: >=${config.spikeMinSize} contracts or >=${formatUsd(config.spikeMinPremium)} premium.`, + skewNote, + aggressor.note + ] + }; +}; + const buildFarDatedHit = ( packet: FlowPacket, contract: ParsedContract, @@ -774,6 +872,11 @@ export const evaluateClassifiers = ( hits.push(ladderHit); } + const rollHit = buildRollHit(packet, config); + if (rollHit) { + hits.push(rollHit); + } + return hits; } diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 832b823..9ac8732 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -223,6 +223,7 @@ const nbboCache = new Map(); const equityQuoteCache = new Map(); const darkInferenceState = createDarkInferenceState(); const recentLegsByKey = new Map(); +const recentLegsByRoot = new Map(); const recentStructureEmits = new Map(); const MAX_RECENT_LEGS = 20; @@ -301,6 +302,10 @@ const buildLegKey = (leg: ContractLeg): string => { return `${leg.root}:${leg.expiry}`; }; +const buildRootKey = (leg: ContractLeg): string => { + return leg.root; +}; + const isWithinStructureWindow = (anchorTs: number, candidateTs: number): boolean => { return Math.abs(anchorTs - candidateTs) <= env.CLUSTER_WINDOW_MS; }; @@ -321,6 +326,22 @@ const storeRecentLeg = (leg: LegEvidence, anchorTs: number): void => { recentLegsByKey.set(key, next); }; +const collectRecentRootLegs = (key: string, anchorTs: number, excludeId: string): LegEvidence[] => { + const recent = recentLegsByRoot.get(key) ?? []; + const filtered = recent.filter( + (leg) => leg.contractId !== excludeId && isWithinStructureWindow(anchorTs, leg.endTs) + ); + recentLegsByRoot.set(key, filtered); + return filtered; +}; + +const storeRecentRootLeg = (leg: LegEvidence, anchorTs: number): void => { + const key = buildRootKey(leg); + const recent = collectRecentRootLegs(key, anchorTs, ""); + const next = [leg, ...recent].slice(0, MAX_RECENT_LEGS); + recentLegsByRoot.set(key, next); +}; + const collectActiveLegs = ( key: string, anchorTs: number, @@ -346,7 +367,32 @@ const collectActiveLegs = ( return legs; }; -const STRUCTURE_TYPES = new Set(["straddle", "strangle", "vertical", "ladder"]); +const collectActiveRootLegs = ( + key: string, + anchorTs: number, + excludeId: string +): LegEvidence[] => { + const legs: LegEvidence[] = []; + for (const [contractId, cluster] of clusters) { + if (contractId === excludeId) { + continue; + } + const leg = buildLegFromCluster(cluster); + if (!leg) { + continue; + } + if (buildRootKey(leg) !== key) { + continue; + } + if (!isWithinStructureWindow(anchorTs, leg.endTs)) { + continue; + } + legs.push(leg); + } + return legs; +}; + +const STRUCTURE_TYPES = new Set(["straddle", "strangle", "vertical", "ladder", "roll"]); const MAX_RECENT_STRUCTURE_EMITS = 2000; const pruneRecentStructureEmits = (anchorTs: number): void => { @@ -691,7 +737,20 @@ const flushCluster = async ( } await emitStructurePacketIfNeeded(clickhouse, js, legs, summary, currentLeg.contractId); + + const rootKey = buildRootKey(currentLeg); + const rootCandidates = [ + ...collectRecentRootLegs(rootKey, anchorTs, currentLeg.contractId), + ...collectActiveRootLegs(rootKey, anchorTs, currentLeg.contractId) + ]; + const rollLegs = [currentLeg, ...rootCandidates]; + const rollSummary = summarizeStructure(rollLegs); + if (rollSummary?.type === "roll") { + await emitStructurePacketIfNeeded(clickhouse, js, rollLegs, rollSummary, currentLeg.contractId); + } + storeRecentLeg(currentLeg, anchorTs); + storeRecentRootLeg(currentLeg, anchorTs); } if (!nbboJoin.nbbo) { diff --git a/services/compute/src/structure-packets.ts b/services/compute/src/structure-packets.ts index 66a24fa..a168880 100644 --- a/services/compute/src/structure-packets.ts +++ b/services/compute/src/structure-packets.ts @@ -27,6 +27,14 @@ export type StructurePacketPlan = { bucketStartTs: number; root: string; pseudoContractId: string; + expiries: string[]; + strikes: number[]; + roll_from_expiry: string | null; + roll_to_expiry: string | null; + roll_from_strike: number | null; + roll_to_strike: number | null; + roll_strike_delta: number | null; + roll_expiry_days_delta: number | null; startTs: number; endTs: number; members: string[]; @@ -90,6 +98,40 @@ const uniqueSorted = (values: string[]): string[] => { return Array.from(new Set(values)).sort(); }; +const uniqueSortedNumbers = (values: number[]): number[] => { + return Array.from(new Set(values)).sort((a, b) => a - b); +}; + +const medianNumber = (values: number[]): number | null => { + if (values.length === 0) { + return null; + } + const sorted = values.slice().sort((a, b) => a - b); + const mid = Math.floor(sorted.length / 2); + if (sorted.length % 2 === 1) { + return sorted[mid] ?? null; + } + const a = sorted[mid - 1]; + const b = sorted[mid]; + if (!Number.isFinite(a) || !Number.isFinite(b)) { + return null; + } + return (a + b) / 2; +}; + +const dayDiff = (from: string | null, to: string | null): number | null => { + if (!from || !to) { + return null; + } + const fromTs = Date.parse(`${from}T00:00:00Z`); + const toTs = Date.parse(`${to}T00:00:00Z`); + if (!Number.isFinite(fromTs) || !Number.isFinite(toTs)) { + return null; + } + const diffMs = toTs - fromTs; + return Math.round(diffMs / 86_400_000); +}; + export const shouldEmitStructurePacket = (legs: LegEvidence[], currentLegContractId: string): boolean => { if (legs.length < 2) { return false; @@ -113,17 +155,55 @@ export const planStructurePacket = ( return null; } - const root = legs[0]?.root; - const expiry = legs[0]?.expiry; - if (!root || !expiry) { + const rootRaw = legs[0]?.root; + if (!rootRaw) { return null; } + const expiries = uniqueSorted(legs.map((leg) => leg.expiry)); + const expiry = expiries[0]; + if (!expiry) { + return null; + } + + const strikes = uniqueSortedNumbers(legs.map((leg) => leg.strike)); + + let rollFromExpiry: string | null = null; + let rollToExpiry: string | null = null; + let rollFromStrike: number | null = null; + let rollToStrike: number | null = null; + let rollStrikeDelta: number | null = null; + let rollExpiryDaysDelta: number | null = null; + + if (summary.type === "roll" && expiries.length >= 2) { + rollFromExpiry = expiries[0] ?? null; + rollToExpiry = expiries[expiries.length - 1] ?? null; + + const strikesByExpiry = new Map(); + for (const leg of legs) { + const bucket = strikesByExpiry.get(leg.expiry); + if (bucket) { + bucket.push(leg.strike); + } else { + strikesByExpiry.set(leg.expiry, [leg.strike]); + } + } + + rollFromStrike = medianNumber(strikesByExpiry.get(rollFromExpiry) ?? []) ?? null; + rollToStrike = medianNumber(strikesByExpiry.get(rollToExpiry) ?? []) ?? null; + + if (rollFromStrike !== null && rollToStrike !== null) { + rollStrikeDelta = roundTo(rollToStrike - rollFromStrike, 4); + } + + rollExpiryDaysDelta = dayDiff(rollFromExpiry, rollToExpiry); + } + const contractIds = uniqueSorted(legs.map((leg) => leg.contractId)); const startTs = legs.reduce((min, leg) => Math.min(min, leg.startTs), Number.POSITIVE_INFINITY); const endTs = legs.reduce((max, leg) => Math.max(max, leg.endTs), 0); const bucketStartTs = bucketTs(startTs, clusterWindowMs); - const pseudoContractId = buildPseudoContractId(root, expiry, summary.type); + const pseudoContractId = buildPseudoContractId(rootRaw, expiry, summary.type); const id = `flowpacket:${pseudoContractId}:${bucketStartTs}:${contractIds.join("|")}`; const dedupeKey = `${pseudoContractId}:${bucketStartTs}:${contractIds.join("|")}`; @@ -149,8 +229,16 @@ export const planStructurePacket = ( id, dedupeKey, bucketStartTs, - root: root.trim().toUpperCase(), + root: rootRaw.trim().toUpperCase(), pseudoContractId, + expiries, + strikes, + roll_from_expiry: rollFromExpiry, + roll_to_expiry: rollToExpiry, + roll_from_strike: rollFromStrike, + roll_to_strike: rollToStrike, + roll_strike_delta: rollStrikeDelta, + roll_expiry_days_delta: rollExpiryDaysDelta, startTs: Number.isFinite(startTs) ? startTs : 0, endTs, members, @@ -192,9 +280,33 @@ export const buildStructureFlowPacket = ( structure_strikes: summary.strikes, structure_strike_span: roundTo(summary.strikeSpan), structure_rights: summary.rights, - structure_contract_ids: summary.contractIds.join(",") + structure_contract_ids: summary.contractIds.join(","), + structure_expiries_count: plan.expiries.length, + structure_expiries: plan.expiries.join(","), + structure_strikes_list: plan.strikes.join(",") }; + if (summary.type === "roll") { + if (plan.roll_from_expiry) { + features.roll_from_expiry = plan.roll_from_expiry; + } + if (plan.roll_to_expiry) { + features.roll_to_expiry = plan.roll_to_expiry; + } + if (plan.roll_from_strike !== null) { + features.roll_from_strike = plan.roll_from_strike; + } + if (plan.roll_to_strike !== null) { + features.roll_to_strike = plan.roll_to_strike; + } + if (plan.roll_strike_delta !== null) { + features.roll_strike_delta = plan.roll_strike_delta; + } + if (plan.roll_expiry_days_delta !== null) { + features.roll_expiry_days_delta = plan.roll_expiry_days_delta; + } + } + // These are aggregate counts across the legs. We do not attach rolling z-scores // (baseline is per-contract), so structure packets default to absolute thresholds. features.nbbo_aa_count = plan.placements.aa; diff --git a/services/compute/src/structures.ts b/services/compute/src/structures.ts index f6eb909..f7d0026 100644 --- a/services/compute/src/structures.ts +++ b/services/compute/src/structures.ts @@ -22,17 +22,24 @@ export const summarizeStructure = (legs: ContractLeg[]): StructureSummary | null const strikes = Array.from(new Set(legs.map((leg) => leg.strike))).sort((a, b) => a - b); const rights = new Set(legs.map((leg) => leg.right)); + const expiries = new Set(legs.map((leg) => leg.expiry)); const strikeSpan = strikes.length >= 2 ? strikes[strikes.length - 1] - strikes[0] : 0; let type = "multi_leg"; - if (rights.size === 2 && strikes.length === 1) { - type = "straddle"; - } else if (rights.size === 2 && strikes.length >= 2) { - type = "strangle"; - } else if (rights.size === 1 && strikes.length === 2) { - type = "vertical"; - } else if (rights.size === 1 && strikes.length >= 3) { - type = "ladder"; + if (expiries.size === 1) { + if (rights.size === 2 && strikes.length === 1) { + type = "straddle"; + } else if (rights.size === 2 && strikes.length >= 2) { + type = "strangle"; + } else if (rights.size === 1 && strikes.length === 2) { + type = "vertical"; + } else if (rights.size === 1 && strikes.length >= 3) { + type = "ladder"; + } + } else if (rights.size === 1 && expiries.size === 2) { + // Conservative roll heuristic: same right, exactly two expiries within the burst window. + // We do not attempt to infer the exact strategy beyond roll-style behavior. + type = "roll"; } return { diff --git a/services/compute/tests/classifiers.test.ts b/services/compute/tests/classifiers.test.ts index f2513f8..2446197 100644 --- a/services/compute/tests/classifiers.test.ts +++ b/services/compute/tests/classifiers.test.ts @@ -194,6 +194,32 @@ describe("classifier structure and positioning signals", () => { expect(hits.some((hit) => hit.classifier_id === "ladder_accumulation")).toBe(true); }); + test("roll classifier triggers on cross-expiry structure packets", () => { + const packet = buildPacket({ + packet_kind: "structure", + structure_type: "roll", + structure_legs: 2, + structure_strikes: 2, + structure_rights: "C", + structure_strike_span: 5, + total_premium: 70_000, + total_size: 800, + nbbo_coverage_ratio: 0.85, + nbbo_aggressive_buy_ratio: 0.7, + nbbo_aggressive_sell_ratio: 0.3, + roll_from_expiry: "2025-01-17", + roll_to_expiry: "2025-02-21", + roll_from_strike: 450, + roll_to_strike: 455, + roll_strike_delta: 5, + roll_expiry_days_delta: 35 + }); + const hits = evaluateClassifiers(packet, baseConfig); + const hit = hits.find((candidate) => candidate.classifier_id === "roll_up_down_out"); + expect(hit).toBeTruthy(); + expect(hit?.direction).toBe("bullish"); + }); + test("far-dated conviction triggers on 60DTE threshold", () => { const packet = buildPacket({ option_contract_id: "SPY-2024-04-19-450-C", diff --git a/services/compute/tests/structure-packets.test.ts b/services/compute/tests/structure-packets.test.ts index d89c236..0ee20a8 100644 --- a/services/compute/tests/structure-packets.test.ts +++ b/services/compute/tests/structure-packets.test.ts @@ -22,7 +22,7 @@ const leg = (input: Partial & Pick { // 2 aggressive (AA + BB) out of 3 classified (AA + BB + MID) expect(packet.features.nbbo_aggressive_ratio).toBeCloseTo(2 / 3, 4); }); + + test("includes roll metadata when structure type is roll", () => { + const near = leg({ + contractId: "SPY-2025-01-17-450-C", + right: "C", + strike: 450, + expiry: "2025-01-17", + members: ["p1"], + totalSize: 10, + totalPremium: 2000, + placements: placements({ aa: 1 }) + }); + const far = leg({ + contractId: "SPY-2025-02-21-455-C", + right: "C", + strike: 455, + expiry: "2025-02-21", + startTs: 1010, + endTs: 1120, + members: ["p2"], + totalSize: 12, + totalPremium: 2500, + placements: placements({ bb: 1 }) + }); + + const legs = [near, far]; + const summary = summarizeStructure(legs); + expect(summary?.type).toBe("roll"); + + const plan = planStructurePacket(legs, summary!, 500); + const packet = buildStructureFlowPacket(plan!, summary!); + + expect(packet.features.structure_expiries_count).toBe(2); + expect(packet.features.roll_from_expiry).toBe("2025-01-17"); + expect(packet.features.roll_to_expiry).toBe("2025-02-21"); + expect(packet.features.roll_from_strike).toBe(450); + expect(packet.features.roll_to_strike).toBe(455); + expect(packet.features.roll_strike_delta).toBe(5); + }); }); diff --git a/services/compute/tests/structures.test.ts b/services/compute/tests/structures.test.ts index 3e9b968..1570648 100644 --- a/services/compute/tests/structures.test.ts +++ b/services/compute/tests/structures.test.ts @@ -40,4 +40,20 @@ describe("structure summaries", () => { expect(summary?.type).toBe("strangle"); expect(summary?.strikes).toBe(2); }); + + test("detects rolls across expiries", () => { + const summary = summarizeStructure([ + { + ...leg("c1", "C", 450), + expiry: "2025-01-17" + }, + { + ...leg("c2", "C", 455), + expiry: "2025-02-21" + } + ]); + expect(summary?.type).toBe("roll"); + expect(summary?.rights).toBe("C"); + expect(summary?.strikes).toBe(2); + }); });