Add dark inference pipeline

This commit is contained in:
dirtydishes 2026-01-04 17:29:21 -05:00
parent 3164167bee
commit ea61c3b013
9 changed files with 699 additions and 1 deletions

View file

@ -8,6 +8,8 @@ export const STREAM_EQUITY_QUOTES = "EQUITY_QUOTES";
export const SUBJECT_EQUITY_QUOTES = "equities.quotes"; export const SUBJECT_EQUITY_QUOTES = "equities.quotes";
export const STREAM_EQUITY_JOINS = "EQUITY_JOINS"; export const STREAM_EQUITY_JOINS = "EQUITY_JOINS";
export const SUBJECT_EQUITY_JOINS = "equities.joins"; export const SUBJECT_EQUITY_JOINS = "equities.joins";
export const STREAM_INFERRED_DARK = "INFERRED_DARK";
export const SUBJECT_INFERRED_DARK = "dark.inferred";
export const STREAM_FLOW_PACKETS = "FLOW_PACKETS"; export const STREAM_FLOW_PACKETS = "FLOW_PACKETS";
export const SUBJECT_FLOW_PACKETS = "flow.packets"; export const SUBJECT_FLOW_PACKETS = "flow.packets";
export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS"; export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS";

View file

@ -5,6 +5,7 @@ import {
EquityPrintSchema, EquityPrintSchema,
EquityQuoteSchema, EquityQuoteSchema,
EquityPrintJoinSchema, EquityPrintJoinSchema,
InferredDarkEventSchema,
FlowPacketSchema, FlowPacketSchema,
OptionNBBOSchema, OptionNBBOSchema,
OptionPrintSchema OptionPrintSchema
@ -15,6 +16,7 @@ import type {
EquityPrint, EquityPrint,
EquityQuote, EquityQuote,
EquityPrintJoin, EquityPrintJoin,
InferredDarkEvent,
FlowPacket, FlowPacket,
OptionNBBO, OptionNBBO,
OptionPrint OptionPrint
@ -42,6 +44,13 @@ import {
toEquityPrintJoinRecord, toEquityPrintJoinRecord,
type EquityPrintJoinRecord type EquityPrintJoinRecord
} from "./equity-print-joins"; } from "./equity-print-joins";
import {
inferredDarkTableDDL,
INFERRED_DARK_TABLE,
fromInferredDarkRecord,
toInferredDarkRecord,
type InferredDarkRecord
} from "./inferred-dark";
import { import {
FLOW_PACKETS_TABLE, FLOW_PACKETS_TABLE,
flowPacketsTableDDL, flowPacketsTableDDL,
@ -120,6 +129,14 @@ export const ensureEquityPrintJoinsTable = async (
}); });
}; };
export const ensureInferredDarkTable = async (
client: ClickHouseClient
): Promise<void> => {
await client.exec({
query: inferredDarkTableDDL()
});
};
export const ensureFlowPacketsTable = async ( export const ensureFlowPacketsTable = async (
client: ClickHouseClient client: ClickHouseClient
): Promise<void> => { ): Promise<void> => {
@ -202,6 +219,18 @@ export const insertEquityPrintJoin = async (
}); });
}; };
export const insertInferredDark = async (
client: ClickHouseClient,
event: InferredDarkEvent
): Promise<void> => {
const record = toInferredDarkRecord(event);
await client.insert({
table: INFERRED_DARK_TABLE,
values: [record],
format: "JSONEachRow"
});
};
export const insertFlowPacket = async ( export const insertFlowPacket = async (
client: ClickHouseClient, client: ClickHouseClient,
packet: FlowPacket packet: FlowPacket
@ -367,6 +396,23 @@ const normalizeEquityPrintJoinRow = (row: unknown): EquityPrintJoinRecord | null
}; };
}; };
const normalizeInferredDarkRow = (row: unknown): InferredDarkRecord | null => {
if (!row || typeof row !== "object") {
return null;
}
const record = row as Record<string, unknown>;
return {
source_ts: coerceNumber(record.source_ts) as number,
ingest_ts: coerceNumber(record.ingest_ts) as number,
seq: coerceNumber(record.seq) as number,
trace_id: String(record.trace_id ?? ""),
type: String(record.type ?? ""),
confidence: Number(coerceNumber(record.confidence) ?? 0),
evidence_refs_json: String(record.evidence_refs_json ?? "[]")
};
};
const normalizeFlowPacketRow = (row: unknown): FlowPacketRecord | null => { const normalizeFlowPacketRow = (row: unknown): FlowPacketRecord | null => {
if (!row || typeof row !== "object") { if (!row || typeof row !== "object") {
return null; return null;
@ -497,6 +543,24 @@ export const fetchRecentEquityPrintJoins = async (
return EquityPrintJoinSchema.array().parse(joins); return EquityPrintJoinSchema.array().parse(joins);
}; };
export const fetchRecentInferredDark = async (
client: ClickHouseClient,
limit: number
): Promise<InferredDarkEvent[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `SELECT * FROM ${INFERRED_DARK_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeInferredDarkRow)
.filter((record): record is InferredDarkRecord => record !== null);
const events = records.map(fromInferredDarkRecord);
return InferredDarkEventSchema.array().parse(events);
};
export const fetchRecentFlowPackets = async ( export const fetchRecentFlowPackets = async (
client: ClickHouseClient, client: ClickHouseClient,
limit: number limit: number
@ -649,3 +713,26 @@ export const fetchEquityPrintJoinsAfter = async (
const joins = records.map(fromEquityPrintJoinRecord); const joins = records.map(fromEquityPrintJoinRecord);
return EquityPrintJoinSchema.array().parse(joins); return EquityPrintJoinSchema.array().parse(joins);
}; };
export const fetchInferredDarkAfter = async (
client: ClickHouseClient,
afterTs: number,
afterSeq: number,
limit: number
): Promise<InferredDarkEvent[]> => {
const safeLimit = clampLimit(limit);
const safeAfterTs = clampCursor(afterTs);
const safeAfterSeq = clampCursor(afterSeq);
const result = await client.query({
query: `SELECT * FROM ${INFERRED_DARK_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeInferredDarkRow)
.filter((record): record is InferredDarkRecord => record !== null);
const events = records.map(fromInferredDarkRecord);
return InferredDarkEventSchema.array().parse(events);
};

View file

@ -5,5 +5,6 @@ export * from "./flow-packets";
export * from "./equity-prints"; export * from "./equity-prints";
export * from "./equity-quotes"; export * from "./equity-quotes";
export * from "./equity-print-joins"; export * from "./equity-print-joins";
export * from "./inferred-dark";
export * from "./option-prints"; export * from "./option-prints";
export * from "./option-nbbo"; export * from "./option-nbbo";

View file

@ -0,0 +1,66 @@
import type { InferredDarkEvent } from "@islandflow/types";
export const INFERRED_DARK_TABLE = "inferred_dark";
export type InferredDarkRecord = {
source_ts: number;
ingest_ts: number;
seq: number;
trace_id: string;
type: string;
confidence: number;
evidence_refs_json: string;
};
export const inferredDarkTableDDL = (): string => {
return `
CREATE TABLE IF NOT EXISTS ${INFERRED_DARK_TABLE} (
source_ts UInt64,
ingest_ts UInt64,
seq UInt64,
trace_id String,
type String,
confidence Float64,
evidence_refs_json String
)
ENGINE = MergeTree
ORDER BY (source_ts, seq)
`;
};
export const toInferredDarkRecord = (event: InferredDarkEvent): InferredDarkRecord => {
return {
source_ts: event.source_ts,
ingest_ts: event.ingest_ts,
seq: event.seq,
trace_id: event.trace_id,
type: event.type,
confidence: event.confidence,
evidence_refs_json: JSON.stringify(event.evidence_refs)
};
};
const safeStringArray = (value: string): string[] => {
try {
const parsed = JSON.parse(value);
if (Array.isArray(parsed)) {
return parsed.map((entry) => String(entry));
}
} catch {
// ignore
}
return [];
};
export const fromInferredDarkRecord = (record: InferredDarkRecord): InferredDarkEvent => {
return {
source_ts: record.source_ts,
ingest_ts: record.ingest_ts,
seq: record.seq,
trace_id: record.trace_id,
type: record.type,
confidence: record.confidence,
evidence_refs: safeStringArray(record.evidence_refs_json)
};
};

View file

@ -0,0 +1,33 @@
import { describe, expect, it } from "bun:test";
import {
fromInferredDarkRecord,
inferredDarkTableDDL,
INFERRED_DARK_TABLE,
toInferredDarkRecord
} from "../src/inferred-dark";
const event = {
source_ts: 100,
ingest_ts: 120,
seq: 1,
trace_id: "dark:absorbed:join-1",
type: "absorbed_block",
confidence: 0.62,
evidence_refs: ["equityjoin:print-1"]
};
describe("inferred-dark storage helpers", () => {
it("includes the correct table name in the DDL", () => {
const ddl = inferredDarkTableDDL();
expect(ddl).toContain(INFERRED_DARK_TABLE);
expect(ddl).toContain("CREATE TABLE IF NOT EXISTS");
});
it("round-trips inferred dark records", () => {
const record = toInferredDarkRecord(event);
const restored = fromInferredDarkRecord(record);
expect(restored.evidence_refs).toEqual(event.evidence_refs);
expect(restored.type).toBe(event.type);
expect(restored.confidence).toBeCloseTo(event.confidence, 4);
});
});

View file

@ -6,6 +6,7 @@ import {
SUBJECT_EQUITY_JOINS, SUBJECT_EQUITY_JOINS,
SUBJECT_EQUITY_PRINTS, SUBJECT_EQUITY_PRINTS,
SUBJECT_EQUITY_QUOTES, SUBJECT_EQUITY_QUOTES,
SUBJECT_INFERRED_DARK,
SUBJECT_FLOW_PACKETS, SUBJECT_FLOW_PACKETS,
SUBJECT_OPTION_NBBO, SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS, SUBJECT_OPTION_PRINTS,
@ -14,6 +15,7 @@ import {
STREAM_EQUITY_JOINS, STREAM_EQUITY_JOINS,
STREAM_EQUITY_PRINTS, STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES, STREAM_EQUITY_QUOTES,
STREAM_INFERRED_DARK,
STREAM_FLOW_PACKETS, STREAM_FLOW_PACKETS,
STREAM_OPTION_NBBO, STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS, STREAM_OPTION_PRINTS,
@ -29,6 +31,7 @@ import {
ensureEquityPrintJoinsTable, ensureEquityPrintJoinsTable,
ensureEquityPrintsTable, ensureEquityPrintsTable,
ensureEquityQuotesTable, ensureEquityQuotesTable,
ensureInferredDarkTable,
ensureFlowPacketsTable, ensureFlowPacketsTable,
ensureOptionNBBOTable, ensureOptionNBBOTable,
ensureOptionPrintsTable, ensureOptionPrintsTable,
@ -36,11 +39,13 @@ import {
fetchRecentClassifierHits, fetchRecentClassifierHits,
fetchRecentEquityPrintJoins, fetchRecentEquityPrintJoins,
fetchRecentFlowPackets, fetchRecentFlowPackets,
fetchRecentInferredDark,
fetchRecentEquityQuotes, fetchRecentEquityQuotes,
fetchRecentOptionNBBO, fetchRecentOptionNBBO,
fetchEquityPrintsAfter, fetchEquityPrintsAfter,
fetchEquityPrintJoinsAfter, fetchEquityPrintJoinsAfter,
fetchEquityQuotesAfter, fetchEquityQuotesAfter,
fetchInferredDarkAfter,
fetchRecentEquityPrints, fetchRecentEquityPrints,
fetchOptionNBBOAfter, fetchOptionNBBOAfter,
fetchOptionPrintsAfter, fetchOptionPrintsAfter,
@ -52,6 +57,7 @@ import {
EquityPrintSchema, EquityPrintSchema,
EquityPrintJoinSchema, EquityPrintJoinSchema,
EquityQuoteSchema, EquityQuoteSchema,
InferredDarkEventSchema,
FlowPacketSchema, FlowPacketSchema,
OptionNBBOSchema, OptionNBBOSchema,
OptionPrintSchema OptionPrintSchema
@ -111,6 +117,7 @@ type Channel =
| "equities" | "equities"
| "equity-quotes" | "equity-quotes"
| "equity-joins" | "equity-joins"
| "inferred-dark"
| "flow" | "flow"
| "classifier-hits" | "classifier-hits"
| "alerts"; | "alerts";
@ -124,6 +131,7 @@ const optionNbboSockets = new Set<WebSocket<WsData>>();
const equitySockets = new Set<WebSocket<WsData>>(); const equitySockets = new Set<WebSocket<WsData>>();
const equityQuoteSockets = new Set<WebSocket<WsData>>(); const equityQuoteSockets = new Set<WebSocket<WsData>>();
const equityJoinSockets = new Set<WebSocket<WsData>>(); const equityJoinSockets = new Set<WebSocket<WsData>>();
const inferredDarkSockets = new Set<WebSocket<WsData>>();
const flowSockets = new Set<WebSocket<WsData>>(); const flowSockets = new Set<WebSocket<WsData>>();
const classifierHitSockets = new Set<WebSocket<WsData>>(); const classifierHitSockets = new Set<WebSocket<WsData>>();
const alertSockets = new Set<WebSocket<WsData>>(); const alertSockets = new Set<WebSocket<WsData>>();
@ -250,6 +258,19 @@ const run = async () => {
num_replicas: 1 num_replicas: 1
}); });
await ensureStream(jsm, {
name: STREAM_INFERRED_DARK,
subjects: [SUBJECT_INFERRED_DARK],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, { await ensureStream(jsm, {
name: STREAM_FLOW_PACKETS, name: STREAM_FLOW_PACKETS,
subjects: [SUBJECT_FLOW_PACKETS], subjects: [SUBJECT_FLOW_PACKETS],
@ -300,6 +321,7 @@ const run = async () => {
await ensureEquityPrintsTable(clickhouse); await ensureEquityPrintsTable(clickhouse);
await ensureEquityQuotesTable(clickhouse); await ensureEquityQuotesTable(clickhouse);
await ensureEquityPrintJoinsTable(clickhouse); await ensureEquityPrintJoinsTable(clickhouse);
await ensureInferredDarkTable(clickhouse);
await ensureFlowPacketsTable(clickhouse); await ensureFlowPacketsTable(clickhouse);
await ensureClassifierHitsTable(clickhouse); await ensureClassifierHitsTable(clickhouse);
await ensureAlertsTable(clickhouse); await ensureAlertsTable(clickhouse);
@ -373,6 +395,12 @@ const run = async () => {
"api-equity-joins" "api-equity-joins"
); );
const inferredDarkSubscription = await subscribeWithReset(
SUBJECT_INFERRED_DARK,
STREAM_INFERRED_DARK,
"api-inferred-dark"
);
const flowSubscription = await subscribeWithReset( const flowSubscription = await subscribeWithReset(
SUBJECT_FLOW_PACKETS, SUBJECT_FLOW_PACKETS,
STREAM_FLOW_PACKETS, STREAM_FLOW_PACKETS,
@ -466,6 +494,21 @@ const run = async () => {
} }
}; };
const pumpInferredDark = async () => {
for await (const msg of inferredDarkSubscription.messages) {
try {
const payload = InferredDarkEventSchema.parse(inferredDarkSubscription.decode(msg));
broadcast(inferredDarkSockets, { type: "inferred-dark", payload });
msg.ack();
} catch (error) {
logger.error("failed to process inferred dark event", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
const pumpFlow = async () => { const pumpFlow = async () => {
for await (const msg of flowSubscription.messages) { for await (const msg of flowSubscription.messages) {
try { try {
@ -516,6 +559,7 @@ const run = async () => {
void pumpEquities(); void pumpEquities();
void pumpEquityQuotes(); void pumpEquityQuotes();
void pumpEquityJoins(); void pumpEquityJoins();
void pumpInferredDark();
void pumpFlow(); void pumpFlow();
void pumpClassifierHits(); void pumpClassifierHits();
void pumpAlerts(); void pumpAlerts();
@ -559,6 +603,12 @@ const run = async () => {
return jsonResponse({ data }); return jsonResponse({ data });
} }
if (req.method === "GET" && url.pathname === "/dark/inferred") {
const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentInferredDark(clickhouse, limit);
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/flow/packets") { if (req.method === "GET" && url.pathname === "/flow/packets") {
const limit = parseLimit(url.searchParams.get("limit")); const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentFlowPackets(clickhouse, limit); const data = await fetchRecentFlowPackets(clickhouse, limit);
@ -617,6 +667,14 @@ const run = async () => {
return jsonResponse({ data, next }); return jsonResponse({ data, next });
} }
if (req.method === "GET" && url.pathname === "/replay/inferred-dark") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchInferredDarkAfter(clickhouse, afterTs, afterSeq, limit);
const last = data.at(-1);
const next = last ? { ts: last.source_ts, seq: last.seq } : null;
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/ws/options") { if (req.method === "GET" && url.pathname === "/ws/options") {
if (serverRef.upgrade(req, { data: { channel: "options" } })) { if (serverRef.upgrade(req, { data: { channel: "options" } })) {
return new Response(null, { status: 101 }); return new Response(null, { status: 101 });
@ -657,6 +715,14 @@ const run = async () => {
return jsonResponse({ error: "websocket upgrade failed" }, 400); return jsonResponse({ error: "websocket upgrade failed" }, 400);
} }
if (req.method === "GET" && url.pathname === "/ws/inferred-dark") {
if (serverRef.upgrade(req, { data: { channel: "inferred-dark" } })) {
return new Response(null, { status: 101 });
}
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
if (req.method === "GET" && url.pathname === "/ws/flow") { if (req.method === "GET" && url.pathname === "/ws/flow") {
if (serverRef.upgrade(req, { data: { channel: "flow" } })) { if (serverRef.upgrade(req, { data: { channel: "flow" } })) {
return new Response(null, { status: 101 }); return new Response(null, { status: 101 });
@ -695,6 +761,8 @@ const run = async () => {
equityQuoteSockets.add(socket); equityQuoteSockets.add(socket);
} else if (socket.data.channel === "equity-joins") { } else if (socket.data.channel === "equity-joins") {
equityJoinSockets.add(socket); equityJoinSockets.add(socket);
} else if (socket.data.channel === "inferred-dark") {
inferredDarkSockets.add(socket);
} else if (socket.data.channel === "flow") { } else if (socket.data.channel === "flow") {
flowSockets.add(socket); flowSockets.add(socket);
} else if (socket.data.channel === "classifier-hits") { } else if (socket.data.channel === "classifier-hits") {
@ -716,6 +784,8 @@ const run = async () => {
equityQuoteSockets.delete(socket); equityQuoteSockets.delete(socket);
} else if (socket.data.channel === "equity-joins") { } else if (socket.data.channel === "equity-joins") {
equityJoinSockets.delete(socket); equityJoinSockets.delete(socket);
} else if (socket.data.channel === "inferred-dark") {
inferredDarkSockets.delete(socket);
} else if (socket.data.channel === "flow") { } else if (socket.data.channel === "flow") {
flowSockets.delete(socket); flowSockets.delete(socket);
} else if (socket.data.channel === "classifier-hits") { } else if (socket.data.channel === "classifier-hits") {

View file

@ -0,0 +1,243 @@
import type { EquityPrintJoin, InferredDarkEvent } from "@islandflow/types";
export type DarkInferenceConfig = {
windowMs: number;
cooldownMs: number;
minBlockSize: number;
minAccumulationSize: number;
minAccumulationCount: number;
minPrintSize: number;
maxEvidence: number;
maxSpreadPct: number;
maxQuoteAgeMs: number;
};
type Evidence = {
id: string;
ts: number;
size: number;
placement: string;
offExchange: boolean;
};
export type DarkInferenceState = {
evidenceByUnderlying: Map<string, Evidence[]>;
lastEmittedByUnderlying: Map<string, Record<string, number>>;
};
const clamp01 = (value: number): number => {
if (!Number.isFinite(value)) {
return 0;
}
return Math.max(0, Math.min(1, value));
};
const getNumber = (value: unknown): number | null => {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value === "string") {
const parsed = Number(value);
if (Number.isFinite(parsed)) {
return parsed;
}
}
return null;
};
const getString = (value: unknown): string | null => {
if (typeof value === "string") {
return value;
}
return null;
};
const getBoolean = (value: unknown): boolean | null => {
if (typeof value === "boolean") {
return value;
}
if (typeof value === "number") {
return value !== 0;
}
if (typeof value === "string") {
const normalized = value.trim().toLowerCase();
if (["true", "1", "yes", "on"].includes(normalized)) {
return true;
}
if (["false", "0", "no", "off"].includes(normalized)) {
return false;
}
}
return null;
};
const isBuyPlacement = (placement: string): boolean => {
return placement === "A" || placement === "AA";
};
const isSellPlacement = (placement: string): boolean => {
return placement === "B" || placement === "BB";
};
const getSpreadPct = (features: Record<string, unknown>): number | null => {
const spread = getNumber(features.quote_spread);
const mid = getNumber(features.quote_mid);
if (spread === null || mid === null || mid <= 0) {
return null;
}
return spread / mid;
};
export const createDarkInferenceState = (): DarkInferenceState => {
return {
evidenceByUnderlying: new Map(),
lastEmittedByUnderlying: new Map()
};
};
const shouldEmit = (
state: DarkInferenceState,
underlyingId: string,
type: string,
ts: number,
cooldownMs: number
): boolean => {
const record = state.lastEmittedByUnderlying.get(underlyingId) ?? {};
const last = record[type] ?? -Infinity;
if (ts - last < cooldownMs) {
return false;
}
record[type] = ts;
state.lastEmittedByUnderlying.set(underlyingId, record);
return true;
};
export const evaluateDarkInferences = (
join: EquityPrintJoin,
config: DarkInferenceConfig,
state: DarkInferenceState
): InferredDarkEvent[] => {
const features = join.features ?? {};
const joinQuality = join.join_quality ?? {};
const underlyingId = getString(features.underlying_id);
if (!underlyingId) {
return [];
}
const size = getNumber(features.size);
if (size === null) {
return [];
}
const placement = getString(features.quote_placement) ?? "MISSING";
const offExchange = getBoolean(features.off_exchange_flag) ?? false;
const ts = Number.isFinite(join.source_ts) ? join.source_ts : 0;
const quoteAgeMs = getNumber(joinQuality.quote_age_ms) ?? config.maxQuoteAgeMs + 1;
const quoteMissing = getNumber(joinQuality.quote_missing) === 1;
const quoteStale = getNumber(joinQuality.quote_stale) === 1;
const spreadPct = getSpreadPct(features);
const goodQuality =
!quoteMissing &&
!quoteStale &&
quoteAgeMs <= config.maxQuoteAgeMs &&
(spreadPct === null || spreadPct <= config.maxSpreadPct);
const events: InferredDarkEvent[] = [];
if (
offExchange &&
goodQuality &&
placement === "MID" &&
size >= config.minBlockSize &&
shouldEmit(state, underlyingId, "absorbed_block", ts, config.cooldownMs)
) {
const sizeRatio = Math.min(1, size / (config.minBlockSize * 2));
const spreadScore =
spreadPct === null || spreadPct <= 0 ? 0.5 : Math.max(0, 1 - spreadPct / config.maxSpreadPct);
const confidence = clamp01(0.35 + sizeRatio * 0.45 + spreadScore * 0.2);
events.push({
source_ts: join.source_ts,
ingest_ts: join.ingest_ts,
seq: join.seq,
trace_id: `dark:absorbed_block:${join.id}`,
type: "absorbed_block",
confidence,
evidence_refs: [join.id]
});
}
if (
offExchange &&
goodQuality &&
size >= config.minPrintSize &&
(isBuyPlacement(placement) || isSellPlacement(placement))
) {
const existing = state.evidenceByUnderlying.get(underlyingId) ?? [];
const nextEvidence = [
...existing,
{
id: join.id,
ts,
size,
placement,
offExchange
}
].filter((entry) => ts - entry.ts <= config.windowMs);
state.evidenceByUnderlying.set(underlyingId, nextEvidence);
const buys = nextEvidence.filter((entry) => isBuyPlacement(entry.placement));
const sells = nextEvidence.filter((entry) => isSellPlacement(entry.placement));
const buySize = buys.reduce((sum, entry) => sum + entry.size, 0);
const sellSize = sells.reduce((sum, entry) => sum + entry.size, 0);
if (
buys.length >= config.minAccumulationCount &&
buySize >= config.minAccumulationSize &&
shouldEmit(state, underlyingId, "stealth_accumulation", ts, config.cooldownMs)
) {
const sizeRatio = Math.min(1, buySize / (config.minAccumulationSize * 2));
const countRatio = Math.min(1, buys.length / (config.minAccumulationCount * 2));
const confidence = clamp01(0.3 + sizeRatio * 0.4 + countRatio * 0.3);
const evidence = buys.slice(-config.maxEvidence).map((entry) => entry.id);
events.push({
source_ts: join.source_ts,
ingest_ts: join.ingest_ts,
seq: join.seq,
trace_id: `dark:stealth_accumulation:${underlyingId}:${ts}`,
type: "stealth_accumulation",
confidence,
evidence_refs: evidence
});
}
if (
sells.length >= config.minAccumulationCount &&
sellSize >= config.minAccumulationSize &&
shouldEmit(state, underlyingId, "distribution", ts, config.cooldownMs)
) {
const sizeRatio = Math.min(1, sellSize / (config.minAccumulationSize * 2));
const countRatio = Math.min(1, sells.length / (config.minAccumulationCount * 2));
const confidence = clamp01(0.3 + sizeRatio * 0.4 + countRatio * 0.3);
const evidence = sells.slice(-config.maxEvidence).map((entry) => entry.id);
events.push({
source_ts: join.source_ts,
ingest_ts: join.ingest_ts,
seq: join.seq,
trace_id: `dark:distribution:${underlyingId}:${ts}`,
type: "distribution",
confidence,
evidence_refs: evidence
});
}
}
return events;
};

View file

@ -6,6 +6,7 @@ import {
SUBJECT_EQUITY_JOINS, SUBJECT_EQUITY_JOINS,
SUBJECT_EQUITY_PRINTS, SUBJECT_EQUITY_PRINTS,
SUBJECT_EQUITY_QUOTES, SUBJECT_EQUITY_QUOTES,
SUBJECT_INFERRED_DARK,
SUBJECT_FLOW_PACKETS, SUBJECT_FLOW_PACKETS,
SUBJECT_OPTION_NBBO, SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS, SUBJECT_OPTION_PRINTS,
@ -14,6 +15,7 @@ import {
STREAM_EQUITY_JOINS, STREAM_EQUITY_JOINS,
STREAM_EQUITY_PRINTS, STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES, STREAM_EQUITY_QUOTES,
STREAM_INFERRED_DARK,
STREAM_FLOW_PACKETS, STREAM_FLOW_PACKETS,
STREAM_OPTION_NBBO, STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS, STREAM_OPTION_PRINTS,
@ -28,10 +30,12 @@ import {
ensureAlertsTable, ensureAlertsTable,
ensureClassifierHitsTable, ensureClassifierHitsTable,
ensureEquityPrintJoinsTable, ensureEquityPrintJoinsTable,
ensureInferredDarkTable,
ensureFlowPacketsTable, ensureFlowPacketsTable,
insertAlert, insertAlert,
insertClassifierHit, insertClassifierHit,
insertEquityPrintJoin, insertEquityPrintJoin,
insertInferredDark,
insertFlowPacket insertFlowPacket
} from "@islandflow/storage"; } from "@islandflow/storage";
import { import {
@ -40,6 +44,7 @@ import {
EquityPrintJoinSchema, EquityPrintJoinSchema,
EquityPrintSchema, EquityPrintSchema,
EquityQuoteSchema, EquityQuoteSchema,
InferredDarkEventSchema,
FlowPacketSchema, FlowPacketSchema,
OptionNBBOSchema, OptionNBBOSchema,
OptionPrintSchema, OptionPrintSchema,
@ -48,6 +53,7 @@ import {
type EquityPrint, type EquityPrint,
type EquityQuote, type EquityQuote,
type EquityPrintJoin, type EquityPrintJoin,
type InferredDarkEvent,
type FlowPacket, type FlowPacket,
type OptionNBBO, type OptionNBBO,
type OptionPrint type OptionPrint
@ -55,6 +61,11 @@ import {
import { z } from "zod"; import { z } from "zod";
import { evaluateClassifiers, type ClassifierConfig } from "./classifiers"; import { evaluateClassifiers, type ClassifierConfig } from "./classifiers";
import { parseContractId } from "./contracts"; import { parseContractId } from "./contracts";
import {
createDarkInferenceState,
evaluateDarkInferences,
type DarkInferenceConfig
} from "./dark-inference";
import { buildEquityPrintJoin, type EquityQuoteJoin } from "./equity-joins"; import { buildEquityPrintJoin, type EquityQuoteJoin } from "./equity-joins";
import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats"; import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats";
import { summarizeStructure, type ContractLeg } from "./structures"; import { summarizeStructure, type ContractLeg } from "./structures";
@ -87,6 +98,14 @@ const envSchema = z.object({
.default(false), .default(false),
NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000), NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000),
EQUITY_QUOTE_MAX_AGE_MS: z.coerce.number().int().positive().default(1000), EQUITY_QUOTE_MAX_AGE_MS: z.coerce.number().int().positive().default(1000),
DARK_INFER_WINDOW_MS: z.coerce.number().int().positive().default(60000),
DARK_INFER_COOLDOWN_MS: z.coerce.number().int().nonnegative().default(30000),
DARK_INFER_MIN_BLOCK_SIZE: z.coerce.number().int().positive().default(2000),
DARK_INFER_MIN_ACCUM_SIZE: z.coerce.number().int().positive().default(3000),
DARK_INFER_MIN_ACCUM_COUNT: z.coerce.number().int().positive().default(4),
DARK_INFER_MIN_PRINT_SIZE: z.coerce.number().int().positive().default(200),
DARK_INFER_MAX_EVIDENCE: z.coerce.number().int().positive().default(20),
DARK_INFER_MAX_SPREAD_PCT: z.coerce.number().positive().default(0.005),
CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000), CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000),
CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3), CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3),
CLASSIFIER_SWEEP_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2), CLASSIFIER_SWEEP_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2),
@ -114,6 +133,18 @@ const classifierConfig: ClassifierConfig = {
minAggressorRatio: env.CLASSIFIER_MIN_AGGRESSOR_RATIO minAggressorRatio: env.CLASSIFIER_MIN_AGGRESSOR_RATIO
}; };
const darkInferenceConfig: DarkInferenceConfig = {
windowMs: env.DARK_INFER_WINDOW_MS,
cooldownMs: env.DARK_INFER_COOLDOWN_MS,
minBlockSize: env.DARK_INFER_MIN_BLOCK_SIZE,
minAccumulationSize: env.DARK_INFER_MIN_ACCUM_SIZE,
minAccumulationCount: env.DARK_INFER_MIN_ACCUM_COUNT,
minPrintSize: env.DARK_INFER_MIN_PRINT_SIZE,
maxEvidence: env.DARK_INFER_MAX_EVIDENCE,
maxSpreadPct: env.DARK_INFER_MAX_SPREAD_PCT,
maxQuoteAgeMs: env.EQUITY_QUOTE_MAX_AGE_MS
};
const retry = async <T>( const retry = async <T>(
label: string, label: string,
attempts: number, attempts: number,
@ -178,6 +209,7 @@ type ClusterState = {
const clusters = new Map<string, ClusterState>(); const clusters = new Map<string, ClusterState>();
const nbboCache = new Map<string, OptionNBBO>(); const nbboCache = new Map<string, OptionNBBO>();
const equityQuoteCache = new Map<string, EquityQuote>(); const equityQuoteCache = new Map<string, EquityQuote>();
const darkInferenceState = createDarkInferenceState();
const recentLegsByKey = new Map<string, ContractLeg[]>(); const recentLegsByKey = new Map<string, ContractLeg[]>();
const MAX_RECENT_LEGS = 20; const MAX_RECENT_LEGS = 20;
@ -658,12 +690,43 @@ const emitEquityJoin = async (
try { try {
await insertEquityPrintJoin(clickhouse, payload); await insertEquityPrintJoin(clickhouse, payload);
await publishJson(js, SUBJECT_EQUITY_JOINS, payload);
} catch (error) { } catch (error) {
logger.error("failed to emit equity print join", { logger.error("failed to emit equity print join", {
error: error instanceof Error ? error.message : String(error), error: error instanceof Error ? error.message : String(error),
trace_id: payload.trace_id trace_id: payload.trace_id
}); });
return;
}
try {
await publishJson(js, SUBJECT_EQUITY_JOINS, payload);
} catch (error) {
logger.error("failed to publish equity print join", {
error: error instanceof Error ? error.message : String(error),
trace_id: payload.trace_id
});
}
await emitDarkInferences(clickhouse, js, payload);
};
const emitDarkInferences = async (
clickhouse: ReturnType<typeof createClickHouseClient>,
js: Awaited<ReturnType<typeof connectJetStreamWithRetry>>["js"],
join: EquityPrintJoin
): Promise<void> => {
const events = evaluateDarkInferences(join, darkInferenceConfig, darkInferenceState);
for (const event of events) {
const validated: InferredDarkEvent = InferredDarkEventSchema.parse(event);
try {
await insertInferredDark(clickhouse, validated);
await publishJson(js, SUBJECT_INFERRED_DARK, validated);
} catch (error) {
logger.error("failed to emit inferred dark event", {
error: error instanceof Error ? error.message : String(error),
trace_id: validated.trace_id
});
}
} }
}; };
@ -776,6 +839,19 @@ const run = async () => {
num_replicas: 1 num_replicas: 1
}); });
await ensureStream(jsm, {
name: STREAM_INFERRED_DARK,
subjects: [SUBJECT_INFERRED_DARK],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, { await ensureStream(jsm, {
name: STREAM_CLASSIFIER_HITS, name: STREAM_CLASSIFIER_HITS,
subjects: [SUBJECT_CLASSIFIER_HITS], subjects: [SUBJECT_CLASSIFIER_HITS],
@ -824,6 +900,7 @@ const run = async () => {
await retry("clickhouse table init", 20, 500, async () => { await retry("clickhouse table init", 20, 500, async () => {
await ensureFlowPacketsTable(clickhouse); await ensureFlowPacketsTable(clickhouse);
await ensureEquityPrintJoinsTable(clickhouse); await ensureEquityPrintJoinsTable(clickhouse);
await ensureInferredDarkTable(clickhouse);
await ensureClassifierHitsTable(clickhouse); await ensureClassifierHitsTable(clickhouse);
await ensureAlertsTable(clickhouse); await ensureAlertsTable(clickhouse);
}); });

View file

@ -0,0 +1,119 @@
import { describe, expect, it } from "bun:test";
import {
createDarkInferenceState,
evaluateDarkInferences,
type DarkInferenceConfig
} from "../src/dark-inference";
const config: DarkInferenceConfig = {
windowMs: 60_000,
cooldownMs: 30_000,
minBlockSize: 1000,
minAccumulationSize: 2000,
minAccumulationCount: 3,
minPrintSize: 200,
maxEvidence: 5,
maxSpreadPct: 0.01,
maxQuoteAgeMs: 1000
};
const baseJoin = {
source_ts: 1_000,
ingest_ts: 1_010,
seq: 1,
trace_id: "equityjoin:print-1",
id: "equityjoin:print-1",
print_trace_id: "print-1",
quote_trace_id: "quote-1",
features: {
underlying_id: "SPY",
price: 100,
size: 1200,
off_exchange_flag: true,
print_ts: 1_000,
quote_placement: "MID",
quote_mid: 100,
quote_spread: 0.1
},
join_quality: {
quote_age_ms: 5
}
};
describe("dark inference rules", () => {
it("emits absorbed block on large off-exchange mid prints", () => {
const state = createDarkInferenceState();
const events = evaluateDarkInferences(baseJoin, config, state);
expect(events).toHaveLength(1);
expect(events[0].type).toBe("absorbed_block");
expect(events[0].evidence_refs).toEqual([baseJoin.id]);
});
it("skips absorbed block when quote is stale", () => {
const state = createDarkInferenceState();
const staleJoin = {
...baseJoin,
join_quality: {
quote_age_ms: 5000,
quote_stale: 1
}
};
const events = evaluateDarkInferences(staleJoin, config, state);
expect(events).toHaveLength(0);
});
it("emits stealth accumulation on repeated buy placements", () => {
const state = createDarkInferenceState();
const joins = [0, 1, 2].map((offset) => ({
...baseJoin,
id: `equityjoin:buy-${offset}`,
trace_id: `equityjoin:buy-${offset}`,
seq: 10 + offset,
source_ts: 2_000 + offset * 500,
features: {
...baseJoin.features,
size: 800,
quote_placement: "A"
}
}));
const events = joins.flatMap((join) => evaluateDarkInferences(join, config, state));
const accumulation = events.find((event) => event.type === "stealth_accumulation");
expect(accumulation).toBeDefined();
expect(accumulation?.evidence_refs.length).toBeGreaterThan(0);
});
it("emits distribution on repeated sell placements", () => {
const state = createDarkInferenceState();
const joins = [0, 1, 2].map((offset) => ({
...baseJoin,
id: `equityjoin:sell-${offset}`,
trace_id: `equityjoin:sell-${offset}`,
seq: 20 + offset,
source_ts: 3_000 + offset * 500,
features: {
...baseJoin.features,
size: 900,
quote_placement: "B"
}
}));
const events = joins.flatMap((join) => evaluateDarkInferences(join, config, state));
const distribution = events.find((event) => event.type === "distribution");
expect(distribution).toBeDefined();
expect(distribution?.evidence_refs.length).toBeGreaterThan(0);
});
it("respects cooldown windows", () => {
const state = createDarkInferenceState();
const first = evaluateDarkInferences(baseJoin, config, state);
const second = evaluateDarkInferences(
{ ...baseJoin, source_ts: baseJoin.source_ts + 1_000, seq: baseJoin.seq + 1 },
config,
state
);
expect(first.length).toBeGreaterThan(0);
expect(second.find((event) => event.type === "absorbed_block")).toBeUndefined();
});
});