From ea61c3b0137aaebf5de0dad072549d8b64b37aa6 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sun, 4 Jan 2026 17:29:21 -0500 Subject: [PATCH] Add dark inference pipeline --- packages/bus/src/subjects.ts | 2 + packages/storage/src/clickhouse.ts | 87 +++++++ packages/storage/src/index.ts | 1 + packages/storage/src/inferred-dark.ts | 66 +++++ packages/storage/tests/inferred-dark.test.ts | 33 +++ services/api/src/index.ts | 70 +++++ services/compute/src/dark-inference.ts | 243 ++++++++++++++++++ services/compute/src/index.ts | 79 +++++- services/compute/tests/dark-inference.test.ts | 119 +++++++++ 9 files changed, 699 insertions(+), 1 deletion(-) create mode 100644 packages/storage/src/inferred-dark.ts create mode 100644 packages/storage/tests/inferred-dark.test.ts create mode 100644 services/compute/src/dark-inference.ts create mode 100644 services/compute/tests/dark-inference.test.ts diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 1bacd52..371c102 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -8,6 +8,8 @@ export const STREAM_EQUITY_QUOTES = "EQUITY_QUOTES"; export const SUBJECT_EQUITY_QUOTES = "equities.quotes"; export const STREAM_EQUITY_JOINS = "EQUITY_JOINS"; export const SUBJECT_EQUITY_JOINS = "equities.joins"; +export const STREAM_INFERRED_DARK = "INFERRED_DARK"; +export const SUBJECT_INFERRED_DARK = "dark.inferred"; export const STREAM_FLOW_PACKETS = "FLOW_PACKETS"; export const SUBJECT_FLOW_PACKETS = "flow.packets"; export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS"; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 3b4957f..9fac84c 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -5,6 +5,7 @@ import { EquityPrintSchema, EquityQuoteSchema, EquityPrintJoinSchema, + InferredDarkEventSchema, FlowPacketSchema, OptionNBBOSchema, OptionPrintSchema @@ -15,6 +16,7 @@ import type { EquityPrint, EquityQuote, EquityPrintJoin, + InferredDarkEvent, FlowPacket, OptionNBBO, OptionPrint @@ -42,6 +44,13 @@ import { toEquityPrintJoinRecord, type EquityPrintJoinRecord } from "./equity-print-joins"; +import { + inferredDarkTableDDL, + INFERRED_DARK_TABLE, + fromInferredDarkRecord, + toInferredDarkRecord, + type InferredDarkRecord +} from "./inferred-dark"; import { FLOW_PACKETS_TABLE, flowPacketsTableDDL, @@ -120,6 +129,14 @@ export const ensureEquityPrintJoinsTable = async ( }); }; +export const ensureInferredDarkTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: inferredDarkTableDDL() + }); +}; + export const ensureFlowPacketsTable = async ( client: ClickHouseClient ): Promise => { @@ -202,6 +219,18 @@ export const insertEquityPrintJoin = async ( }); }; +export const insertInferredDark = async ( + client: ClickHouseClient, + event: InferredDarkEvent +): Promise => { + const record = toInferredDarkRecord(event); + await client.insert({ + table: INFERRED_DARK_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + export const insertFlowPacket = async ( client: ClickHouseClient, packet: FlowPacket @@ -367,6 +396,23 @@ const normalizeEquityPrintJoinRow = (row: unknown): EquityPrintJoinRecord | null }; }; +const normalizeInferredDarkRow = (row: unknown): InferredDarkRecord | null => { + if (!row || typeof row !== "object") { + return null; + } + + const record = row as Record; + return { + source_ts: coerceNumber(record.source_ts) as number, + ingest_ts: coerceNumber(record.ingest_ts) as number, + seq: coerceNumber(record.seq) as number, + trace_id: String(record.trace_id ?? ""), + type: String(record.type ?? ""), + confidence: Number(coerceNumber(record.confidence) ?? 0), + evidence_refs_json: String(record.evidence_refs_json ?? "[]") + }; +}; + const normalizeFlowPacketRow = (row: unknown): FlowPacketRecord | null => { if (!row || typeof row !== "object") { return null; @@ -497,6 +543,24 @@ export const fetchRecentEquityPrintJoins = async ( return EquityPrintJoinSchema.array().parse(joins); }; +export const fetchRecentInferredDark = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${INFERRED_DARK_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeInferredDarkRow) + .filter((record): record is InferredDarkRecord => record !== null); + const events = records.map(fromInferredDarkRecord); + return InferredDarkEventSchema.array().parse(events); +}; + export const fetchRecentFlowPackets = async ( client: ClickHouseClient, limit: number @@ -649,3 +713,26 @@ export const fetchEquityPrintJoinsAfter = async ( const joins = records.map(fromEquityPrintJoinRecord); return EquityPrintJoinSchema.array().parse(joins); }; + +export const fetchInferredDarkAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT * FROM ${INFERRED_DARK_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeInferredDarkRow) + .filter((record): record is InferredDarkRecord => record !== null); + const events = records.map(fromInferredDarkRecord); + return InferredDarkEventSchema.array().parse(events); +}; diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index dadea11..a4c4079 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -5,5 +5,6 @@ export * from "./flow-packets"; export * from "./equity-prints"; export * from "./equity-quotes"; export * from "./equity-print-joins"; +export * from "./inferred-dark"; export * from "./option-prints"; export * from "./option-nbbo"; diff --git a/packages/storage/src/inferred-dark.ts b/packages/storage/src/inferred-dark.ts new file mode 100644 index 0000000..d6e0b0b --- /dev/null +++ b/packages/storage/src/inferred-dark.ts @@ -0,0 +1,66 @@ +import type { InferredDarkEvent } from "@islandflow/types"; + +export const INFERRED_DARK_TABLE = "inferred_dark"; + +export type InferredDarkRecord = { + source_ts: number; + ingest_ts: number; + seq: number; + trace_id: string; + type: string; + confidence: number; + evidence_refs_json: string; +}; + +export const inferredDarkTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${INFERRED_DARK_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + type String, + confidence Float64, + evidence_refs_json String +) +ENGINE = MergeTree +ORDER BY (source_ts, seq) +`; +}; + +export const toInferredDarkRecord = (event: InferredDarkEvent): InferredDarkRecord => { + return { + source_ts: event.source_ts, + ingest_ts: event.ingest_ts, + seq: event.seq, + trace_id: event.trace_id, + type: event.type, + confidence: event.confidence, + evidence_refs_json: JSON.stringify(event.evidence_refs) + }; +}; + +const safeStringArray = (value: string): string[] => { + try { + const parsed = JSON.parse(value); + if (Array.isArray(parsed)) { + return parsed.map((entry) => String(entry)); + } + } catch { + // ignore + } + + return []; +}; + +export const fromInferredDarkRecord = (record: InferredDarkRecord): InferredDarkEvent => { + return { + source_ts: record.source_ts, + ingest_ts: record.ingest_ts, + seq: record.seq, + trace_id: record.trace_id, + type: record.type, + confidence: record.confidence, + evidence_refs: safeStringArray(record.evidence_refs_json) + }; +}; diff --git a/packages/storage/tests/inferred-dark.test.ts b/packages/storage/tests/inferred-dark.test.ts new file mode 100644 index 0000000..2c7291e --- /dev/null +++ b/packages/storage/tests/inferred-dark.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, it } from "bun:test"; +import { + fromInferredDarkRecord, + inferredDarkTableDDL, + INFERRED_DARK_TABLE, + toInferredDarkRecord +} from "../src/inferred-dark"; + +const event = { + source_ts: 100, + ingest_ts: 120, + seq: 1, + trace_id: "dark:absorbed:join-1", + type: "absorbed_block", + confidence: 0.62, + evidence_refs: ["equityjoin:print-1"] +}; + +describe("inferred-dark storage helpers", () => { + it("includes the correct table name in the DDL", () => { + const ddl = inferredDarkTableDDL(); + expect(ddl).toContain(INFERRED_DARK_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); + + it("round-trips inferred dark records", () => { + const record = toInferredDarkRecord(event); + const restored = fromInferredDarkRecord(record); + expect(restored.evidence_refs).toEqual(event.evidence_refs); + expect(restored.type).toBe(event.type); + expect(restored.confidence).toBeCloseTo(event.confidence, 4); + }); +}); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index aa0d2db..9e50961 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -6,6 +6,7 @@ import { SUBJECT_EQUITY_JOINS, SUBJECT_EQUITY_PRINTS, SUBJECT_EQUITY_QUOTES, + SUBJECT_INFERRED_DARK, SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, @@ -14,6 +15,7 @@ import { STREAM_EQUITY_JOINS, STREAM_EQUITY_PRINTS, STREAM_EQUITY_QUOTES, + STREAM_INFERRED_DARK, STREAM_FLOW_PACKETS, STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, @@ -29,6 +31,7 @@ import { ensureEquityPrintJoinsTable, ensureEquityPrintsTable, ensureEquityQuotesTable, + ensureInferredDarkTable, ensureFlowPacketsTable, ensureOptionNBBOTable, ensureOptionPrintsTable, @@ -36,11 +39,13 @@ import { fetchRecentClassifierHits, fetchRecentEquityPrintJoins, fetchRecentFlowPackets, + fetchRecentInferredDark, fetchRecentEquityQuotes, fetchRecentOptionNBBO, fetchEquityPrintsAfter, fetchEquityPrintJoinsAfter, fetchEquityQuotesAfter, + fetchInferredDarkAfter, fetchRecentEquityPrints, fetchOptionNBBOAfter, fetchOptionPrintsAfter, @@ -52,6 +57,7 @@ import { EquityPrintSchema, EquityPrintJoinSchema, EquityQuoteSchema, + InferredDarkEventSchema, FlowPacketSchema, OptionNBBOSchema, OptionPrintSchema @@ -111,6 +117,7 @@ type Channel = | "equities" | "equity-quotes" | "equity-joins" + | "inferred-dark" | "flow" | "classifier-hits" | "alerts"; @@ -124,6 +131,7 @@ const optionNbboSockets = new Set>(); const equitySockets = new Set>(); const equityQuoteSockets = new Set>(); const equityJoinSockets = new Set>(); +const inferredDarkSockets = new Set>(); const flowSockets = new Set>(); const classifierHitSockets = new Set>(); const alertSockets = new Set>(); @@ -250,6 +258,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_INFERRED_DARK, + subjects: [SUBJECT_INFERRED_DARK], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_FLOW_PACKETS, subjects: [SUBJECT_FLOW_PACKETS], @@ -300,6 +321,7 @@ const run = async () => { await ensureEquityPrintsTable(clickhouse); await ensureEquityQuotesTable(clickhouse); await ensureEquityPrintJoinsTable(clickhouse); + await ensureInferredDarkTable(clickhouse); await ensureFlowPacketsTable(clickhouse); await ensureClassifierHitsTable(clickhouse); await ensureAlertsTable(clickhouse); @@ -373,6 +395,12 @@ const run = async () => { "api-equity-joins" ); + const inferredDarkSubscription = await subscribeWithReset( + SUBJECT_INFERRED_DARK, + STREAM_INFERRED_DARK, + "api-inferred-dark" + ); + const flowSubscription = await subscribeWithReset( SUBJECT_FLOW_PACKETS, STREAM_FLOW_PACKETS, @@ -466,6 +494,21 @@ const run = async () => { } }; + const pumpInferredDark = async () => { + for await (const msg of inferredDarkSubscription.messages) { + try { + const payload = InferredDarkEventSchema.parse(inferredDarkSubscription.decode(msg)); + broadcast(inferredDarkSockets, { type: "inferred-dark", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process inferred dark event", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + const pumpFlow = async () => { for await (const msg of flowSubscription.messages) { try { @@ -516,6 +559,7 @@ const run = async () => { void pumpEquities(); void pumpEquityQuotes(); void pumpEquityJoins(); + void pumpInferredDark(); void pumpFlow(); void pumpClassifierHits(); void pumpAlerts(); @@ -559,6 +603,12 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/dark/inferred") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentInferredDark(clickhouse, limit); + return jsonResponse({ data }); + } + if (req.method === "GET" && url.pathname === "/flow/packets") { const limit = parseLimit(url.searchParams.get("limit")); const data = await fetchRecentFlowPackets(clickhouse, limit); @@ -617,6 +667,14 @@ const run = async () => { return jsonResponse({ data, next }); } + if (req.method === "GET" && url.pathname === "/replay/inferred-dark") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchInferredDarkAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.source_ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + if (req.method === "GET" && url.pathname === "/ws/options") { if (serverRef.upgrade(req, { data: { channel: "options" } })) { return new Response(null, { status: 101 }); @@ -657,6 +715,14 @@ const run = async () => { return jsonResponse({ error: "websocket upgrade failed" }, 400); } + if (req.method === "GET" && url.pathname === "/ws/inferred-dark") { + if (serverRef.upgrade(req, { data: { channel: "inferred-dark" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + if (req.method === "GET" && url.pathname === "/ws/flow") { if (serverRef.upgrade(req, { data: { channel: "flow" } })) { return new Response(null, { status: 101 }); @@ -695,6 +761,8 @@ const run = async () => { equityQuoteSockets.add(socket); } else if (socket.data.channel === "equity-joins") { equityJoinSockets.add(socket); + } else if (socket.data.channel === "inferred-dark") { + inferredDarkSockets.add(socket); } else if (socket.data.channel === "flow") { flowSockets.add(socket); } else if (socket.data.channel === "classifier-hits") { @@ -716,6 +784,8 @@ const run = async () => { equityQuoteSockets.delete(socket); } else if (socket.data.channel === "equity-joins") { equityJoinSockets.delete(socket); + } else if (socket.data.channel === "inferred-dark") { + inferredDarkSockets.delete(socket); } else if (socket.data.channel === "flow") { flowSockets.delete(socket); } else if (socket.data.channel === "classifier-hits") { diff --git a/services/compute/src/dark-inference.ts b/services/compute/src/dark-inference.ts new file mode 100644 index 0000000..603606b --- /dev/null +++ b/services/compute/src/dark-inference.ts @@ -0,0 +1,243 @@ +import type { EquityPrintJoin, InferredDarkEvent } from "@islandflow/types"; + +export type DarkInferenceConfig = { + windowMs: number; + cooldownMs: number; + minBlockSize: number; + minAccumulationSize: number; + minAccumulationCount: number; + minPrintSize: number; + maxEvidence: number; + maxSpreadPct: number; + maxQuoteAgeMs: number; +}; + +type Evidence = { + id: string; + ts: number; + size: number; + placement: string; + offExchange: boolean; +}; + +export type DarkInferenceState = { + evidenceByUnderlying: Map; + lastEmittedByUnderlying: Map>; +}; + +const clamp01 = (value: number): number => { + if (!Number.isFinite(value)) { + return 0; + } + return Math.max(0, Math.min(1, value)); +}; + +const getNumber = (value: unknown): number | null => { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value === "string") { + const parsed = Number(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + return null; +}; + +const getString = (value: unknown): string | null => { + if (typeof value === "string") { + return value; + } + return null; +}; + +const getBoolean = (value: unknown): boolean | null => { + if (typeof value === "boolean") { + return value; + } + if (typeof value === "number") { + return value !== 0; + } + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (["true", "1", "yes", "on"].includes(normalized)) { + return true; + } + if (["false", "0", "no", "off"].includes(normalized)) { + return false; + } + } + return null; +}; + +const isBuyPlacement = (placement: string): boolean => { + return placement === "A" || placement === "AA"; +}; + +const isSellPlacement = (placement: string): boolean => { + return placement === "B" || placement === "BB"; +}; + +const getSpreadPct = (features: Record): number | null => { + const spread = getNumber(features.quote_spread); + const mid = getNumber(features.quote_mid); + if (spread === null || mid === null || mid <= 0) { + return null; + } + return spread / mid; +}; + +export const createDarkInferenceState = (): DarkInferenceState => { + return { + evidenceByUnderlying: new Map(), + lastEmittedByUnderlying: new Map() + }; +}; + +const shouldEmit = ( + state: DarkInferenceState, + underlyingId: string, + type: string, + ts: number, + cooldownMs: number +): boolean => { + const record = state.lastEmittedByUnderlying.get(underlyingId) ?? {}; + const last = record[type] ?? -Infinity; + if (ts - last < cooldownMs) { + return false; + } + record[type] = ts; + state.lastEmittedByUnderlying.set(underlyingId, record); + return true; +}; + +export const evaluateDarkInferences = ( + join: EquityPrintJoin, + config: DarkInferenceConfig, + state: DarkInferenceState +): InferredDarkEvent[] => { + const features = join.features ?? {}; + const joinQuality = join.join_quality ?? {}; + + const underlyingId = getString(features.underlying_id); + if (!underlyingId) { + return []; + } + + const size = getNumber(features.size); + if (size === null) { + return []; + } + + const placement = getString(features.quote_placement) ?? "MISSING"; + const offExchange = getBoolean(features.off_exchange_flag) ?? false; + const ts = Number.isFinite(join.source_ts) ? join.source_ts : 0; + + const quoteAgeMs = getNumber(joinQuality.quote_age_ms) ?? config.maxQuoteAgeMs + 1; + const quoteMissing = getNumber(joinQuality.quote_missing) === 1; + const quoteStale = getNumber(joinQuality.quote_stale) === 1; + const spreadPct = getSpreadPct(features); + + const goodQuality = + !quoteMissing && + !quoteStale && + quoteAgeMs <= config.maxQuoteAgeMs && + (spreadPct === null || spreadPct <= config.maxSpreadPct); + + const events: InferredDarkEvent[] = []; + + if ( + offExchange && + goodQuality && + placement === "MID" && + size >= config.minBlockSize && + shouldEmit(state, underlyingId, "absorbed_block", ts, config.cooldownMs) + ) { + const sizeRatio = Math.min(1, size / (config.minBlockSize * 2)); + const spreadScore = + spreadPct === null || spreadPct <= 0 ? 0.5 : Math.max(0, 1 - spreadPct / config.maxSpreadPct); + const confidence = clamp01(0.35 + sizeRatio * 0.45 + spreadScore * 0.2); + + events.push({ + source_ts: join.source_ts, + ingest_ts: join.ingest_ts, + seq: join.seq, + trace_id: `dark:absorbed_block:${join.id}`, + type: "absorbed_block", + confidence, + evidence_refs: [join.id] + }); + } + + if ( + offExchange && + goodQuality && + size >= config.minPrintSize && + (isBuyPlacement(placement) || isSellPlacement(placement)) + ) { + const existing = state.evidenceByUnderlying.get(underlyingId) ?? []; + const nextEvidence = [ + ...existing, + { + id: join.id, + ts, + size, + placement, + offExchange + } + ].filter((entry) => ts - entry.ts <= config.windowMs); + + state.evidenceByUnderlying.set(underlyingId, nextEvidence); + + const buys = nextEvidence.filter((entry) => isBuyPlacement(entry.placement)); + const sells = nextEvidence.filter((entry) => isSellPlacement(entry.placement)); + + const buySize = buys.reduce((sum, entry) => sum + entry.size, 0); + const sellSize = sells.reduce((sum, entry) => sum + entry.size, 0); + + if ( + buys.length >= config.minAccumulationCount && + buySize >= config.minAccumulationSize && + shouldEmit(state, underlyingId, "stealth_accumulation", ts, config.cooldownMs) + ) { + const sizeRatio = Math.min(1, buySize / (config.minAccumulationSize * 2)); + const countRatio = Math.min(1, buys.length / (config.minAccumulationCount * 2)); + const confidence = clamp01(0.3 + sizeRatio * 0.4 + countRatio * 0.3); + const evidence = buys.slice(-config.maxEvidence).map((entry) => entry.id); + + events.push({ + source_ts: join.source_ts, + ingest_ts: join.ingest_ts, + seq: join.seq, + trace_id: `dark:stealth_accumulation:${underlyingId}:${ts}`, + type: "stealth_accumulation", + confidence, + evidence_refs: evidence + }); + } + + if ( + sells.length >= config.minAccumulationCount && + sellSize >= config.minAccumulationSize && + shouldEmit(state, underlyingId, "distribution", ts, config.cooldownMs) + ) { + const sizeRatio = Math.min(1, sellSize / (config.minAccumulationSize * 2)); + const countRatio = Math.min(1, sells.length / (config.minAccumulationCount * 2)); + const confidence = clamp01(0.3 + sizeRatio * 0.4 + countRatio * 0.3); + const evidence = sells.slice(-config.maxEvidence).map((entry) => entry.id); + + events.push({ + source_ts: join.source_ts, + ingest_ts: join.ingest_ts, + seq: join.seq, + trace_id: `dark:distribution:${underlyingId}:${ts}`, + type: "distribution", + confidence, + evidence_refs: evidence + }); + } + } + + return events; +}; diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index c9d638b..0aa2d67 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -6,6 +6,7 @@ import { SUBJECT_EQUITY_JOINS, SUBJECT_EQUITY_PRINTS, SUBJECT_EQUITY_QUOTES, + SUBJECT_INFERRED_DARK, SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, @@ -14,6 +15,7 @@ import { STREAM_EQUITY_JOINS, STREAM_EQUITY_PRINTS, STREAM_EQUITY_QUOTES, + STREAM_INFERRED_DARK, STREAM_FLOW_PACKETS, STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, @@ -28,10 +30,12 @@ import { ensureAlertsTable, ensureClassifierHitsTable, ensureEquityPrintJoinsTable, + ensureInferredDarkTable, ensureFlowPacketsTable, insertAlert, insertClassifierHit, insertEquityPrintJoin, + insertInferredDark, insertFlowPacket } from "@islandflow/storage"; import { @@ -40,6 +44,7 @@ import { EquityPrintJoinSchema, EquityPrintSchema, EquityQuoteSchema, + InferredDarkEventSchema, FlowPacketSchema, OptionNBBOSchema, OptionPrintSchema, @@ -48,6 +53,7 @@ import { type EquityPrint, type EquityQuote, type EquityPrintJoin, + type InferredDarkEvent, type FlowPacket, type OptionNBBO, type OptionPrint @@ -55,6 +61,11 @@ import { import { z } from "zod"; import { evaluateClassifiers, type ClassifierConfig } from "./classifiers"; import { parseContractId } from "./contracts"; +import { + createDarkInferenceState, + evaluateDarkInferences, + type DarkInferenceConfig +} from "./dark-inference"; import { buildEquityPrintJoin, type EquityQuoteJoin } from "./equity-joins"; import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats"; import { summarizeStructure, type ContractLeg } from "./structures"; @@ -87,6 +98,14 @@ const envSchema = z.object({ .default(false), NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000), EQUITY_QUOTE_MAX_AGE_MS: z.coerce.number().int().positive().default(1000), + DARK_INFER_WINDOW_MS: z.coerce.number().int().positive().default(60000), + DARK_INFER_COOLDOWN_MS: z.coerce.number().int().nonnegative().default(30000), + DARK_INFER_MIN_BLOCK_SIZE: z.coerce.number().int().positive().default(2000), + DARK_INFER_MIN_ACCUM_SIZE: z.coerce.number().int().positive().default(3000), + DARK_INFER_MIN_ACCUM_COUNT: z.coerce.number().int().positive().default(4), + DARK_INFER_MIN_PRINT_SIZE: z.coerce.number().int().positive().default(200), + DARK_INFER_MAX_EVIDENCE: z.coerce.number().int().positive().default(20), + DARK_INFER_MAX_SPREAD_PCT: z.coerce.number().positive().default(0.005), CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000), CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3), CLASSIFIER_SWEEP_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2), @@ -114,6 +133,18 @@ const classifierConfig: ClassifierConfig = { minAggressorRatio: env.CLASSIFIER_MIN_AGGRESSOR_RATIO }; +const darkInferenceConfig: DarkInferenceConfig = { + windowMs: env.DARK_INFER_WINDOW_MS, + cooldownMs: env.DARK_INFER_COOLDOWN_MS, + minBlockSize: env.DARK_INFER_MIN_BLOCK_SIZE, + minAccumulationSize: env.DARK_INFER_MIN_ACCUM_SIZE, + minAccumulationCount: env.DARK_INFER_MIN_ACCUM_COUNT, + minPrintSize: env.DARK_INFER_MIN_PRINT_SIZE, + maxEvidence: env.DARK_INFER_MAX_EVIDENCE, + maxSpreadPct: env.DARK_INFER_MAX_SPREAD_PCT, + maxQuoteAgeMs: env.EQUITY_QUOTE_MAX_AGE_MS +}; + const retry = async ( label: string, attempts: number, @@ -178,6 +209,7 @@ type ClusterState = { const clusters = new Map(); const nbboCache = new Map(); const equityQuoteCache = new Map(); +const darkInferenceState = createDarkInferenceState(); const recentLegsByKey = new Map(); const MAX_RECENT_LEGS = 20; @@ -658,12 +690,43 @@ const emitEquityJoin = async ( try { await insertEquityPrintJoin(clickhouse, payload); - await publishJson(js, SUBJECT_EQUITY_JOINS, payload); } catch (error) { logger.error("failed to emit equity print join", { error: error instanceof Error ? error.message : String(error), trace_id: payload.trace_id }); + return; + } + + try { + await publishJson(js, SUBJECT_EQUITY_JOINS, payload); + } catch (error) { + logger.error("failed to publish equity print join", { + error: error instanceof Error ? error.message : String(error), + trace_id: payload.trace_id + }); + } + + await emitDarkInferences(clickhouse, js, payload); +}; + +const emitDarkInferences = async ( + clickhouse: ReturnType, + js: Awaited>["js"], + join: EquityPrintJoin +): Promise => { + const events = evaluateDarkInferences(join, darkInferenceConfig, darkInferenceState); + for (const event of events) { + const validated: InferredDarkEvent = InferredDarkEventSchema.parse(event); + try { + await insertInferredDark(clickhouse, validated); + await publishJson(js, SUBJECT_INFERRED_DARK, validated); + } catch (error) { + logger.error("failed to emit inferred dark event", { + error: error instanceof Error ? error.message : String(error), + trace_id: validated.trace_id + }); + } } }; @@ -776,6 +839,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_INFERRED_DARK, + subjects: [SUBJECT_INFERRED_DARK], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_CLASSIFIER_HITS, subjects: [SUBJECT_CLASSIFIER_HITS], @@ -824,6 +900,7 @@ const run = async () => { await retry("clickhouse table init", 20, 500, async () => { await ensureFlowPacketsTable(clickhouse); await ensureEquityPrintJoinsTable(clickhouse); + await ensureInferredDarkTable(clickhouse); await ensureClassifierHitsTable(clickhouse); await ensureAlertsTable(clickhouse); }); diff --git a/services/compute/tests/dark-inference.test.ts b/services/compute/tests/dark-inference.test.ts new file mode 100644 index 0000000..8a49507 --- /dev/null +++ b/services/compute/tests/dark-inference.test.ts @@ -0,0 +1,119 @@ +import { describe, expect, it } from "bun:test"; +import { + createDarkInferenceState, + evaluateDarkInferences, + type DarkInferenceConfig +} from "../src/dark-inference"; + +const config: DarkInferenceConfig = { + windowMs: 60_000, + cooldownMs: 30_000, + minBlockSize: 1000, + minAccumulationSize: 2000, + minAccumulationCount: 3, + minPrintSize: 200, + maxEvidence: 5, + maxSpreadPct: 0.01, + maxQuoteAgeMs: 1000 +}; + +const baseJoin = { + source_ts: 1_000, + ingest_ts: 1_010, + seq: 1, + trace_id: "equityjoin:print-1", + id: "equityjoin:print-1", + print_trace_id: "print-1", + quote_trace_id: "quote-1", + features: { + underlying_id: "SPY", + price: 100, + size: 1200, + off_exchange_flag: true, + print_ts: 1_000, + quote_placement: "MID", + quote_mid: 100, + quote_spread: 0.1 + }, + join_quality: { + quote_age_ms: 5 + } +}; + +describe("dark inference rules", () => { + it("emits absorbed block on large off-exchange mid prints", () => { + const state = createDarkInferenceState(); + const events = evaluateDarkInferences(baseJoin, config, state); + expect(events).toHaveLength(1); + expect(events[0].type).toBe("absorbed_block"); + expect(events[0].evidence_refs).toEqual([baseJoin.id]); + }); + + it("skips absorbed block when quote is stale", () => { + const state = createDarkInferenceState(); + const staleJoin = { + ...baseJoin, + join_quality: { + quote_age_ms: 5000, + quote_stale: 1 + } + }; + const events = evaluateDarkInferences(staleJoin, config, state); + expect(events).toHaveLength(0); + }); + + it("emits stealth accumulation on repeated buy placements", () => { + const state = createDarkInferenceState(); + const joins = [0, 1, 2].map((offset) => ({ + ...baseJoin, + id: `equityjoin:buy-${offset}`, + trace_id: `equityjoin:buy-${offset}`, + seq: 10 + offset, + source_ts: 2_000 + offset * 500, + features: { + ...baseJoin.features, + size: 800, + quote_placement: "A" + } + })); + + const events = joins.flatMap((join) => evaluateDarkInferences(join, config, state)); + const accumulation = events.find((event) => event.type === "stealth_accumulation"); + expect(accumulation).toBeDefined(); + expect(accumulation?.evidence_refs.length).toBeGreaterThan(0); + }); + + it("emits distribution on repeated sell placements", () => { + const state = createDarkInferenceState(); + const joins = [0, 1, 2].map((offset) => ({ + ...baseJoin, + id: `equityjoin:sell-${offset}`, + trace_id: `equityjoin:sell-${offset}`, + seq: 20 + offset, + source_ts: 3_000 + offset * 500, + features: { + ...baseJoin.features, + size: 900, + quote_placement: "B" + } + })); + + const events = joins.flatMap((join) => evaluateDarkInferences(join, config, state)); + const distribution = events.find((event) => event.type === "distribution"); + expect(distribution).toBeDefined(); + expect(distribution?.evidence_refs.length).toBeGreaterThan(0); + }); + + it("respects cooldown windows", () => { + const state = createDarkInferenceState(); + const first = evaluateDarkInferences(baseJoin, config, state); + const second = evaluateDarkInferences( + { ...baseJoin, source_ts: baseJoin.source_ts + 1_000, seq: baseJoin.seq + 1 }, + config, + state + ); + + expect(first.length).toBeGreaterThan(0); + expect(second.find((event) => event.type === "absorbed_block")).toBeUndefined(); + }); +});