diff --git a/README.md b/README.md index 542f508..766ef7f 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Done now (in repo): - Synthetic options/equity prints (full S&P 500) published to NATS and persisted to ClickHouse - Deterministic option FlowPacket clustering (time window) + persistence - Rolling stats in Redis (premium/size/spread) with z-score features on FlowPackets +- FlowPacket structure tags (vertical/ladder/straddle/strangle) for multi-leg bursts - Rule-first classifiers + alert scoring with ClickHouse persistence + WS/REST endpoints - API: REST for prints/flow packets/classifier hits/alerts, WS for live options/equities/flow/alerts/hits, replay endpoints - UI: live tapes for options/equities/flow + replay toggle + pause controls + replay time/completion @@ -24,7 +25,7 @@ Done now (in repo): In progress / blocked: - Live data adapters beyond dev-only feeds (requires licensed data source) -- Advanced clustering +- Advanced clustering (spreads/rolls beyond basic structure tags) Not started: - Dark pool inference @@ -45,6 +46,7 @@ Not started: - Raw event persistence in ClickHouse + streaming via NATS JetStream - Deterministic option FlowPacket clustering (time-window) - Rolling stats baselines in Redis with z-score features on FlowPackets +- Basic multi-leg structure tagging on FlowPackets - Classifiers + alert scoring (rule-first) with WS/REST endpoints - API gateway with REST, WS, and replay endpoints - UI tapes for options/equities/flow packets + alerts/hits with live/replay toggle and pause controls diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index ce90040..9489ab1 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -426,6 +426,12 @@ h1 { background: rgba(111, 91, 57, 0.12); } +.structure-tag { + border-color: rgba(39, 84, 138, 0.45); + color: #27548a; + background: rgba(39, 84, 138, 0.12); +} + .nbbo-meta { font-size: 0.72rem; color: #6f5b39; diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 51bbfc0..df874a7 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -1778,16 +1778,22 @@ export default function HomePage() { const features = packet.features ?? {}; const contract = String(features.option_contract_id ?? packet.id ?? "unknown"); const count = parseNumber(features.count, packet.members.length); - const totalSize = parseNumber(features.total_size, 0); - const totalPremium = parseNumber(features.total_premium, 0); - const notional = totalPremium * 100; - const startTs = parseNumber(features.start_ts, packet.source_ts); - const endTs = parseNumber(features.end_ts, startTs); - const windowMs = parseNumber(features.window_ms, 0); - const nbboBid = parseNumber(features.nbbo_bid, Number.NaN); - const nbboAsk = parseNumber(features.nbbo_ask, Number.NaN); - const nbboMid = parseNumber(features.nbbo_mid, Number.NaN); - const nbboSpread = parseNumber(features.nbbo_spread, Number.NaN); + const totalSize = parseNumber(features.total_size, 0); + const totalPremium = parseNumber(features.total_premium, 0); + const notional = totalPremium * 100; + const startTs = parseNumber(features.start_ts, packet.source_ts); + const endTs = parseNumber(features.end_ts, startTs); + const windowMs = parseNumber(features.window_ms, 0); + const structureType = + typeof features.structure_type === "string" ? features.structure_type : ""; + const structureLegs = parseNumber(features.structure_legs, 0); + const structureRights = + typeof features.structure_rights === "string" ? features.structure_rights : ""; + const structureStrikes = parseNumber(features.structure_strikes, 0); + const nbboBid = parseNumber(features.nbbo_bid, Number.NaN); + const nbboAsk = parseNumber(features.nbbo_ask, Number.NaN); + const nbboMid = parseNumber(features.nbbo_mid, Number.NaN); + const nbboSpread = parseNumber(features.nbbo_spread, Number.NaN); const nbboAge = parseNumber(packet.join_quality.nbbo_age_ms, Number.NaN); const nbboStale = parseNumber(packet.join_quality.nbbo_stale, 0) > 0; const nbboMissing = parseNumber(packet.join_quality.nbbo_missing, 0) > 0; @@ -1804,6 +1810,14 @@ export default function HomePage() { {windowMs > 0 ? ( {formatFlowMetric(windowMs, "ms")} ) : null} + {structureType ? ( + + {structureType.replace(/_/g, " ")} + {structureRights ? ` ${structureRights}` : ""} + {structureLegs > 0 ? ` ${structureLegs}L` : ""} + {structureStrikes > 0 ? ` ${structureStrikes}K` : ""} + + ) : null} {Number.isFinite(nbboBid) && Number.isFinite(nbboAsk) ? ( NBBO ${formatPrice(nbboBid)} x ${formatPrice(nbboAsk)} diff --git a/services/compute/src/classifiers.ts b/services/compute/src/classifiers.ts index 4d188da..7ab61e1 100644 --- a/services/compute/src/classifiers.ts +++ b/services/compute/src/classifiers.ts @@ -1,11 +1,5 @@ import type { ClassifierHit, FlowPacket } from "@islandflow/types"; - -type ParsedContract = { - root: string; - expiry: string; - strike: number; - right: "C" | "P"; -}; +import { parseContractId, type ParsedContract } from "./contracts"; export type ClassifierConfig = { sweepMinPremium: number; @@ -32,81 +26,6 @@ const formatUsd = (value: number): string => { return `$${value.toFixed(2)}`; }; -const parseDashedContract = (value: string): ParsedContract | null => { - const parts = value.split("-"); - if (parts.length < 6) { - return null; - } - - const rightRaw = parts.at(-1) ?? ""; - if (rightRaw !== "C" && rightRaw !== "P") { - return null; - } - - const strikeRaw = parts.at(-2) ?? ""; - const strike = Number(strikeRaw); - const expiryParts = parts.slice(-5, -2); - const expiry = expiryParts.join("-"); - const root = parts.slice(0, -5).join("-"); - - if (!root || !expiry || !Number.isFinite(strike)) { - return null; - } - - return { - root, - expiry, - strike, - right: rightRaw - }; -}; - -const parseOccContract = (value: string): ParsedContract | null => { - if (value.length < 15) { - return null; - } - - const tail = value.slice(-15); - const root = value.slice(0, -15).trim(); - const expiryRaw = tail.slice(0, 6); - const right = tail.slice(6, 7); - const strikeRaw = tail.slice(7); - - if (!/^\d{6}$/.test(expiryRaw) || !/^\d{8}$/.test(strikeRaw)) { - return null; - } - - if (right !== "C" && right !== "P") { - return null; - } - - const year = 2000 + Number(expiryRaw.slice(0, 2)); - const month = Number(expiryRaw.slice(2, 4)) - 1; - const day = Number(expiryRaw.slice(4, 6)); - const expiryDate = new Date(Date.UTC(year, month, day)); - const expiry = expiryDate.toISOString().slice(0, 10); - const strike = Number(strikeRaw) / 1000; - - if (!root || !Number.isFinite(strike)) { - return null; - } - - return { - root, - expiry, - strike, - right - }; -}; - -const parseContractId = (value: string | undefined): ParsedContract | null => { - if (!value) { - return null; - } - - return parseDashedContract(value) ?? parseOccContract(value); -}; - const getNumberFeature = (packet: FlowPacket, key: string): number => { const value = packet.features[key]; return typeof value === "number" && Number.isFinite(value) ? value : 0; diff --git a/services/compute/src/contracts.ts b/services/compute/src/contracts.ts new file mode 100644 index 0000000..3e5efc7 --- /dev/null +++ b/services/compute/src/contracts.ts @@ -0,0 +1,81 @@ +export type ParsedContract = { + root: string; + expiry: string; + strike: number; + right: "C" | "P"; +}; + +const parseDashedContract = (value: string): ParsedContract | null => { + const parts = value.split("-"); + if (parts.length < 6) { + return null; + } + + const rightRaw = parts.at(-1) ?? ""; + if (rightRaw !== "C" && rightRaw !== "P") { + return null; + } + + const strikeRaw = parts.at(-2) ?? ""; + const strike = Number(strikeRaw); + const expiryParts = parts.slice(-5, -2); + const expiry = expiryParts.join("-"); + const root = parts.slice(0, -5).join("-"); + + if (!root || !expiry || !Number.isFinite(strike)) { + return null; + } + + return { + root, + expiry, + strike, + right: rightRaw + }; +}; + +const parseOccContract = (value: string): ParsedContract | null => { + if (value.length < 15) { + return null; + } + + const tail = value.slice(-15); + const root = value.slice(0, -15).trim(); + const expiryRaw = tail.slice(0, 6); + const right = tail.slice(6, 7); + const strikeRaw = tail.slice(7); + + if (!/^\d{6}$/.test(expiryRaw) || !/^\d{8}$/.test(strikeRaw)) { + return null; + } + + if (right !== "C" && right !== "P") { + return null; + } + + const year = 2000 + Number(expiryRaw.slice(0, 2)); + const month = Number(expiryRaw.slice(2, 4)) - 1; + const day = Number(expiryRaw.slice(4, 6)); + const expiryDate = new Date(Date.UTC(year, month, day)); + const expiry = expiryDate.toISOString().slice(0, 10); + const strike = Number(strikeRaw) / 1000; + + if (!root || !Number.isFinite(strike)) { + return null; + } + + return { + root, + expiry, + strike, + right + }; +}; + +export const parseContractId = (value: string | undefined): ParsedContract | null => { + if (!value) { + return null; + } + + return parseDashedContract(value) ?? parseOccContract(value); +}; diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index a38709b..fc53e5d 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -40,7 +40,9 @@ import { } from "@islandflow/types"; import { z } from "zod"; import { evaluateClassifiers, type ClassifierConfig } from "./classifiers"; +import { parseContractId } from "./contracts"; import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats"; +import { summarizeStructure, type ContractLeg } from "./structures"; const service = "compute"; const logger = createLogger({ service }); @@ -142,11 +144,77 @@ type ClusterState = { const clusters = new Map(); const nbboCache = new Map(); +const recentLegsByKey = new Map(); + +const MAX_RECENT_LEGS = 20; const rollingKey = (metric: string, contractId: string): string => { return `rolling:${metric}:${contractId}`; }; +const buildLegFromCluster = (cluster: ClusterState): ContractLeg | null => { + const parsed = parseContractId(cluster.contractId); + if (!parsed) { + return null; + } + + return { + ...parsed, + contractId: cluster.contractId, + startTs: cluster.startTs, + endTs: cluster.endTs + }; +}; + +const buildLegKey = (leg: ContractLeg): string => { + return `${leg.root}:${leg.expiry}`; +}; + +const isWithinStructureWindow = (anchorTs: number, candidateTs: number): boolean => { + return Math.abs(anchorTs - candidateTs) <= env.CLUSTER_WINDOW_MS; +}; + +const collectRecentLegs = (key: string, anchorTs: number, excludeId: string): ContractLeg[] => { + const recent = recentLegsByKey.get(key) ?? []; + const filtered = recent.filter( + (leg) => leg.contractId !== excludeId && isWithinStructureWindow(anchorTs, leg.endTs) + ); + recentLegsByKey.set(key, filtered); + return filtered; +}; + +const storeRecentLeg = (leg: ContractLeg, anchorTs: number): void => { + const key = buildLegKey(leg); + const recent = collectRecentLegs(key, anchorTs, ""); + const next = [leg, ...recent].slice(0, MAX_RECENT_LEGS); + recentLegsByKey.set(key, next); +}; + +const collectActiveLegs = ( + key: string, + anchorTs: number, + excludeId: string +): ContractLeg[] => { + const legs: ContractLeg[] = []; + for (const [contractId, cluster] of clusters) { + if (contractId === excludeId) { + continue; + } + const leg = buildLegFromCluster(cluster); + if (!leg) { + continue; + } + if (buildLegKey(leg) !== key) { + continue; + } + if (!isWithinStructureWindow(anchorTs, leg.endTs)) { + continue; + } + legs.push(leg); + } + return legs; +}; + const applyDeliverPolicy = ( opts: ReturnType, policy: typeof env.COMPUTE_DELIVER_POLICY @@ -275,6 +343,25 @@ const flushCluster = async ( await addRollingSnapshot("premium", totalPremium, "total_premium"); await addRollingSnapshot("size", cluster.totalSize, "total_size"); + const currentLeg = buildLegFromCluster(cluster); + if (currentLeg) { + const key = buildLegKey(currentLeg); + const anchorTs = cluster.endTs; + const candidates = [ + ...collectRecentLegs(key, anchorTs, currentLeg.contractId), + ...collectActiveLegs(key, anchorTs, currentLeg.contractId) + ]; + const summary = summarizeStructure([currentLeg, ...candidates]); + if (summary) { + features.structure_type = summary.type; + features.structure_legs = summary.legs; + features.structure_strikes = summary.strikes; + features.structure_strike_span = roundTo(summary.strikeSpan); + features.structure_rights = summary.rights; + } + storeRecentLeg(currentLeg, anchorTs); + } + if (!nbboJoin.nbbo) { joinQuality.nbbo_missing = 1; } else { diff --git a/services/compute/src/structures.ts b/services/compute/src/structures.ts new file mode 100644 index 0000000..7f228ea --- /dev/null +++ b/services/compute/src/structures.ts @@ -0,0 +1,46 @@ +import type { ParsedContract } from "./contracts"; + +export type ContractLeg = ParsedContract & { + contractId: string; + startTs: number; + endTs: number; +}; + +export type StructureSummary = { + type: string; + legs: number; + strikes: number; + strikeSpan: number; + rights: string; + contractIds: string[]; +}; + +export const summarizeStructure = (legs: ContractLeg[]): StructureSummary | null => { + if (legs.length < 2) { + return null; + } + + const strikes = Array.from(new Set(legs.map((leg) => leg.strike))).sort((a, b) => a - b); + const rights = new Set(legs.map((leg) => leg.right)); + const strikeSpan = strikes.length >= 2 ? strikes[strikes.length - 1] - strikes[0] : 0; + + let type = "multi_leg"; + if (rights.size === 2 && strikes.length === 1) { + type = "straddle"; + } else if (rights.size === 2 && strikes.length >= 2) { + type = "strangle"; + } else if (rights.size === 1 && strikes.length === 2) { + type = "vertical"; + } else if (rights.size === 1 && strikes.length >= 3) { + type = "ladder"; + } + + return { + type, + legs: legs.length, + strikes: strikes.length, + strikeSpan, + rights: rights.size === 2 ? "C/P" : Array.from(rights)[0] ?? "", + contractIds: legs.map((leg) => leg.contractId) + }; +}; diff --git a/services/compute/tests/structures.test.ts b/services/compute/tests/structures.test.ts new file mode 100644 index 0000000..3e9b968 --- /dev/null +++ b/services/compute/tests/structures.test.ts @@ -0,0 +1,43 @@ +import { describe, expect, test } from "bun:test"; +import { summarizeStructure, type ContractLeg } from "../src/structures"; + +const leg = (contractId: string, right: "C" | "P", strike: number): ContractLeg => ({ + contractId, + root: "SPY", + expiry: "2025-01-17", + right, + strike, + startTs: 0, + endTs: 0 +}); + +describe("structure summaries", () => { + test("detects verticals", () => { + const summary = summarizeStructure([leg("c1", "C", 100), leg("c2", "C", 105)]); + expect(summary?.type).toBe("vertical"); + expect(summary?.legs).toBe(2); + expect(summary?.strikes).toBe(2); + }); + + test("detects ladders", () => { + const summary = summarizeStructure([ + leg("c1", "C", 100), + leg("c2", "C", 105), + leg("c3", "C", 110) + ]); + expect(summary?.type).toBe("ladder"); + expect(summary?.strikes).toBe(3); + }); + + test("detects straddles", () => { + const summary = summarizeStructure([leg("c1", "C", 100), leg("p1", "P", 100)]); + expect(summary?.type).toBe("straddle"); + expect(summary?.rights).toBe("C/P"); + }); + + test("detects strangles", () => { + const summary = summarizeStructure([leg("c1", "C", 105), leg("p1", "P", 95)]); + expect(summary?.type).toBe("strangle"); + expect(summary?.strikes).toBe(2); + }); +});