diff --git a/services/compute/src/classifiers.ts b/services/compute/src/classifiers.ts index 3eef846..7a2f4a9 100644 --- a/services/compute/src/classifiers.ts +++ b/services/compute/src/classifiers.ts @@ -749,66 +749,75 @@ export const evaluateClassifiers = ( packet: FlowPacket, config: ClassifierConfig ): ClassifierHit[] => { + const packetKind = getStringFeature(packet, "packet_kind"); + const structureOnly = packetKind === "structure"; + const contractId = typeof packet.features.option_contract_id === "string" ? packet.features.option_contract_id : ""; - const contract = parseContractId(contractId); + const contract = structureOnly ? null : parseContractId(contractId); const hits: ClassifierHit[] = []; - if (contract?.right === "C") { - const hit = buildSweepHit(packet, contract, "bullish", config); - if (hit) { - hits.push(hit); - } - } - - if (contract?.right === "P") { - const hit = buildSweepHit(packet, contract, "bearish", config); - if (hit) { - hits.push(hit); - } - } - - const spikeHit = buildSpikeHit(packet, config); - if (spikeHit) { - hits.push(spikeHit); - } - - if (contract) { - const overwriteHit = buildOverwriteHit(packet, contract, config); - if (overwriteHit) { - hits.push(overwriteHit); + if (structureOnly) { + const structureHit = buildStraddleStrangleHit(packet, config); + if (structureHit) { + hits.push(structureHit); } - const putWriteHit = buildPutWriteHit(packet, contract, config); - if (putWriteHit) { - hits.push(putWriteHit); + const verticalHit = buildVerticalSpreadHit(packet, config); + if (verticalHit) { + hits.push(verticalHit); } - const farDatedHit = buildFarDatedHit(packet, contract, config); - if (farDatedHit) { - hits.push(farDatedHit); + const ladderHit = buildLadderHit(packet, config); + if (ladderHit) { + hits.push(ladderHit); } - const zeroDteHit = buildZeroDteGammaPunchHit(packet, contract, config); - if (zeroDteHit) { - hits.push(zeroDteHit); + return hits; + } + + if (!structureOnly) { + if (contract?.right === "C") { + const hit = buildSweepHit(packet, contract, "bullish", config); + if (hit) { + hits.push(hit); + } } - } - const structureHit = buildStraddleStrangleHit(packet, config); - if (structureHit) { - hits.push(structureHit); - } + if (contract?.right === "P") { + const hit = buildSweepHit(packet, contract, "bearish", config); + if (hit) { + hits.push(hit); + } + } - const verticalHit = buildVerticalSpreadHit(packet, config); - if (verticalHit) { - hits.push(verticalHit); - } + const spikeHit = buildSpikeHit(packet, config); + if (spikeHit) { + hits.push(spikeHit); + } - const ladderHit = buildLadderHit(packet, config); - if (ladderHit) { - hits.push(ladderHit); + if (contract) { + const overwriteHit = buildOverwriteHit(packet, contract, config); + if (overwriteHit) { + hits.push(overwriteHit); + } + + const putWriteHit = buildPutWriteHit(packet, contract, config); + if (putWriteHit) { + hits.push(putWriteHit); + } + + const farDatedHit = buildFarDatedHit(packet, contract, config); + if (farDatedHit) { + hits.push(farDatedHit); + } + + const zeroDteHit = buildZeroDteGammaPunchHit(packet, contract, config); + if (zeroDteHit) { + hits.push(zeroDteHit); + } + } } return hits; diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 315c65f..832b823 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -69,6 +69,12 @@ import { import { buildEquityPrintJoin, type EquityQuoteJoin } from "./equity-joins"; import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats"; import { summarizeStructure, type ContractLeg } from "./structures"; +import { + buildStructureFlowPacket, + planStructurePacket, + shouldEmitStructurePacket, + type LegEvidence +} from "./structure-packets"; const service = "compute"; const logger = createLogger({ service }); @@ -216,7 +222,8 @@ const clusters = new Map(); const nbboCache = new Map(); const equityQuoteCache = new Map(); const darkInferenceState = createDarkInferenceState(); -const recentLegsByKey = new Map(); +const recentLegsByKey = new Map(); +const recentStructureEmits = new Map(); const MAX_RECENT_LEGS = 20; @@ -261,7 +268,7 @@ const recordPlacement = (counts: NbboPlacementCounts, placement: NbboPlacement): } }; -const buildLegFromCluster = (cluster: ClusterState): ContractLeg | null => { +const buildLegFromCluster = (cluster: ClusterState): LegEvidence | null => { const parsed = parseContractId(cluster.contractId); if (!parsed) { return null; @@ -271,7 +278,22 @@ const buildLegFromCluster = (cluster: ClusterState): ContractLeg | null => { ...parsed, contractId: cluster.contractId, startTs: cluster.startTs, - endTs: cluster.endTs + endTs: cluster.endTs, + members: cluster.members.slice(), + totalSize: cluster.totalSize, + totalPremium: cluster.totalPremium, + placements: { + aa: cluster.placements.aa, + a: cluster.placements.a, + b: cluster.placements.b, + bb: cluster.placements.bb, + mid: cluster.placements.mid, + missing: cluster.placements.missing, + stale: cluster.placements.stale + }, + source_ts: cluster.startSourceTs, + ingest_ts: cluster.endIngestTs, + seq: cluster.endSeq }; }; @@ -283,7 +305,7 @@ const isWithinStructureWindow = (anchorTs: number, candidateTs: number): boolean return Math.abs(anchorTs - candidateTs) <= env.CLUSTER_WINDOW_MS; }; -const collectRecentLegs = (key: string, anchorTs: number, excludeId: string): ContractLeg[] => { +const collectRecentLegs = (key: string, anchorTs: number, excludeId: string): LegEvidence[] => { const recent = recentLegsByKey.get(key) ?? []; const filtered = recent.filter( (leg) => leg.contractId !== excludeId && isWithinStructureWindow(anchorTs, leg.endTs) @@ -292,7 +314,7 @@ const collectRecentLegs = (key: string, anchorTs: number, excludeId: string): Co return filtered; }; -const storeRecentLeg = (leg: ContractLeg, anchorTs: number): void => { +const storeRecentLeg = (leg: LegEvidence, anchorTs: number): void => { const key = buildLegKey(leg); const recent = collectRecentLegs(key, anchorTs, ""); const next = [leg, ...recent].slice(0, MAX_RECENT_LEGS); @@ -303,8 +325,8 @@ const collectActiveLegs = ( key: string, anchorTs: number, excludeId: string -): ContractLeg[] => { - const legs: ContractLeg[] = []; +): LegEvidence[] => { + const legs: LegEvidence[] = []; for (const [contractId, cluster] of clusters) { if (contractId === excludeId) { continue; @@ -324,6 +346,78 @@ const collectActiveLegs = ( return legs; }; +const STRUCTURE_TYPES = new Set(["straddle", "strangle", "vertical", "ladder"]); +const MAX_RECENT_STRUCTURE_EMITS = 2000; + +const pruneRecentStructureEmits = (anchorTs: number): void => { + const ttl = env.CLUSTER_WINDOW_MS * 5; + for (const [key, ts] of recentStructureEmits) { + if (anchorTs - ts > ttl) { + recentStructureEmits.delete(key); + } + } + + if (recentStructureEmits.size <= MAX_RECENT_STRUCTURE_EMITS) { + return; + } + + const overflow = recentStructureEmits.size - MAX_RECENT_STRUCTURE_EMITS; + let removed = 0; + for (const key of recentStructureEmits.keys()) { + recentStructureEmits.delete(key); + removed += 1; + if (removed >= overflow) { + break; + } + } +}; + +const emitStructurePacketIfNeeded = async ( + clickhouse: ReturnType, + js: Awaited>["js"], + legs: LegEvidence[], + summary: ReturnType, + currentContractId: string +): Promise => { + if (!summary) { + return; + } + + if (!STRUCTURE_TYPES.has(summary.type)) { + return; + } + + if (!shouldEmitStructurePacket(legs, currentContractId)) { + return; + } + + const plan = planStructurePacket(legs, summary, env.CLUSTER_WINDOW_MS); + if (!plan) { + return; + } + + pruneRecentStructureEmits(plan.endTs); + const lastEmitTs = recentStructureEmits.get(plan.dedupeKey); + if (typeof lastEmitTs === "number" && plan.endTs - lastEmitTs <= env.CLUSTER_WINDOW_MS) { + return; + } + + recentStructureEmits.set(plan.dedupeKey, plan.endTs); + const packet = buildStructureFlowPacket(plan, summary); + const validated = FlowPacketSchema.parse(packet); + + await insertFlowPacket(clickhouse, validated); + await publishJson(js, SUBJECT_FLOW_PACKETS, validated); + await emitClassifiers(clickhouse, js, validated); + + logger.info("emitted structure flow packet", { + id: validated.id, + type: summary.type, + legs: summary.legs, + strikes: summary.strikes + }); +}; + const applyDeliverPolicy = ( opts: ReturnType, policy: typeof env.COMPUTE_DELIVER_POLICY @@ -586,7 +680,8 @@ const flushCluster = async ( ...collectRecentLegs(key, anchorTs, currentLeg.contractId), ...collectActiveLegs(key, anchorTs, currentLeg.contractId) ]; - const summary = summarizeStructure([currentLeg, ...candidates]); + const legs = [currentLeg, ...candidates]; + const summary = summarizeStructure(legs); if (summary) { features.structure_type = summary.type; features.structure_legs = summary.legs; @@ -594,6 +689,8 @@ const flushCluster = async ( features.structure_strike_span = roundTo(summary.strikeSpan); features.structure_rights = summary.rights; } + + await emitStructurePacketIfNeeded(clickhouse, js, legs, summary, currentLeg.contractId); storeRecentLeg(currentLeg, anchorTs); } diff --git a/services/compute/src/structure-packets.ts b/services/compute/src/structure-packets.ts new file mode 100644 index 0000000..66a24fa --- /dev/null +++ b/services/compute/src/structure-packets.ts @@ -0,0 +1,226 @@ +import type { FlowPacket } from "@islandflow/types"; +import type { ContractLeg, StructureSummary } from "./structures"; + +export type NbboPlacementCounts = { + aa: number; + a: number; + b: number; + bb: number; + mid: number; + missing: number; + stale: number; +}; + +export type LegEvidence = ContractLeg & { + members: string[]; + totalSize: number; + totalPremium: number; + placements: NbboPlacementCounts; + source_ts: number; + ingest_ts: number; + seq: number; +}; + +export type StructurePacketPlan = { + id: string; + dedupeKey: string; + bucketStartTs: number; + root: string; + pseudoContractId: string; + startTs: number; + endTs: number; + members: string[]; + totalSize: number; + totalPremium: number; + count: number; + placements: NbboPlacementCounts; + nbboCoverageRatio: number; + nbboAggressiveBuyRatio: number; + nbboAggressiveSellRatio: number; + nbboAggressiveRatio: number; + source_ts: number; + ingest_ts: number; + seq: number; +}; + +const roundTo = (value: number, digits = 4): number => { + if (!Number.isFinite(value)) { + return 0; + } + return Number(value.toFixed(digits)); +}; + +const emptyPlacements = (): NbboPlacementCounts => ({ + aa: 0, + a: 0, + b: 0, + bb: 0, + mid: 0, + missing: 0, + stale: 0 +}); + +const mergePlacements = (legs: LegEvidence[]): NbboPlacementCounts => { + const merged = emptyPlacements(); + for (const leg of legs) { + merged.aa += leg.placements.aa; + merged.a += leg.placements.a; + merged.b += leg.placements.b; + merged.bb += leg.placements.bb; + merged.mid += leg.placements.mid; + merged.missing += leg.placements.missing; + merged.stale += leg.placements.stale; + } + return merged; +}; + +const buildPseudoContractId = (root: string, expiry: string, structureType: string): string => { + const normalizedRoot = root.trim().toUpperCase(); + return `${normalizedRoot}-${expiry}-STRUCT-${structureType}`; +}; + +const bucketTs = (value: number, bucketMs: number): number => { + if (!Number.isFinite(value) || value <= 0 || !Number.isFinite(bucketMs) || bucketMs <= 0) { + return 0; + } + return Math.floor(value / bucketMs) * bucketMs; +}; + +const uniqueSorted = (values: string[]): string[] => { + return Array.from(new Set(values)).sort(); +}; + +export const shouldEmitStructurePacket = (legs: LegEvidence[], currentLegContractId: string): boolean => { + if (legs.length < 2) { + return false; + } + + const current = legs.find((leg) => leg.contractId === currentLegContractId); + if (!current) { + return false; + } + + const maxEnd = legs.reduce((max, leg) => Math.max(max, leg.endTs), 0); + return current.endTs >= maxEnd; +}; + +export const planStructurePacket = ( + legs: LegEvidence[], + summary: StructureSummary, + clusterWindowMs: number +): StructurePacketPlan | null => { + if (legs.length < 2) { + return null; + } + + const root = legs[0]?.root; + const expiry = legs[0]?.expiry; + if (!root || !expiry) { + return null; + } + + const contractIds = uniqueSorted(legs.map((leg) => leg.contractId)); + const startTs = legs.reduce((min, leg) => Math.min(min, leg.startTs), Number.POSITIVE_INFINITY); + const endTs = legs.reduce((max, leg) => Math.max(max, leg.endTs), 0); + const bucketStartTs = bucketTs(startTs, clusterWindowMs); + const pseudoContractId = buildPseudoContractId(root, expiry, summary.type); + const id = `flowpacket:${pseudoContractId}:${bucketStartTs}:${contractIds.join("|")}`; + const dedupeKey = `${pseudoContractId}:${bucketStartTs}:${contractIds.join("|")}`; + + const members = uniqueSorted(legs.flatMap((leg) => leg.members)); + const totalPremium = legs.reduce((sum, leg) => sum + leg.totalPremium, 0); + const totalSize = legs.reduce((sum, leg) => sum + leg.totalSize, 0); + const count = legs.reduce((sum, leg) => sum + leg.members.length, 0); + const placements = mergePlacements(legs); + const placementTotal = placements.aa + placements.a + placements.b + placements.bb + placements.mid; + const aggressiveTotal = placements.aa + placements.a + placements.b + placements.bb; + const aggressiveBuy = placements.aa + placements.a; + const aggressiveSell = placements.bb + placements.b; + const nbboCoverageRatio = count > 0 ? placementTotal / count : 0; + const nbboAggressiveBuyRatio = aggressiveTotal > 0 ? aggressiveBuy / aggressiveTotal : 0; + const nbboAggressiveSellRatio = aggressiveTotal > 0 ? aggressiveSell / aggressiveTotal : 0; + const nbboAggressiveRatio = placementTotal > 0 ? aggressiveTotal / placementTotal : 0; + + const source_ts = legs.reduce((min, leg) => Math.min(min, leg.source_ts), Number.POSITIVE_INFINITY); + const ingest_ts = legs.reduce((max, leg) => Math.max(max, leg.ingest_ts), 0); + const seq = legs.reduce((max, leg) => Math.max(max, leg.seq), 0); + + return { + id, + dedupeKey, + bucketStartTs, + root: root.trim().toUpperCase(), + pseudoContractId, + startTs: Number.isFinite(startTs) ? startTs : 0, + endTs, + members, + totalSize, + totalPremium, + count, + placements, + nbboCoverageRatio, + nbboAggressiveBuyRatio, + nbboAggressiveSellRatio, + nbboAggressiveRatio, + source_ts: Number.isFinite(source_ts) ? source_ts : 0, + ingest_ts, + seq + }; +}; + +export const buildStructureFlowPacket = ( + plan: StructurePacketPlan, + summary: StructureSummary +): FlowPacket => { + const totalPremium = roundTo(plan.totalPremium); + const totalNotional = roundTo(totalPremium * 100, 2); + const windowMs = Math.max(0, plan.endTs - plan.startTs); + + const features: Record = { + packet_kind: "structure", + option_contract_id: plan.pseudoContractId, + underlying_id: plan.root, + count: plan.count, + total_size: plan.totalSize, + total_premium: totalPremium, + total_notional: totalNotional, + start_ts: plan.startTs, + end_ts: plan.endTs, + window_ms: windowMs, + structure_type: summary.type, + structure_legs: summary.legs, + structure_strikes: summary.strikes, + structure_strike_span: roundTo(summary.strikeSpan), + structure_rights: summary.rights, + structure_contract_ids: summary.contractIds.join(",") + }; + + // These are aggregate counts across the legs. We do not attach rolling z-scores + // (baseline is per-contract), so structure packets default to absolute thresholds. + features.nbbo_aa_count = plan.placements.aa; + features.nbbo_a_count = plan.placements.a; + features.nbbo_b_count = plan.placements.b; + features.nbbo_bb_count = plan.placements.bb; + features.nbbo_mid_count = plan.placements.mid; + features.nbbo_missing_count = plan.placements.missing; + features.nbbo_stale_count = plan.placements.stale; + features.nbbo_coverage_ratio = roundTo(plan.nbboCoverageRatio); + features.nbbo_aggressive_buy_ratio = roundTo(plan.nbboAggressiveBuyRatio); + features.nbbo_aggressive_sell_ratio = roundTo(plan.nbboAggressiveSellRatio); + features.nbbo_aggressive_ratio = roundTo(plan.nbboAggressiveRatio); + + const join_quality: Record = { + nbbo_coverage_ratio: roundTo(plan.nbboCoverageRatio) + }; + + return { + source_ts: plan.source_ts, + ingest_ts: plan.ingest_ts, + seq: plan.seq, + trace_id: plan.id, + id: plan.id, + members: plan.members, + features, + join_quality + }; +}; diff --git a/services/compute/src/structures.ts b/services/compute/src/structures.ts index 7f228ea..f6eb909 100644 --- a/services/compute/src/structures.ts +++ b/services/compute/src/structures.ts @@ -41,6 +41,6 @@ export const summarizeStructure = (legs: ContractLeg[]): StructureSummary | null strikes: strikes.length, strikeSpan, rights: rights.size === 2 ? "C/P" : Array.from(rights)[0] ?? "", - contractIds: legs.map((leg) => leg.contractId) + contractIds: legs.map((leg) => leg.contractId).slice().sort() }; }; diff --git a/services/compute/tests/classifiers.test.ts b/services/compute/tests/classifiers.test.ts index ab3b110..f2513f8 100644 --- a/services/compute/tests/classifiers.test.ts +++ b/services/compute/tests/classifiers.test.ts @@ -137,6 +137,7 @@ describe("classifier structure and positioning signals", () => { test("straddle classifier triggers on structure tag", () => { const packet = buildPacket({ + packet_kind: "structure", structure_type: "straddle", structure_legs: 2, structure_strikes: 1, @@ -147,8 +148,21 @@ describe("classifier structure and positioning signals", () => { expect(hits.some((hit) => hit.classifier_id === "straddle")).toBe(true); }); + test("structure classifiers are suppressed on per-contract packets", () => { + const packet = buildPacket({ + structure_type: "straddle", + structure_legs: 2, + structure_strikes: 1, + structure_rights: "C/P", + structure_strike_span: 0 + }); + const hits = evaluateClassifiers(packet, baseConfig); + expect(hits.some((hit) => hit.classifier_id === "straddle")).toBe(false); + }); + test("vertical spread infers direction from aggressor skew", () => { const packet = buildPacket({ + packet_kind: "structure", structure_type: "vertical", structure_legs: 2, structure_strikes: 2, @@ -167,6 +181,7 @@ describe("classifier structure and positioning signals", () => { test("ladder accumulation triggers on multi-strike structures", () => { const packet = buildPacket({ + packet_kind: "structure", structure_type: "ladder", structure_legs: 3, structure_strikes: 3, diff --git a/services/compute/tests/structure-packets.test.ts b/services/compute/tests/structure-packets.test.ts new file mode 100644 index 0000000..d89c236 --- /dev/null +++ b/services/compute/tests/structure-packets.test.ts @@ -0,0 +1,137 @@ +import { describe, expect, test } from "bun:test"; +import { summarizeStructure } from "../src/structures"; +import { + buildStructureFlowPacket, + planStructurePacket, + shouldEmitStructurePacket, + type LegEvidence +} from "../src/structure-packets"; + +const placements = (overrides?: Partial): LegEvidence["placements"] => ({ + aa: 0, + a: 0, + b: 0, + bb: 0, + mid: 0, + missing: 0, + stale: 0, + ...overrides +}); + +const leg = (input: Partial & Pick): LegEvidence => { + return { + contractId: input.contractId, + root: "SPY", + expiry: "2025-01-17", + right: input.right, + strike: input.strike, + startTs: input.startTs ?? 1000, + endTs: input.endTs ?? 1100, + members: input.members ?? [input.contractId], + totalSize: input.totalSize ?? 100, + totalPremium: input.totalPremium ?? 1000, + placements: input.placements ?? placements(), + source_ts: input.source_ts ?? 1000, + ingest_ts: input.ingest_ts ?? 1200, + seq: input.seq ?? 1 + }; +}; + +describe("structure packet planning", () => { + test("emits only on latest leg endTs", () => { + const call = leg({ + contractId: "SPY-2025-01-17-450-C", + right: "C", + strike: 450, + endTs: 1100 + }); + const put = leg({ + contractId: "SPY-2025-01-17-450-P", + right: "P", + strike: 450, + endTs: 1125 + }); + const legs: LegEvidence[] = [call, put]; + + expect(shouldEmitStructurePacket(legs, call.contractId)).toBe(false); + expect(shouldEmitStructurePacket(legs, put.contractId)).toBe(true); + }); + + test("plans deterministic id + members across legs", () => { + const call = leg({ + contractId: "SPY-2025-01-17-450-C", + right: "C", + strike: 450, + members: ["p2", "p1"], + totalSize: 20, + totalPremium: 4000, + placements: placements({ aa: 1, mid: 1 }) + }); + const put = leg({ + contractId: "SPY-2025-01-17-450-P", + right: "P", + strike: 450, + startTs: 1005, + endTs: 1120, + members: ["p3"], + totalSize: 10, + totalPremium: 1500, + placements: placements({ bb: 1 }) + }); + + const legs = [call, put]; + const summary = summarizeStructure(legs); + expect(summary?.type).toBe("straddle"); + + const plan = planStructurePacket(legs, summary!, 500); + expect(plan).not.toBeNull(); + + expect(plan!.pseudoContractId).toBe("SPY-2025-01-17-STRUCT-straddle"); + expect(plan!.id.startsWith("flowpacket:SPY-2025-01-17-STRUCT-straddle:")).toBe(true); + expect(plan!.members).toEqual(["p1", "p2", "p3"]); + expect(plan!.totalSize).toBe(30); + expect(plan!.totalPremium).toBe(5500); + expect(plan!.count).toBe(3); + + const swappedPlan = planStructurePacket([put, call], summary!, 500); + expect(swappedPlan).not.toBeNull(); + expect(swappedPlan!.id).toBe(plan!.id); + expect(swappedPlan!.members).toEqual(plan!.members); + }); + + test("builds structure FlowPacket with aggregate aggressor ratios", () => { + const call = leg({ + contractId: "SPY-2025-01-17-450-C", + right: "C", + strike: 450, + members: ["p1", "p2"], + totalSize: 20, + totalPremium: 4000, + placements: placements({ aa: 1, mid: 1 }) + }); + const put = leg({ + contractId: "SPY-2025-01-17-450-P", + right: "P", + strike: 450, + members: ["p3"], + totalSize: 10, + totalPremium: 1500, + placements: placements({ bb: 1 }) + }); + + const legs = [call, put]; + const summary = summarizeStructure(legs); + const plan = planStructurePacket(legs, summary!, 500); + const packet = buildStructureFlowPacket(plan!, summary!); + + expect(packet.features.packet_kind).toBe("structure"); + expect(packet.features.underlying_id).toBe("SPY"); + expect(packet.features.nbbo_aa_count).toBe(1); + expect(packet.features.nbbo_bb_count).toBe(1); + expect(packet.features.nbbo_mid_count).toBe(1); + expect(packet.features.nbbo_coverage_ratio).toBeCloseTo(1, 6); + + // 2 aggressive (AA + BB) out of 3 classified (AA + BB + MID) + expect(packet.features.nbbo_aggressive_ratio).toBeCloseTo(2 / 3, 4); + }); +});