diff --git a/.env.example b/.env.example index 081c73d..5ebe0a7 100644 --- a/.env.example +++ b/.env.example @@ -48,3 +48,7 @@ EMIT_INTERVAL_MS=1000 # Compute consumer behavior COMPUTE_DELIVER_POLICY=new COMPUTE_CONSUMER_RESET=false +CLASSIFIER_SWEEP_MIN_PREMIUM=50000 +CLASSIFIER_SWEEP_MIN_COUNT=3 +CLASSIFIER_SPIKE_MIN_PREMIUM=25000 +CLASSIFIER_SPIKE_MIN_SIZE=500 diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 9b22df7..d2c7a39 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -4,3 +4,7 @@ export const STREAM_EQUITY_PRINTS = "EQUITY_PRINTS"; export const SUBJECT_EQUITY_PRINTS = "equities.prints"; export const STREAM_FLOW_PACKETS = "FLOW_PACKETS"; export const SUBJECT_FLOW_PACKETS = "flow.packets"; +export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS"; +export const SUBJECT_CLASSIFIER_HITS = "flow.classifier_hits"; +export const STREAM_ALERTS = "ALERTS"; +export const SUBJECT_ALERTS = "flow.alerts"; diff --git a/packages/storage/src/alerts.ts b/packages/storage/src/alerts.ts new file mode 100644 index 0000000..ef9302c --- /dev/null +++ b/packages/storage/src/alerts.ts @@ -0,0 +1,93 @@ +import type { AlertEvent, ClassifierHit } from "@islandflow/types"; + +export const ALERTS_TABLE = "alerts"; + +export type AlertRecord = { + source_ts: number; + ingest_ts: number; + seq: number; + trace_id: string; + score: number; + severity: string; + hits_json: string; + evidence_refs_json: string; +}; + +export const alertsTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${ALERTS_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + score Float64, + severity String, + hits_json String, + evidence_refs_json String +) +ENGINE = MergeTree +ORDER BY (source_ts, seq) +`; +}; + +export const toAlertRecord = (alert: AlertEvent): AlertRecord => { + return { + source_ts: alert.source_ts, + ingest_ts: alert.ingest_ts, + seq: alert.seq, + trace_id: alert.trace_id, + score: alert.score, + severity: alert.severity, + hits_json: JSON.stringify(alert.hits), + evidence_refs_json: JSON.stringify(alert.evidence_refs) + }; +}; + +const safeHitArray = (value: string): ClassifierHit[] => { + try { + const parsed = JSON.parse(value); + if (Array.isArray(parsed)) { + return parsed.map((entry) => { + const record = entry as Partial; + return { + classifier_id: String(record.classifier_id ?? ""), + confidence: Number(record.confidence ?? 0), + direction: String(record.direction ?? ""), + explanations: Array.isArray(record.explanations) + ? record.explanations.map((item) => String(item)) + : [] + }; + }); + } + } catch { + // ignore + } + + return []; +}; + +const safeStringArray = (value: string): string[] => { + try { + const parsed = JSON.parse(value); + if (Array.isArray(parsed)) { + return parsed.map((entry) => String(entry)); + } + } catch { + // ignore + } + + return []; +}; + +export const fromAlertRecord = (record: AlertRecord): AlertEvent => { + return { + source_ts: record.source_ts, + ingest_ts: record.ingest_ts, + seq: record.seq, + trace_id: record.trace_id, + score: record.score, + severity: record.severity, + hits: safeHitArray(record.hits_json), + evidence_refs: safeStringArray(record.evidence_refs_json) + }; +}; diff --git a/packages/storage/src/classifier-hits.ts b/packages/storage/src/classifier-hits.ts new file mode 100644 index 0000000..87baea3 --- /dev/null +++ b/packages/storage/src/classifier-hits.ts @@ -0,0 +1,70 @@ +import type { ClassifierHitEvent } from "@islandflow/types"; + +export const CLASSIFIER_HITS_TABLE = "classifier_hits"; + +export type ClassifierHitRecord = { + source_ts: number; + ingest_ts: number; + seq: number; + trace_id: string; + classifier_id: string; + confidence: number; + direction: string; + explanations_json: string; +}; + +export const classifierHitsTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${CLASSIFIER_HITS_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + classifier_id String, + confidence Float64, + direction String, + explanations_json String +) +ENGINE = MergeTree +ORDER BY (source_ts, seq) +`; +}; + +export const toClassifierHitRecord = (hit: ClassifierHitEvent): ClassifierHitRecord => { + return { + source_ts: hit.source_ts, + ingest_ts: hit.ingest_ts, + seq: hit.seq, + trace_id: hit.trace_id, + classifier_id: hit.classifier_id, + confidence: hit.confidence, + direction: hit.direction, + explanations_json: JSON.stringify(hit.explanations) + }; +}; + +const safeJsonArray = (value: string): string[] => { + try { + const parsed = JSON.parse(value); + if (Array.isArray(parsed)) { + return parsed.map((entry) => String(entry)); + } + } catch { + // ignore + } + + return []; +}; + +export const fromClassifierHitRecord = (record: ClassifierHitRecord): ClassifierHitEvent => { + return { + source_ts: record.source_ts, + ingest_ts: record.ingest_ts, + seq: record.seq, + trace_id: record.trace_id, + classifier_id: record.classifier_id, + confidence: record.confidence, + direction: record.direction, + explanations: safeJsonArray(record.explanations_json) + }; +}; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index c674aeb..e20ff4b 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -1,6 +1,18 @@ import { createClient, type ClickHouseClient } from "@clickhouse/client"; -import { EquityPrintSchema, FlowPacketSchema, OptionPrintSchema } from "@islandflow/types"; -import type { EquityPrint, FlowPacket, OptionPrint } from "@islandflow/types"; +import { + AlertEventSchema, + ClassifierHitEventSchema, + EquityPrintSchema, + FlowPacketSchema, + OptionPrintSchema +} from "@islandflow/types"; +import type { + AlertEvent, + ClassifierHitEvent, + EquityPrint, + FlowPacket, + OptionPrint +} from "@islandflow/types"; import { normalizeOptionPrint, optionPrintsTableDDL, @@ -18,6 +30,20 @@ import { toFlowPacketRecord, type FlowPacketRecord } from "./flow-packets"; +import { + CLASSIFIER_HITS_TABLE, + classifierHitsTableDDL, + fromClassifierHitRecord, + toClassifierHitRecord, + type ClassifierHitRecord +} from "./classifier-hits"; +import { + ALERTS_TABLE, + alertsTableDDL, + fromAlertRecord, + toAlertRecord, + type AlertRecord +} from "./alerts"; export type ClickHouseOptions = { url: string; @@ -59,6 +85,20 @@ export const ensureFlowPacketsTable = async ( }); }; +export const ensureClassifierHitsTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: classifierHitsTableDDL() + }); +}; + +export const ensureAlertsTable = async (client: ClickHouseClient): Promise => { + await client.exec({ + query: alertsTableDDL() + }); +}; + export const insertOptionPrint = async ( client: ClickHouseClient, print: OptionPrint @@ -95,6 +135,27 @@ export const insertFlowPacket = async ( }); }; +export const insertClassifierHit = async ( + client: ClickHouseClient, + hit: ClassifierHitEvent +): Promise => { + const record = toClassifierHitRecord(hit); + await client.insert({ + table: CLASSIFIER_HITS_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + +export const insertAlert = async (client: ClickHouseClient, alert: AlertEvent): Promise => { + const record = toAlertRecord(alert); + await client.insert({ + table: ALERTS_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + const clampLimit = (limit: number): number => { if (!Number.isFinite(limit)) { return 100; @@ -196,6 +257,42 @@ const normalizeFlowPacketRow = (row: unknown): FlowPacketRecord | null => { }; }; +const normalizeClassifierHitRow = (row: unknown): ClassifierHitRecord | null => { + if (!row || typeof row !== "object") { + return null; + } + + const record = row as Record; + return { + source_ts: coerceNumber(record.source_ts) as number, + ingest_ts: coerceNumber(record.ingest_ts) as number, + seq: coerceNumber(record.seq) as number, + trace_id: String(record.trace_id ?? ""), + classifier_id: String(record.classifier_id ?? ""), + confidence: Number(coerceNumber(record.confidence) ?? 0), + direction: String(record.direction ?? ""), + explanations_json: String(record.explanations_json ?? "[]") + }; +}; + +const normalizeAlertRow = (row: unknown): AlertRecord | null => { + if (!row || typeof row !== "object") { + return null; + } + + const record = row as Record; + return { + source_ts: coerceNumber(record.source_ts) as number, + ingest_ts: coerceNumber(record.ingest_ts) as number, + seq: coerceNumber(record.seq) as number, + trace_id: String(record.trace_id ?? ""), + score: Number(coerceNumber(record.score) ?? 0), + severity: String(record.severity ?? ""), + hits_json: String(record.hits_json ?? "[]"), + evidence_refs_json: String(record.evidence_refs_json ?? "[]") + }; +}; + export const fetchRecentOptionPrints = async ( client: ClickHouseClient, limit: number @@ -242,6 +339,42 @@ export const fetchRecentFlowPackets = async ( return FlowPacketSchema.array().parse(packets); }; +export const fetchRecentClassifierHits = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${CLASSIFIER_HITS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeClassifierHitRow) + .filter((record): record is ClassifierHitRecord => record !== null); + const hits = records.map(fromClassifierHitRecord); + return ClassifierHitEventSchema.array().parse(hits); +}; + +export const fetchRecentAlerts = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${ALERTS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeAlertRow) + .filter((record): record is AlertRecord => record !== null); + const alerts = records.map(fromAlertRecord); + return AlertEventSchema.array().parse(alerts); +}; + export const fetchOptionPrintsAfter = async ( client: ClickHouseClient, afterTs: number, diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 69a4bca..a1d4aed 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -1,4 +1,6 @@ export * from "./clickhouse"; +export * from "./classifier-hits"; +export * from "./alerts"; export * from "./flow-packets"; export * from "./equity-prints"; export * from "./option-prints"; diff --git a/packages/storage/tests/alerts.test.ts b/packages/storage/tests/alerts.test.ts new file mode 100644 index 0000000..9f9449c --- /dev/null +++ b/packages/storage/tests/alerts.test.ts @@ -0,0 +1,36 @@ +import { describe, expect, it } from "bun:test"; +import { alertsTableDDL, ALERTS_TABLE, fromAlertRecord, toAlertRecord } from "../src/alerts"; + +const alert = { + source_ts: 10, + ingest_ts: 20, + seq: 1, + trace_id: "alert:fp-1", + score: 78, + severity: "medium", + hits: [ + { + classifier_id: "large_bullish_call_sweep", + confidence: 0.72, + direction: "bullish", + explanations: ["Likely call sweep.", "Premium $50000."] + } + ], + evidence_refs: ["flowpacket:1", "print:1"] +}; + +describe("alerts storage helpers", () => { + it("includes the correct table name in the DDL", () => { + const ddl = alertsTableDDL(); + expect(ddl).toContain(ALERTS_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); + + it("round-trips alert records", () => { + const record = toAlertRecord(alert); + const restored = fromAlertRecord(record); + expect(restored.hits).toEqual(alert.hits); + expect(restored.evidence_refs).toEqual(alert.evidence_refs); + expect(restored.severity).toBe(alert.severity); + }); +}); diff --git a/packages/storage/tests/classifier-hits.test.ts b/packages/storage/tests/classifier-hits.test.ts new file mode 100644 index 0000000..b3c46e5 --- /dev/null +++ b/packages/storage/tests/classifier-hits.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, it } from "bun:test"; +import { + classifierHitsTableDDL, + CLASSIFIER_HITS_TABLE, + fromClassifierHitRecord, + toClassifierHitRecord +} from "../src/classifier-hits"; + +const hit = { + source_ts: 10, + ingest_ts: 20, + seq: 1, + trace_id: "classifier:large_bullish_call_sweep:fp-1", + classifier_id: "large_bullish_call_sweep", + confidence: 0.72, + direction: "bullish", + explanations: ["Likely call sweep.", "Premium $50000."] +}; + +describe("classifier hits storage helpers", () => { + it("includes the correct table name in the DDL", () => { + const ddl = classifierHitsTableDDL(); + expect(ddl).toContain(CLASSIFIER_HITS_TABLE); + expect(ddl).toContain("CREATE TABLE IF NOT EXISTS"); + }); + + it("round-trips classifier hit records", () => { + const record = toClassifierHitRecord(hit); + const restored = fromClassifierHitRecord(record); + expect(restored.explanations).toEqual(hit.explanations); + expect(restored.classifier_id).toBe(hit.classifier_id); + expect(restored.direction).toBe(hit.direction); + }); +}); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 444e47c..2f55bcb 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -1,9 +1,13 @@ import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; import { + SUBJECT_ALERTS, + SUBJECT_CLASSIFIER_HITS, SUBJECT_EQUITY_PRINTS, SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_PRINTS, + STREAM_ALERTS, + STREAM_CLASSIFIER_HITS, STREAM_EQUITY_PRINTS, STREAM_FLOW_PACKETS, STREAM_OPTION_PRINTS, @@ -14,16 +18,26 @@ import { } from "@islandflow/bus"; import { createClickHouseClient, + ensureAlertsTable, + ensureClassifierHitsTable, ensureEquityPrintsTable, ensureFlowPacketsTable, ensureOptionPrintsTable, + fetchRecentAlerts, + fetchRecentClassifierHits, fetchRecentFlowPackets, fetchEquityPrintsAfter, fetchRecentEquityPrints, fetchOptionPrintsAfter, fetchRecentOptionPrints } from "@islandflow/storage"; -import { EquityPrintSchema, FlowPacketSchema, OptionPrintSchema } from "@islandflow/types"; +import { + AlertEventSchema, + ClassifierHitEventSchema, + EquityPrintSchema, + FlowPacketSchema, + OptionPrintSchema +} from "@islandflow/types"; import { z } from "zod"; const service = "api"; @@ -73,7 +87,7 @@ const replayParamsSchema = z.object({ limit: z.coerce.number().int().positive().max(1000).default(200) }); -type Channel = "options" | "equities" | "flow"; +type Channel = "options" | "equities" | "flow" | "classifier-hits" | "alerts"; type WsData = { channel: Channel; @@ -82,6 +96,8 @@ type WsData = { const optionSockets = new Set>(); const equitySockets = new Set>(); const flowSockets = new Set>(); +const classifierHitSockets = new Set>(); +const alertSockets = new Set>(); const jsonResponse = (body: unknown, status = 200): Response => { return new Response(JSON.stringify(body), { @@ -179,6 +195,32 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_CLASSIFIER_HITS, + subjects: [SUBJECT_CLASSIFIER_HITS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + await ensureStream(jsm, { + name: STREAM_ALERTS, + subjects: [SUBJECT_ALERTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + const clickhouse = createClickHouseClient({ url: env.CLICKHOUSE_URL, database: env.CLICKHOUSE_DATABASE @@ -188,6 +230,8 @@ const run = async () => { await ensureOptionPrintsTable(clickhouse); await ensureEquityPrintsTable(clickhouse); await ensureFlowPacketsTable(clickhouse); + await ensureClassifierHitsTable(clickhouse); + await ensureAlertsTable(clickhouse); }); const optionSubscription = await subscribeJson( @@ -208,6 +252,18 @@ const run = async () => { buildDurableConsumer("api-flow-packets") ); + const classifierHitSubscription = await subscribeJson( + js, + SUBJECT_CLASSIFIER_HITS, + buildDurableConsumer("api-classifier-hits") + ); + + const alertSubscription = await subscribeJson( + js, + SUBJECT_ALERTS, + buildDurableConsumer("api-alerts") + ); + const pumpOptions = async () => { for await (const msg of optionSubscription.messages) { try { @@ -253,9 +309,41 @@ const run = async () => { } }; + const pumpClassifierHits = async () => { + for await (const msg of classifierHitSubscription.messages) { + try { + const payload = ClassifierHitEventSchema.parse(classifierHitSubscription.decode(msg)); + broadcast(classifierHitSockets, { type: "classifier-hit", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process classifier hit", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + + const pumpAlerts = async () => { + for await (const msg of alertSubscription.messages) { + try { + const payload = AlertEventSchema.parse(alertSubscription.decode(msg)); + broadcast(alertSockets, { type: "alert", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process alert", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + void pumpOptions(); void pumpEquities(); void pumpFlow(); + void pumpClassifierHits(); + void pumpAlerts(); const server = Bun.serve({ port: env.API_PORT, @@ -284,6 +372,18 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/flow/classifier-hits") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentClassifierHits(clickhouse, limit); + return jsonResponse({ data }); + } + + if (req.method === "GET" && url.pathname === "/flow/alerts") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentAlerts(clickhouse, limit); + return jsonResponse({ data }); + } + if (req.method === "GET" && url.pathname === "/replay/options") { const { afterTs, afterSeq, limit } = parseReplayParams(url); const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit); @@ -324,6 +424,22 @@ const run = async () => { return jsonResponse({ error: "websocket upgrade failed" }, 400); } + if (req.method === "GET" && url.pathname === "/ws/classifier-hits") { + if (serverRef.upgrade(req, { data: { channel: "classifier-hits" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + + if (req.method === "GET" && url.pathname === "/ws/alerts") { + if (serverRef.upgrade(req, { data: { channel: "alerts" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + return jsonResponse({ error: "not found" }, 404); }, websocket: { @@ -332,8 +448,12 @@ const run = async () => { optionSockets.add(socket); } else if (socket.data.channel === "equities") { equitySockets.add(socket); - } else { + } else if (socket.data.channel === "flow") { flowSockets.add(socket); + } else if (socket.data.channel === "classifier-hits") { + classifierHitSockets.add(socket); + } else { + alertSockets.add(socket); } logger.info("websocket connected", { channel: socket.data.channel }); @@ -343,8 +463,12 @@ const run = async () => { optionSockets.delete(socket); } else if (socket.data.channel === "equities") { equitySockets.delete(socket); - } else { + } else if (socket.data.channel === "flow") { flowSockets.delete(socket); + } else if (socket.data.channel === "classifier-hits") { + classifierHitSockets.delete(socket); + } else { + alertSockets.delete(socket); } logger.info("websocket disconnected", { channel: socket.data.channel }); diff --git a/services/compute/src/classifiers.ts b/services/compute/src/classifiers.ts new file mode 100644 index 0000000..2fd9e56 --- /dev/null +++ b/services/compute/src/classifiers.ts @@ -0,0 +1,221 @@ +import type { ClassifierHit, FlowPacket } from "@islandflow/types"; + +type ParsedContract = { + root: string; + expiry: string; + strike: number; + right: "C" | "P"; +}; + +export type ClassifierConfig = { + sweepMinPremium: number; + sweepMinCount: number; + spikeMinPremium: number; + spikeMinSize: number; +}; + +const clamp = (value: number, min = 0, max = 1): number => { + if (!Number.isFinite(value)) { + return min; + } + return Math.max(min, Math.min(max, value)); +}; + +const formatUsd = (value: number): string => { + if (!Number.isFinite(value)) { + return "$0"; + } + return `$${value.toFixed(2)}`; +}; + +const parseDashedContract = (value: string): ParsedContract | null => { + const parts = value.split("-"); + if (parts.length < 6) { + return null; + } + + const rightRaw = parts.at(-1) ?? ""; + if (rightRaw !== "C" && rightRaw !== "P") { + return null; + } + + const strikeRaw = parts.at(-2) ?? ""; + const strike = Number(strikeRaw); + const expiryParts = parts.slice(-5, -2); + const expiry = expiryParts.join("-"); + const root = parts.slice(0, -5).join("-"); + + if (!root || !expiry || !Number.isFinite(strike)) { + return null; + } + + return { + root, + expiry, + strike, + right: rightRaw + }; +}; + +const parseOccContract = (value: string): ParsedContract | null => { + if (value.length < 15) { + return null; + } + + const tail = value.slice(-15); + const root = value.slice(0, -15).trim(); + const expiryRaw = tail.slice(0, 6); + const right = tail.slice(6, 7); + const strikeRaw = tail.slice(7); + + if (!/^\d{6}$/.test(expiryRaw) || !/^\d{8}$/.test(strikeRaw)) { + return null; + } + + if (right !== "C" && right !== "P") { + return null; + } + + const year = 2000 + Number(expiryRaw.slice(0, 2)); + const month = Number(expiryRaw.slice(2, 4)) - 1; + const day = Number(expiryRaw.slice(4, 6)); + const expiryDate = new Date(Date.UTC(year, month, day)); + const expiry = expiryDate.toISOString().slice(0, 10); + const strike = Number(strikeRaw) / 1000; + + if (!root || !Number.isFinite(strike)) { + return null; + } + + return { + root, + expiry, + strike, + right + }; +}; + +const parseContractId = (value: string | undefined): ParsedContract | null => { + if (!value) { + return null; + } + + return parseDashedContract(value) ?? parseOccContract(value); +}; + +const getNumberFeature = (packet: FlowPacket, key: string): number => { + const value = packet.features[key]; + return typeof value === "number" && Number.isFinite(value) ? value : 0; +}; + +const buildSweepHit = ( + packet: FlowPacket, + contract: ParsedContract, + direction: "bullish" | "bearish", + config: ClassifierConfig +): ClassifierHit | null => { + const count = getNumberFeature(packet, "count"); + const totalPremium = getNumberFeature(packet, "total_premium"); + const totalSize = getNumberFeature(packet, "total_size"); + const firstPrice = getNumberFeature(packet, "first_price"); + const lastPrice = getNumberFeature(packet, "last_price"); + const windowMs = getNumberFeature(packet, "window_ms"); + + if (count < config.sweepMinCount || totalPremium < config.sweepMinPremium) { + return null; + } + + const priceDelta = lastPrice - firstPrice; + const priceTrend = priceDelta >= 0 ? "up" : "down"; + + let confidence = 0.55; + if (priceDelta >= 0) { + confidence += 0.1; + } + if (count >= config.sweepMinCount + 2) { + confidence += 0.1; + } + if (totalPremium >= config.sweepMinPremium * 2) { + confidence += 0.15; + } + + confidence = clamp(confidence, 0, 0.95); + + return { + classifier_id: direction === "bullish" ? "large_bullish_call_sweep" : "large_bearish_put_sweep", + confidence, + direction, + explanations: [ + `Likely ${direction === "bullish" ? "call" : "put"} sweep: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`, + `Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts; price ${priceTrend}.`, + `Thresholds: >=${config.sweepMinCount} prints and >=${formatUsd(config.sweepMinPremium)} premium.` + ] + }; +}; + +const buildSpikeHit = (packet: FlowPacket, config: ClassifierConfig): ClassifierHit | null => { + const count = getNumberFeature(packet, "count"); + const totalPremium = getNumberFeature(packet, "total_premium"); + const totalSize = getNumberFeature(packet, "total_size"); + const windowMs = getNumberFeature(packet, "window_ms"); + + if (totalSize < config.spikeMinSize || totalPremium < config.spikeMinPremium) { + return null; + } + + let confidence = 0.5; + if (totalSize >= config.spikeMinSize * 2) { + confidence += 0.15; + } + if (totalPremium >= config.spikeMinPremium * 2) { + confidence += 0.15; + } + if (count >= 3) { + confidence += 0.1; + } + + confidence = clamp(confidence, 0, 0.9); + + return { + classifier_id: "unusual_contract_spike", + confidence, + direction: "neutral", + explanations: [ + `Unusual contract spike: ${count} prints in ${Math.round(windowMs)}ms for ${packet.features.option_contract_id ?? packet.id}.`, + `Premium ${formatUsd(totalPremium)} across ${Math.round(totalSize)} contracts.`, + `Thresholds: >=${config.spikeMinSize} contracts and >=${formatUsd(config.spikeMinPremium)} premium.` + ] + }; +}; + +export const evaluateClassifiers = ( + packet: FlowPacket, + config: ClassifierConfig +): ClassifierHit[] => { + const contractId = typeof packet.features.option_contract_id === "string" + ? packet.features.option_contract_id + : ""; + const contract = parseContractId(contractId); + const hits: ClassifierHit[] = []; + + if (contract?.right === "C") { + const hit = buildSweepHit(packet, contract, "bullish", config); + if (hit) { + hits.push(hit); + } + } + + if (contract?.right === "P") { + const hit = buildSweepHit(packet, contract, "bearish", config); + if (hit) { + hits.push(hit); + } + } + + const spikeHit = buildSpikeHit(packet, config); + if (spikeHit) { + hits.push(spikeHit); + } + + return hits; +}; diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index cbbd91f..ab5e4f5 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -1,8 +1,12 @@ import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; import { + SUBJECT_ALERTS, + SUBJECT_CLASSIFIER_HITS, SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_PRINTS, + STREAM_ALERTS, + STREAM_CLASSIFIER_HITS, STREAM_FLOW_PACKETS, STREAM_OPTION_PRINTS, buildDurableConsumer, @@ -13,11 +17,25 @@ import { } from "@islandflow/bus"; import { createClickHouseClient, + ensureAlertsTable, + ensureClassifierHitsTable, ensureFlowPacketsTable, + insertAlert, + insertClassifierHit, insertFlowPacket } from "@islandflow/storage"; -import { FlowPacketSchema, OptionPrintSchema, type FlowPacket, type OptionPrint } from "@islandflow/types"; +import { + AlertEventSchema, + ClassifierHitEventSchema, + FlowPacketSchema, + OptionPrintSchema, + type AlertEvent, + type ClassifierHitEvent, + type FlowPacket, + type OptionPrint +} from "@islandflow/types"; import { z } from "zod"; +import { evaluateClassifiers, type ClassifierConfig } from "./classifiers"; const service = "compute"; const logger = createLogger({ service }); @@ -41,11 +59,22 @@ const envSchema = z.object({ } return value; }, z.boolean()) - .default(false) + .default(false), + CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(50_000), + CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3), + CLASSIFIER_SPIKE_MIN_PREMIUM: z.coerce.number().positive().default(25_000), + CLASSIFIER_SPIKE_MIN_SIZE: z.coerce.number().int().positive().default(500) }); const env = readEnv(envSchema); +const classifierConfig: ClassifierConfig = { + sweepMinPremium: env.CLASSIFIER_SWEEP_MIN_PREMIUM, + sweepMinCount: env.CLASSIFIER_SWEEP_MIN_COUNT, + spikeMinPremium: env.CLASSIFIER_SPIKE_MIN_PREMIUM, + spikeMinSize: env.CLASSIFIER_SPIKE_MIN_SIZE +}; + const retry = async ( label: string, attempts: number, @@ -170,6 +199,8 @@ const flushCluster = async ( await insertFlowPacket(clickhouse, validated); await publishJson(js, SUBJECT_FLOW_PACKETS, validated); + await emitClassifiers(clickhouse, js, validated); + logger.info("emitted flow packet", { id: validated.id, contract: cluster.contractId, @@ -177,6 +208,79 @@ const flushCluster = async ( }); }; +const scoreAlert = (packet: FlowPacket, hits: ClassifierHitEvent[]): { score: number; severity: string } => { + const premium = + typeof packet.features.total_premium === "number" ? packet.features.total_premium : 0; + const premiumScore = Math.min(70, Math.round(premium / 1000)); + const maxConfidence = hits.reduce((max, hit) => Math.max(max, hit.confidence), 0); + const confidenceScore = Math.round(maxConfidence * 20); + const hitScore = Math.min(20, hits.length * 5); + const score = Math.max(0, Math.min(100, premiumScore + confidenceScore + hitScore)); + const severity = score >= 80 ? "high" : score >= 45 ? "medium" : "low"; + return { score, severity }; +}; + +const emitClassifiers = async ( + clickhouse: ReturnType, + js: Awaited>["js"], + packet: FlowPacket +): Promise => { + const hits = evaluateClassifiers(packet, classifierConfig); + if (hits.length === 0) { + return; + } + + const hitEvents: ClassifierHitEvent[] = hits.map((hit) => + ClassifierHitEventSchema.parse({ + source_ts: packet.source_ts, + ingest_ts: packet.ingest_ts, + seq: packet.seq, + trace_id: `classifier:${hit.classifier_id}:${packet.id}`, + ...hit + }) + ); + + for (const hit of hitEvents) { + try { + await insertClassifierHit(clickhouse, hit); + await publishJson(js, SUBJECT_CLASSIFIER_HITS, hit); + } catch (error) { + logger.error("failed to emit classifier hit", { + error: error instanceof Error ? error.message : String(error), + classifier_id: hit.classifier_id, + packet_id: packet.id + }); + } + } + + const { score, severity } = scoreAlert(packet, hitEvents); + const alert: AlertEvent = AlertEventSchema.parse({ + source_ts: packet.source_ts, + ingest_ts: packet.ingest_ts, + seq: packet.seq, + trace_id: `alert:${packet.id}`, + score, + severity, + hits: hitEvents.map((hit) => ({ + classifier_id: hit.classifier_id, + confidence: hit.confidence, + direction: hit.direction, + explanations: hit.explanations + })), + evidence_refs: [packet.id, ...packet.members] + }); + + try { + await insertAlert(clickhouse, alert); + await publishJson(js, SUBJECT_ALERTS, alert); + } catch (error) { + logger.error("failed to emit alert", { + error: error instanceof Error ? error.message : String(error), + packet_id: packet.id + }); + } +}; + const flushEligibleClusters = async ( clickhouse: ReturnType, js: Awaited>["js"], @@ -232,6 +336,32 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_CLASSIFIER_HITS, + subjects: [SUBJECT_CLASSIFIER_HITS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + + await ensureStream(jsm, { + name: STREAM_ALERTS, + subjects: [SUBJECT_ALERTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + const clickhouse = createClickHouseClient({ url: env.CLICKHOUSE_URL, database: env.CLICKHOUSE_DATABASE @@ -239,6 +369,8 @@ const run = async () => { await retry("clickhouse table init", 20, 500, async () => { await ensureFlowPacketsTable(clickhouse); + await ensureClassifierHitsTable(clickhouse); + await ensureAlertsTable(clickhouse); }); const durableName = "compute-option-prints";