diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index eb80781..f6d1839 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -4,5 +4,9 @@ {"_type":"issue","id":"islandflow-ayo","title":"Drop stale backlog events from live fanout","description":"Follow-up to live freshness rollout: /ws/live was still fanning out stale backlog events for freshness-gated channels, which kept tape panes in Live feed behind despite active synthetic ingest. Gate fanout and cache ingest by freshness for options/nbbo/equities/flow.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:26:39Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:26:44Z","started_at":"2026-04-28T21:26:44Z","closed_at":"2026-04-28T21:26:44Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-0v6","title":"Fix tape freshness, NBBO coverage, pause controls, and filter popup","description":"Implement the tape fixes requested for synthetic options notional sizing, strict live freshness, live-mode pause/resume behavior, stronger NBBO snapshot coverage, and moving flow filters behind a popup. 
Includes server-side live cache changes, web terminal state/UI changes, and tests for synthetic pricing, live snapshot freshness/NBBO retention, and live pause/filter interactions.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:02:52Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:13:38Z","started_at":"2026-04-28T21:02:57Z","closed_at":"2026-04-28T21:13:38Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-e4r","title":"Implement smart-money flow filtering and synthetic firehose modes","description":"Implement the approved multi-surface plan for named synthetic market profiles, options raw-vs-signal filtering, live/API filter contracts, Tape page client-side flow filters, firehose-readiness improvements, tests, and README updates.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:10:49Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:29:29Z","started_at":"2026-04-28T20:10:53Z","closed_at":"2026-04-28T20:29:29Z","close_reason":"Implemented synthetic market profiles, options signal-path filtering, signal-aware API/replay contracts, Tape page filters, tests, and README updates. 
Follow-up tracked in islandflow-biq.","dependency_count":0,"dependent_count":0,"comment_count":0} +{"_type":"issue","id":"islandflow-b6d","title":"Finish smart-money event-calendar enrichment","description":"Finish the smart-money event-calendar provider layer in services/refdata and connect days-to-event / expiry-after-event enrichment into compute using timestamp-available data only.","status":"open","priority":2,"issue_type":"task","owner":"dishes@dpdrm.com","created_at":"2026-05-04T21:35:26Z","created_by":"dirtydishes","updated_at":"2026-05-04T21:35:26Z","dependency_count":0,"dependent_count":0,"comment_count":0} +{"_type":"issue","id":"islandflow-e60","title":"Add smart-money replay evaluation harness","description":"Add replay-style live-vs-batch consistency tests plus evaluation utilities for parent-event precision/recall, calibration, abstention rate, and economic sanity checks.","status":"open","priority":2,"issue_type":"task","owner":"dishes@dpdrm.com","created_at":"2026-05-04T21:35:25Z","created_by":"dirtydishes","updated_at":"2026-05-04T21:35:25Z","dependency_count":0,"dependent_count":0,"comment_count":0} +{"_type":"issue","id":"islandflow-020","title":"Rebuild synthetic smart-money scenarios","description":"Rework services/ingest-options synthetic generation around labeled parent-event templates for the six core smart-money profiles plus neutral background noise, with deterministic test/demo modes and hidden labels for tests.","status":"open","priority":2,"issue_type":"task","owner":"dishes@dpdrm.com","created_at":"2026-05-04T21:35:24Z","created_by":"dirtydishes","updated_at":"2026-05-04T21:35:24Z","dependency_count":0,"dependent_count":0,"comment_count":0} +{"_type":"issue","id":"islandflow-zs0","title":"Migrate terminal UI to smart-money profiles","description":"Migrate apps/web terminal rendering to consume SmartMoneyEvent directly: primary profile, probability ladder, reason codes, and suppression/abstention state, while preserving legacy 
alert/classifier displays during the bridge.","status":"open","priority":2,"issue_type":"task","owner":"dishes@dpdrm.com","created_at":"2026-05-04T21:35:23Z","created_by":"dirtydishes","updated_at":"2026-05-04T21:35:23Z","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-igk","title":"Add plan mode","description":"Implement a user-facing plan mode in the application so users can switch into planning before taking action. Scope to be clarified from existing app patterns.","status":"closed","priority":2,"issue_type":"feature","owner":"dishes@dpdrm.com","created_at":"2026-05-04T04:22:37Z","created_by":"dirtydishes","updated_at":"2026-05-04T04:26:18Z","started_at":"2026-05-04T04:22:40Z","closed_at":"2026-05-04T04:26:18Z","close_reason":"Implemented as a global pi extension toggled with Shift+P","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-biq","title":"Finish raw live options delivery and filter/backpressure observability","description":"The smart-money signal path and Tape filters are in place, but the next firehose pass should finish server-side selective raw live delivery for options subscriptions and add explicit filtered-out/backpressure observability for API/web counters. 
This was discovered while landing islandflow-e4r.\n","status":"in_progress","priority":2,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:28:58Z","created_by":"dirtydishes","updated_at":"2026-04-29T03:54:12Z","started_at":"2026-04-29T03:54:12Z","dependencies":[{"issue_id":"islandflow-biq","depends_on_id":"islandflow-e4r","type":"discovered-from","created_at":"2026-04-28T16:28:58Z","created_by":"dirtydishes","metadata":"{}"}],"dependency_count":0,"dependent_count":0,"comment_count":0} diff --git a/SMART_MONEY_REBUILD_PLAN.md b/SMART_MONEY_REBUILD_PLAN.md new file mode 100644 index 0000000..09d540c --- /dev/null +++ b/SMART_MONEY_REBUILD_PLAN.md @@ -0,0 +1,63 @@ +# Smart Money Rebuild Plan + +Living implementation tracker for the rules-first smart-money rebuild. Issue tracking remains in `bd`; this file records migration state, acceptance criteria, and handoff notes. + +## Phase Checklists + +### Phase 1: Contracts and Storage +- [x] Add `SmartMoneyEvent` contract in `packages/types`. +- [x] Add typed features, profile scores, abstention, and suppression metadata. +- [x] Extend `AlertEvent` with optional profile metadata. +- [x] Add `smart_money_events` ClickHouse storage helpers. +- [x] Add bus/live channel names for smart-money events. + +Acceptance: smart-money events round-trip through schema/storage helpers and alerts remain backward-compatible. + +### Phase 2: Parent-Event Reconstruction +- [x] Add `services/compute/src/parent-events.ts`. +- [x] Convert existing `FlowPacket` clusters and structure packets into deterministic parent events. +- [x] Emit deterministic event IDs from packet identity. +- [x] Preserve bridge semantics while `FlowPacket` remains an intermediate artifact. + +Acceptance: live and replay produce the same event ID for the same packet. 
+ +### Phase 3: Feature Engineering +- [x] Build typed features for aggressor mix, spread/quote quality, timing, strike concentration, DTE, moneyness, structure markers, and event alignment fields. +- [x] Keep batch-only validation fields out of live scoring. +- [ ] Connect an external event-calendar feed through `services/refdata`. + +Acceptance: missing event-calendar fields produce neutral `null` feature values and do not block scoring. + +### Phase 4: Rules Engine +- [x] Score the six primary profiles. +- [x] Return probabilities, confidence bands, directions, reason codes, and suppression reasons. +- [x] Add false-positive guards for stale quotes, complex/special prints, retail-frenzy directional suppression, hedge-reactive 0-2 DTE ATM contexts, and arbitrage symmetry. + +Acceptance: abstained events do not emit legacy classifier hits. + +### Phase 5: Synthetic Market Redesign +- [ ] Rework synthetic options adapter around labeled parent-event templates. +- [ ] Add deterministic scenario families for all six profiles. +- [ ] Add test/demo operating modes with hidden labels. + +Acceptance: scenario tests assert that the intended profile wins and that nearby wrong profiles remain below threshold. + +### Phase 6: Compute, API, and UI Rollout +- [x] Emit `SmartMoneyEvent` first in compute. +- [x] Derive compatibility `ClassifierHitEvent` and `AlertEvent`. +- [x] Add REST/history/replay/ws/live support for smart-money events. +- [ ] Migrate terminal UI to profile-aware display. + +Acceptance: old classifier and alert endpoints still work while `/flow/smart-money`, `/history/smart-money`, `/replay/smart-money`, and `/ws/smart-money` expose the new model. + +### Phase 7: Evaluation and Replay +- [x] Add deterministic unit tests for parent-event scoring and storage. +- [ ] Add replay-style live-vs-batch consistency tests. +- [ ] Add evaluation utilities for parent-event precision/recall, calibration, abstention rate, and economic sanity checks. 
+ +## Migration Notes + +- `FlowPacket` remains the packet/cluster bridge and is no longer the final semantic alert object. +- `ClassifierHitEvent` is now a compatibility surface derived from `SmartMoneyEvent.primary_profile_id`. +- `AlertEvent` keeps existing fields and may include `primary_profile_id` plus `profile_scores`. +- Existing structure labels such as vertical, straddle, roll, and 0DTE gamma are evidence/reason concepts rather than final business-facing profile IDs. diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 24fc427..6b21afd 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -16,6 +16,8 @@ export const STREAM_INFERRED_DARK = "INFERRED_DARK"; export const SUBJECT_INFERRED_DARK = "dark.inferred"; export const STREAM_FLOW_PACKETS = "FLOW_PACKETS"; export const SUBJECT_FLOW_PACKETS = "flow.packets"; +export const STREAM_SMART_MONEY_EVENTS = "SMART_MONEY_EVENTS"; +export const SUBJECT_SMART_MONEY_EVENTS = "flow.smart_money"; export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS"; export const SUBJECT_CLASSIFIER_HITS = "flow.classifier_hits"; export const STREAM_ALERTS = "ALERTS"; diff --git a/packages/storage/src/alerts.ts b/packages/storage/src/alerts.ts index ef9302c..ae79e75 100644 --- a/packages/storage/src/alerts.ts +++ b/packages/storage/src/alerts.ts @@ -1,4 +1,4 @@ -import type { AlertEvent, ClassifierHit } from "@islandflow/types"; +import type { AlertEvent, ClassifierHit, SmartMoneyProfileScore } from "@islandflow/types"; export const ALERTS_TABLE = "alerts"; @@ -11,6 +11,8 @@ export type AlertRecord = { severity: string; hits_json: string; evidence_refs_json: string; + primary_profile_id: string; + profile_scores_json: string; }; export const alertsTableDDL = (): string => { @@ -23,13 +25,20 @@ CREATE TABLE IF NOT EXISTS ${ALERTS_TABLE} ( score Float64, severity String, hits_json String, - evidence_refs_json String + evidence_refs_json String, + primary_profile_id String DEFAULT '', 
+ profile_scores_json String DEFAULT '[]' ) ENGINE = MergeTree ORDER BY (source_ts, seq) `; }; +export const alertsTableMigrations = (): string[] => [ + `ALTER TABLE ${ALERTS_TABLE} ADD COLUMN IF NOT EXISTS primary_profile_id String DEFAULT ''`, + `ALTER TABLE ${ALERTS_TABLE} ADD COLUMN IF NOT EXISTS profile_scores_json String DEFAULT '[]'` +]; + export const toAlertRecord = (alert: AlertEvent): AlertRecord => { return { source_ts: alert.source_ts, @@ -39,7 +48,9 @@ export const toAlertRecord = (alert: AlertEvent): AlertRecord => { score: alert.score, severity: alert.severity, hits_json: JSON.stringify(alert.hits), - evidence_refs_json: JSON.stringify(alert.evidence_refs) + evidence_refs_json: JSON.stringify(alert.evidence_refs), + primary_profile_id: alert.primary_profile_id ?? "", + profile_scores_json: JSON.stringify(alert.profile_scores ?? []) }; }; @@ -79,6 +90,28 @@ const safeStringArray = (value: string): string[] => { return []; }; +const safeProfileScoreArray = (value: string): SmartMoneyProfileScore[] => { + try { + const parsed = JSON.parse(value); + if (Array.isArray(parsed)) { + return parsed.map((entry) => { + const record = entry as Partial; + return { + profile_id: String(record.profile_id ?? "") as SmartMoneyProfileScore["profile_id"], + probability: Number(record.probability ?? 0), + confidence_band: String(record.confidence_band ?? "low") as SmartMoneyProfileScore["confidence_band"], + direction: String(record.direction ?? "unknown") as SmartMoneyProfileScore["direction"], + reasons: Array.isArray(record.reasons) ? 
record.reasons.map((item) => String(item)) : [] + }; + }); + } + } catch { + // ignore + } + + return []; +}; + export const fromAlertRecord = (record: AlertRecord): AlertEvent => { return { source_ts: record.source_ts, @@ -88,6 +121,8 @@ export const fromAlertRecord = (record: AlertRecord): AlertEvent => { score: record.score, severity: record.severity, hits: safeHitArray(record.hits_json), - evidence_refs: safeStringArray(record.evidence_refs_json) + evidence_refs: safeStringArray(record.evidence_refs_json), + ...(record.primary_profile_id ? { primary_profile_id: record.primary_profile_id as AlertEvent["primary_profile_id"] } : {}), + profile_scores: safeProfileScoreArray(record.profile_scores_json) }; }; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 37526d9..4c71357 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -8,7 +8,8 @@ import { InferredDarkEventSchema, FlowPacketSchema, OptionNBBOSchema, - OptionPrintSchema + OptionPrintSchema, + SmartMoneyEventSchema } from "@islandflow/types"; import type { AlertEvent, @@ -19,6 +20,7 @@ import type { EquityPrintJoin, InferredDarkEvent, FlowPacket, + SmartMoneyEvent, OptionNBBO, OptionPrint, OptionFlowFilters, @@ -76,11 +78,19 @@ import { } from "./classifier-hits"; import { ALERTS_TABLE, + alertsTableMigrations, alertsTableDDL, fromAlertRecord, toAlertRecord, type AlertRecord } from "./alerts"; +import { + SMART_MONEY_EVENTS_TABLE, + smartMoneyEventsTableDDL, + fromSmartMoneyEventRecord, + toSmartMoneyEventRecord, + type SmartMoneyEventRecord +} from "./smart-money-events"; export type ClickHouseOptions = { url: string; @@ -285,6 +295,14 @@ export const ensureFlowPacketsTable = async ( }); }; +export const ensureSmartMoneyEventsTable = async ( + client: ClickHouseClient +): Promise => { + await client.exec({ + query: smartMoneyEventsTableDDL() + }); +}; + export const ensureClassifierHitsTable = async ( client: ClickHouseClient ): 
Promise => { @@ -297,6 +315,9 @@ export const ensureAlertsTable = async (client: ClickHouseClient): Promise await client.exec({ query: alertsTableDDL() }); + for (const query of alertsTableMigrations()) { + await client.exec({ query }); + } }; export const insertOptionPrint = async ( @@ -395,6 +416,18 @@ export const insertFlowPacket = async ( }); }; +export const insertSmartMoneyEvent = async ( + client: ClickHouseClient, + event: SmartMoneyEvent +): Promise => { + const record = toSmartMoneyEventRecord(event); + await client.insert({ + table: SMART_MONEY_EVENTS_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + export const insertClassifierHit = async ( client: ClickHouseClient, hit: ClassifierHitEvent @@ -777,6 +810,34 @@ const normalizeClassifierHitRow = (row: unknown): ClassifierHitRecord | null => }; }; +const normalizeSmartMoneyEventRow = (row: unknown): SmartMoneyEventRecord | null => { + if (!row || typeof row !== "object") { + return null; + } + + const record = row as Record; + return { + source_ts: coerceNumber(record.source_ts) as number, + ingest_ts: coerceNumber(record.ingest_ts) as number, + seq: coerceNumber(record.seq) as number, + trace_id: String(record.trace_id ?? ""), + event_id: String(record.event_id ?? ""), + packet_ids: Array.isArray(record.packet_ids) ? record.packet_ids.map((value) => String(value)) : [], + member_print_ids: Array.isArray(record.member_print_ids) + ? record.member_print_ids.map((value) => String(value)) + : [], + underlying_id: String(record.underlying_id ?? ""), + event_kind: String(record.event_kind ?? ""), + event_window_ms: coerceNumber(record.event_window_ms) as number, + features_json: String(record.features_json ?? "{}"), + profile_scores_json: String(record.profile_scores_json ?? "[]"), + primary_profile_id: String(record.primary_profile_id ?? ""), + primary_direction: String(record.primary_direction ?? 
"unknown"), + abstained: Boolean(record.abstained), + suppressed_reasons_json: String(record.suppressed_reasons_json ?? "[]") + }; +}; + const normalizeAlertRow = (row: unknown): AlertRecord | null => { if (!row || typeof row !== "object") { return null; @@ -791,7 +852,9 @@ const normalizeAlertRow = (row: unknown): AlertRecord | null => { score: Number(coerceNumber(record.score) ?? 0), severity: String(record.severity ?? ""), hits_json: String(record.hits_json ?? "[]"), - evidence_refs_json: String(record.evidence_refs_json ?? "[]") + evidence_refs_json: String(record.evidence_refs_json ?? "[]"), + primary_profile_id: String(record.primary_profile_id ?? ""), + profile_scores_json: String(record.profile_scores_json ?? "[]") }; }; @@ -951,6 +1014,23 @@ export const fetchRecentClassifierHits = async ( return ClassifierHitEventSchema.array().parse(hits); }; +export const fetchRecentSmartMoneyEvents = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${SMART_MONEY_EVENTS_TABLE} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeSmartMoneyEventRow) + .filter((record): record is SmartMoneyEventRecord => record !== null); + return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord)); +}; + export const fetchRecentAlerts = async ( client: ClickHouseClient, limit: number @@ -1222,6 +1302,28 @@ export const fetchClassifierHitsAfter = async ( return ClassifierHitEventSchema.array().parse(hits); }; +export const fetchSmartMoneyEventsAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT 
* FROM ${SMART_MONEY_EVENTS_TABLE} WHERE (source_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY source_ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeSmartMoneyEventRow) + .filter((record): record is SmartMoneyEventRecord => record !== null); + return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord)); +}; + export const fetchAlertsAfter = async ( client: ClickHouseClient, afterTs: number, @@ -1385,6 +1487,25 @@ export const fetchClassifierHitsBefore = async ( return ClassifierHitEventSchema.array().parse(records.map(fromClassifierHitRecord)); }; +export const fetchSmartMoneyEventsBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `SELECT * FROM ${SMART_MONEY_EVENTS_TABLE} WHERE ${buildBeforeTupleCondition("source_ts", "seq", beforeTs, beforeSeq)} ORDER BY source_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeSmartMoneyEventRow) + .filter((record): record is SmartMoneyEventRecord => record !== null); + return SmartMoneyEventSchema.array().parse(records.map(fromSmartMoneyEventRecord)); +}; + export const fetchAlertsBefore = async ( client: ClickHouseClient, beforeTs: number, diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 192a474..4fefabc 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -2,6 +2,7 @@ export * from "./clickhouse"; export * from "./classifier-hits"; export * from "./alerts"; export * from "./flow-packets"; +export * from "./smart-money-events"; export * from "./equity-prints"; export * from "./equity-quotes"; export * from "./equity-candles"; diff --git a/packages/storage/src/smart-money-events.ts 
b/packages/storage/src/smart-money-events.ts new file mode 100644 index 0000000..f73c3f4 --- /dev/null +++ b/packages/storage/src/smart-money-events.ts @@ -0,0 +1,100 @@ +import type { SmartMoneyEvent } from "@islandflow/types"; + +export const SMART_MONEY_EVENTS_TABLE = "smart_money_events"; + +export type SmartMoneyEventRecord = { + source_ts: number; + ingest_ts: number; + seq: number; + trace_id: string; + event_id: string; + packet_ids: string[]; + member_print_ids: string[]; + underlying_id: string; + event_kind: string; + event_window_ms: number; + features_json: string; + profile_scores_json: string; + primary_profile_id: string; + primary_direction: string; + abstained: boolean; + suppressed_reasons_json: string; +}; + +export const smartMoneyEventsTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${SMART_MONEY_EVENTS_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + event_id String, + packet_ids Array(String), + member_print_ids Array(String), + underlying_id String, + event_kind String, + event_window_ms UInt64, + features_json String, + profile_scores_json String, + primary_profile_id String, + primary_direction String, + abstained Bool, + suppressed_reasons_json String +) +ENGINE = MergeTree +ORDER BY (source_ts, seq) +`; +}; + +export const toSmartMoneyEventRecord = (event: SmartMoneyEvent): SmartMoneyEventRecord => { + return { + source_ts: event.source_ts, + ingest_ts: event.ingest_ts, + seq: event.seq, + trace_id: event.trace_id, + event_id: event.event_id, + packet_ids: event.packet_ids, + member_print_ids: event.member_print_ids, + underlying_id: event.underlying_id, + event_kind: event.event_kind, + event_window_ms: event.event_window_ms, + features_json: JSON.stringify(event.features), + profile_scores_json: JSON.stringify(event.profile_scores), + primary_profile_id: event.primary_profile_id ?? 
"", + primary_direction: event.primary_direction, + abstained: event.abstained, + suppressed_reasons_json: JSON.stringify(event.suppressed_reasons) + }; +}; + +const safeJson = (value: string, fallback: T): T => { + try { + return JSON.parse(value) as T; + } catch { + return fallback; + } +}; + +export const fromSmartMoneyEventRecord = (record: SmartMoneyEventRecord): SmartMoneyEvent => { + const primaryProfileId = record.primary_profile_id.trim(); + return { + source_ts: record.source_ts, + ingest_ts: record.ingest_ts, + seq: record.seq, + trace_id: record.trace_id, + event_id: record.event_id, + packet_ids: record.packet_ids, + member_print_ids: record.member_print_ids, + underlying_id: record.underlying_id, + event_kind: record.event_kind as SmartMoneyEvent["event_kind"], + event_window_ms: record.event_window_ms, + features: safeJson(record.features_json, {} as SmartMoneyEvent["features"]), + profile_scores: safeJson(record.profile_scores_json, [] as SmartMoneyEvent["profile_scores"]), + primary_profile_id: primaryProfileId + ? 
(primaryProfileId as SmartMoneyEvent["primary_profile_id"]) + : null, + primary_direction: record.primary_direction as SmartMoneyEvent["primary_direction"], + abstained: Boolean(record.abstained), + suppressed_reasons: safeJson(record.suppressed_reasons_json, [] as string[]) + }; +}; diff --git a/packages/storage/tests/smart-money-events.test.ts b/packages/storage/tests/smart-money-events.test.ts new file mode 100644 index 0000000..6ab5eb8 --- /dev/null +++ b/packages/storage/tests/smart-money-events.test.ts @@ -0,0 +1,85 @@ +import { describe, expect, it } from "bun:test"; +import { + SMART_MONEY_EVENTS_TABLE, + fromSmartMoneyEventRecord, + smartMoneyEventsTableDDL, + toSmartMoneyEventRecord +} from "../src/smart-money-events"; +import type { SmartMoneyEvent } from "@islandflow/types"; + +const event: SmartMoneyEvent = { + source_ts: 10, + ingest_ts: 20, + seq: 1, + trace_id: "smartmoney:flowpacket:1", + event_id: "smartmoney:single_leg_event:flowpacket:1", + packet_ids: ["flowpacket:1"], + member_print_ids: ["print:1"], + underlying_id: "SPY", + event_kind: "single_leg_event", + event_window_ms: 500, + features: { + contract_count: 1, + print_count: 3, + total_size: 900, + total_premium: 75_000, + total_notional: 7_500_000, + start_ts: 10, + end_ts: 10, + window_ms: 500, + option_contract_id: "SPY-2025-01-17-450-C", + option_type: "C", + dte_days: 1, + moneyness: 1, + atm_proximity: 0.01, + aggressor_buy_ratio: 0.7, + aggressor_sell_ratio: 0.1, + aggressor_ratio: 0.8, + nbbo_coverage_ratio: 0.9, + nbbo_inside_ratio: 0.1, + nbbo_stale_ratio: 0, + quote_age_ms: 20, + venue_count: 2, + inter_fill_ms_mean: 100, + strike_count: 1, + strike_concentration: 1, + structure_legs: 0, + same_size_leg_symmetry: 0, + net_directional_bias: 0.6, + synthetic_iv_shock: null, + spread_widening: null, + underlying_move_bps: null, + days_to_event: null, + expiry_after_event: null, + pre_event_concentration: null, + special_print_ratio: 0 + }, + profile_scores: [ + { + profile_id: 
"institutional_directional", + probability: 0.74, + confidence_band: "high", + direction: "bullish", + reasons: ["large_parent_event"] + } + ], + primary_profile_id: "institutional_directional", + primary_direction: "bullish", + abstained: false, + suppressed_reasons: [] +}; + +describe("smart money event storage helpers", () => { + it("includes the correct table name in the DDL", () => { + const ddl = smartMoneyEventsTableDDL(); + expect(ddl).toContain(SMART_MONEY_EVENTS_TABLE); + expect(ddl).toContain("profile_scores_json"); + }); + + it("round-trips smart money event records", () => { + const restored = fromSmartMoneyEventRecord(toSmartMoneyEventRecord(event)); + expect(restored.event_id).toBe(event.event_id); + expect(restored.profile_scores).toEqual(event.profile_scores); + expect(restored.features.total_premium).toBe(event.features.total_premium); + }); +}); diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index 0ba5e57..c15dc7b 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -135,6 +135,98 @@ export const FlowPacketSchema = EventMetaSchema.merge( export type FlowPacket = z.infer; +export const SmartMoneyProfileIdSchema = z.enum([ + "institutional_directional", + "retail_whale", + "event_driven", + "vol_seller", + "arbitrage", + "hedge_reactive" +]); + +export type SmartMoneyProfileId = z.infer; + +export const SmartMoneyDirectionSchema = z.enum(["bullish", "bearish", "neutral", "mixed", "unknown"]); + +export type SmartMoneyDirection = z.infer; + +export const SmartMoneyEventKindSchema = z.enum(["single_leg_event", "multi_leg_event"]); + +export type SmartMoneyEventKind = z.infer; + +export const SmartMoneyConfidenceBandSchema = z.enum(["low", "medium", "high"]); + +export type SmartMoneyConfidenceBand = z.infer; + +export const SmartMoneyFeaturesSchema = z.object({ + contract_count: z.number().int().nonnegative(), + print_count: z.number().int().nonnegative(), + total_size: z.number().nonnegative(), + 
total_premium: z.number().nonnegative(), + total_notional: z.number().nonnegative(), + start_ts: z.number().int().nonnegative(), + end_ts: z.number().int().nonnegative(), + window_ms: z.number().int().nonnegative(), + option_contract_id: z.string().min(1).optional(), + option_type: z.enum(["C", "P"]).optional(), + dte_days: z.number().nonnegative().nullable(), + moneyness: z.number().nullable(), + atm_proximity: z.number().nullable(), + aggressor_buy_ratio: z.number().min(0).max(1), + aggressor_sell_ratio: z.number().min(0).max(1), + aggressor_ratio: z.number().min(0).max(1), + nbbo_coverage_ratio: z.number().min(0).max(1), + nbbo_inside_ratio: z.number().min(0).max(1), + nbbo_stale_ratio: z.number().min(0).max(1), + quote_age_ms: z.number().nonnegative().nullable(), + venue_count: z.number().int().nonnegative(), + inter_fill_ms_mean: z.number().nonnegative().nullable(), + strike_count: z.number().int().nonnegative(), + strike_concentration: z.number().min(0).max(1), + structure_type: z.string().optional(), + structure_legs: z.number().int().nonnegative(), + same_size_leg_symmetry: z.number().min(0).max(1), + net_directional_bias: z.number().min(-1).max(1), + synthetic_iv_shock: z.number().nullable(), + spread_widening: z.number().nullable(), + underlying_move_bps: z.number().nullable(), + days_to_event: z.number().nullable(), + expiry_after_event: z.boolean().nullable(), + pre_event_concentration: z.number().min(0).max(1).nullable(), + special_print_ratio: z.number().min(0).max(1) +}); + +export type SmartMoneyFeatures = z.infer; + +export const SmartMoneyProfileScoreSchema = z.object({ + profile_id: SmartMoneyProfileIdSchema, + probability: z.number().min(0).max(1), + confidence_band: SmartMoneyConfidenceBandSchema, + direction: SmartMoneyDirectionSchema, + reasons: z.array(z.string().min(1)) +}); + +export type SmartMoneyProfileScore = z.infer; + +export const SmartMoneyEventSchema = EventMetaSchema.merge( + z.object({ + event_id: z.string().min(1), + 
packet_ids: z.array(z.string().min(1)), + member_print_ids: z.array(z.string().min(1)), + underlying_id: z.string().min(1), + event_kind: SmartMoneyEventKindSchema, + event_window_ms: z.number().int().nonnegative(), + features: SmartMoneyFeaturesSchema, + profile_scores: z.array(SmartMoneyProfileScoreSchema), + primary_profile_id: SmartMoneyProfileIdSchema.nullable(), + primary_direction: SmartMoneyDirectionSchema, + abstained: z.boolean(), + suppressed_reasons: z.array(z.string().min(1)) + }) +); + +export type SmartMoneyEvent = z.infer; + export const ClassifierHitSchema = z.object({ classifier_id: z.string().min(1), confidence: z.number().min(0).max(1), @@ -153,7 +245,9 @@ export const AlertEventSchema = EventMetaSchema.merge( score: z.number(), severity: z.string().min(1), hits: z.array(ClassifierHitSchema), - evidence_refs: z.array(z.string().min(1)) + evidence_refs: z.array(z.string().min(1)), + primary_profile_id: SmartMoneyProfileIdSchema.optional(), + profile_scores: z.array(SmartMoneyProfileScoreSchema).optional() }) ); diff --git a/packages/types/src/live.ts b/packages/types/src/live.ts index f122d94..37fe7c8 100644 --- a/packages/types/src/live.ts +++ b/packages/types/src/live.ts @@ -9,7 +9,8 @@ import { FlowPacketSchema, InferredDarkEventSchema, OptionNBBOSchema, - OptionPrintSchema + OptionPrintSchema, + SmartMoneyEventSchema } from "./events"; import { OptionFlowFiltersSchema, @@ -30,6 +31,7 @@ export const LiveGenericChannelSchema = z.enum([ "equity-quotes", "equity-joins", "flow", + "smart-money", "classifier-hits", "alerts", "inferred-dark" @@ -42,6 +44,7 @@ export const LiveChannelSchema = z.enum([ "equity-quotes", "equity-joins", "flow", + "smart-money", "classifier-hits", "alerts", "inferred-dark", @@ -63,6 +66,9 @@ export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [ channel: z.literal("flow"), filters: OptionFlowFiltersSchema.optional() }), + z.object({ + channel: z.literal("smart-money") + }), z.object({ channel: 
z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]) }), @@ -90,6 +96,7 @@ const livePayloadSchemas = { "equity-quotes": EquityQuoteSchema, "equity-joins": EquityPrintJoinSchema, flow: FlowPacketSchema, + "smart-money": SmartMoneyEventSchema, "classifier-hits": ClassifierHitEventSchema, alerts: AlertEventSchema, "inferred-dark": InferredDarkEventSchema, diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 37830c3..031da57 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -9,6 +9,7 @@ import { SUBJECT_EQUITY_QUOTES, SUBJECT_INFERRED_DARK, SUBJECT_FLOW_PACKETS, + SUBJECT_SMART_MONEY_EVENTS, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_SIGNAL_PRINTS, STREAM_ALERTS, @@ -19,6 +20,7 @@ import { STREAM_EQUITY_QUOTES, STREAM_INFERRED_DARK, STREAM_FLOW_PACKETS, + STREAM_SMART_MONEY_EVENTS, STREAM_OPTION_NBBO, STREAM_OPTION_SIGNAL_PRINTS, buildDurableConsumer, @@ -36,17 +38,21 @@ import { ensureEquityQuotesTable, ensureInferredDarkTable, ensureFlowPacketsTable, + ensureSmartMoneyEventsTable, ensureOptionNBBOTable, ensureOptionPrintsTable, fetchAlertsAfter, fetchAlertsBefore, fetchClassifierHitsAfter, fetchClassifierHitsBefore, + fetchSmartMoneyEventsAfter, + fetchSmartMoneyEventsBefore, fetchFlowPacketsAfter, fetchFlowPacketById, fetchFlowPacketsBefore, fetchRecentAlerts, fetchRecentClassifierHits, + fetchRecentSmartMoneyEvents, fetchRecentEquityPrintJoins, fetchRecentFlowPackets, fetchRecentInferredDark, @@ -95,6 +101,7 @@ import { OptionSecurityTypeSchema, OptionTypeSchema, FlowPacketSchema, + SmartMoneyEventSchema, OptionNBBOSchema, OptionPrintSchema, getSubscriptionKey @@ -256,6 +263,7 @@ type Channel = | "equity-joins" | "inferred-dark" | "flow" + | "smart-money" | "classifier-hits" | "alerts"; @@ -278,6 +286,7 @@ const equityQuoteSockets = new Set(); const equityJoinSockets = new Set(); const inferredDarkSockets = new Set(); const flowSockets = new Set(); +const smartMoneySockets = new Set(); 
const classifierHitSockets = new Set(); const alertSockets = new Set(); const liveSocketSubscriptions = new Map>(); @@ -772,6 +781,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_SMART_MONEY_EVENTS, + subjects: [SUBJECT_SMART_MONEY_EVENTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_CLASSIFIER_HITS, subjects: [SUBJECT_CLASSIFIER_HITS], @@ -812,6 +834,7 @@ const run = async () => { await ensureEquityPrintJoinsTable(clickhouse); await ensureInferredDarkTable(clickhouse); await ensureFlowPacketsTable(clickhouse); + await ensureSmartMoneyEventsTable(clickhouse); await ensureClassifierHitsTable(clickhouse); await ensureAlertsTable(clickhouse); }); @@ -918,6 +941,11 @@ const run = async () => { stream: STREAM_FLOW_PACKETS, durableName: "api-flow-packets" }, + { + subject: SUBJECT_SMART_MONEY_EVENTS, + stream: STREAM_SMART_MONEY_EVENTS, + durableName: "api-smart-money-events" + }, { subject: SUBJECT_CLASSIFIER_HITS, stream: STREAM_CLASSIFIER_HITS, @@ -1057,18 +1085,24 @@ const run = async () => { consumerBindings[7].durableName ); - const classifierHitSubscription = await subscribeWithReset( + const smartMoneySubscription = await subscribeWithReset( consumerBindings[8].subject, consumerBindings[8].stream, consumerBindings[8].durableName ); - const alertSubscription = await subscribeWithReset( + const classifierHitSubscription = await subscribeWithReset( consumerBindings[9].subject, consumerBindings[9].stream, consumerBindings[9].durableName ); + const alertSubscription = await subscribeWithReset( + consumerBindings[10].subject, + consumerBindings[10].stream, + consumerBindings[10].durableName + ); + const fanoutLive = async ( subscription: LiveSubscription, item: unknown, @@ -1269,6 +1303,22 @@ const run = async () => { } }; + const pumpSmartMoney = async () => { + for 
await (const msg of smartMoneySubscription.messages) { + try { + const payload = SmartMoneyEventSchema.parse(smartMoneySubscription.decode(msg)); + broadcast(smartMoneySockets, { type: "smart-money", payload }); + await fanoutLive({ channel: "smart-money" }, payload, "smart-money"); + msg.ack(); + } catch (error) { + logger.error("failed to process smart money event", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + const pumpClassifierHits = async () => { for await (const msg of classifierHitSubscription.messages) { try { @@ -1309,6 +1359,7 @@ const run = async () => { void pumpEquityJoins(); void pumpInferredDark(); void pumpFlow(); + void pumpSmartMoney(); void pumpClassifierHits(); void pumpAlerts(); @@ -1429,6 +1480,12 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/flow/smart-money") { + const limit = parseLimit(url.searchParams.get("limit")); + const data = await fetchRecentSmartMoneyEvents(clickhouse, limit); + return jsonResponse({ data }); + } + if (req.method === "GET" && url.pathname === "/flow/classifier-hits") { const limit = parseLimit(url.searchParams.get("limit")); const data = await fetchRecentClassifierHits(clickhouse, limit); @@ -1507,6 +1564,14 @@ const run = async () => { ); } + if (req.method === "GET" && url.pathname === "/history/smart-money") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchSmartMoneyEventsBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse( + buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq })) + ); + } + if (req.method === "GET" && url.pathname === "/history/classifier-hits") { const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); const data = await fetchClassifierHitsBefore(clickhouse, beforeTs, beforeSeq, limit); @@ -1651,6 +1716,14 @@ const run = async () => { return jsonResponse({ data, next }); } + if 
(req.method === "GET" && url.pathname === "/replay/smart-money") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchSmartMoneyEventsAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.source_ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + if (req.method === "GET" && url.pathname === "/replay/classifier-hits") { const { afterTs, afterSeq, limit } = parseReplayParams(url); const data = await fetchClassifierHitsAfter(clickhouse, afterTs, afterSeq, limit); @@ -1739,6 +1812,14 @@ const run = async () => { return jsonResponse({ error: "websocket upgrade failed" }, 400); } + if (req.method === "GET" && url.pathname === "/ws/smart-money") { + if (serverRef.upgrade(req, { data: { channel: "smart-money" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + if (req.method === "GET" && url.pathname === "/ws/alerts") { if (serverRef.upgrade(req, { data: { channel: "alerts" } })) { return new Response(null, { status: 101 }); @@ -1781,6 +1862,8 @@ const run = async () => { inferredDarkSockets.add(socket); } else if (socket.data.channel === "flow") { flowSockets.add(socket); + } else if (socket.data.channel === "smart-money") { + smartMoneySockets.add(socket); } else if (socket.data.channel === "classifier-hits") { classifierHitSockets.add(socket); } else { @@ -1842,6 +1925,8 @@ const run = async () => { inferredDarkSockets.delete(socket); } else if (socket.data.channel === "flow") { flowSockets.delete(socket); + } else if (socket.data.channel === "smart-money") { + smartMoneySockets.delete(socket); } else if (socket.data.channel === "classifier-hits") { classifierHitSockets.delete(socket); } else { diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 36b7aee..74276ec 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -9,6 +9,7 @@ import { 
fetchRecentFlowPackets, fetchRecentInferredDark, fetchRecentOptionNBBO, + fetchRecentSmartMoneyEvents, type ClickHouseClient } from "@islandflow/storage"; import type { OptionPrintQueryFilters } from "@islandflow/storage"; @@ -30,6 +31,7 @@ import { matchesOptionPrintFilters, OptionNBBOSchema, OptionPrintSchema, + SmartMoneyEventSchema, type OptionFlowFilters, type Cursor, type EquityCandle, @@ -51,6 +53,7 @@ const GENERIC_LIMIT_ENV_KEYS: Record = { "equity-quotes": "LIVE_LIMIT_EQUITY_QUOTES", "equity-joins": "LIVE_LIMIT_EQUITY_JOINS", flow: "LIVE_LIMIT_FLOW", + "smart-money": "LIVE_LIMIT_SMART_MONEY", "classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS", alerts: "LIVE_LIMIT_ALERTS", "inferred-dark": "LIVE_LIMIT_INFERRED_DARK" @@ -111,6 +114,7 @@ export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env): "equity-quotes": parseGenericLimit(env, "equity-quotes", DEFAULT_GENERIC_LIMIT), "equity-joins": parseGenericLimit(env, "equity-joins", DEFAULT_GENERIC_LIMIT), flow: parseGenericLimit(env, "flow", DEFAULT_GENERIC_LIMIT), + "smart-money": parseGenericLimit(env, "smart-money", DEFAULT_GENERIC_LIMIT), "classifier-hits": parseGenericLimit(env, "classifier-hits", DEFAULT_GENERIC_LIMIT), alerts: parseGenericLimit(env, "alerts", DEFAULT_GENERIC_LIMIT), "inferred-dark": parseGenericLimit(env, "inferred-dark", DEFAULT_GENERIC_LIMIT) @@ -185,6 +189,14 @@ const getGenericConfig = (limits: GenericLiveLimits): { cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), fetchRecent: fetchRecentFlowPackets }, + "smart-money": { + redisKey: "live:smart-money", + cursorField: "smart-money", + limit: limits["smart-money"], + parse: (value) => SmartMoneyEventSchema.parse(value), + cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), + fetchRecent: fetchRecentSmartMoneyEvents + }, "classifier-hits": { redisKey: "live:classifier-hits", cursorField: "classifier-hits", diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index 
9b0ce07..3cb789e 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -154,6 +154,7 @@ describe("LiveStateManager", () => { "equity-quotes": 10000, "equity-joins": 10000, flow: 2, + "smart-money": 10000, "classifier-hits": 10000, alerts: 10000, "inferred-dark": 10000 diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 8dc6c64..1e75bd5 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -8,6 +8,7 @@ import { SUBJECT_EQUITY_QUOTES, SUBJECT_INFERRED_DARK, SUBJECT_FLOW_PACKETS, + SUBJECT_SMART_MONEY_EVENTS, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_SIGNAL_PRINTS, STREAM_ALERTS, @@ -17,6 +18,7 @@ import { STREAM_EQUITY_QUOTES, STREAM_INFERRED_DARK, STREAM_FLOW_PACKETS, + STREAM_SMART_MONEY_EVENTS, STREAM_OPTION_NBBO, STREAM_OPTION_SIGNAL_PRINTS, buildDurableConsumer, @@ -32,11 +34,13 @@ import { ensureEquityPrintJoinsTable, ensureInferredDarkTable, ensureFlowPacketsTable, + ensureSmartMoneyEventsTable, insertAlert, insertClassifierHit, insertEquityPrintJoin, insertInferredDark, - insertFlowPacket + insertFlowPacket, + insertSmartMoneyEvent } from "@islandflow/storage"; import { AlertEventSchema, @@ -46,6 +50,7 @@ import { EquityQuoteSchema, InferredDarkEventSchema, FlowPacketSchema, + SmartMoneyEventSchema, OptionNBBOSchema, OptionPrintSchema, type AlertEvent, @@ -55,11 +60,16 @@ import { type EquityPrintJoin, type InferredDarkEvent, type FlowPacket, + type SmartMoneyEvent, type OptionNBBO, type OptionPrint } from "@islandflow/types"; import { z } from "zod"; -import { evaluateClassifiers, type ClassifierConfig } from "./classifiers"; +import type { ClassifierConfig } from "./classifiers"; +import { + buildSmartMoneyEventFromPacket, + deriveClassifierHitsFromSmartMoneyEvent +} from "./parent-events"; import { parseContractId } from "./contracts"; import { createDarkInferenceState, @@ -886,7 +896,23 @@ const emitClassifiers = async ( js: Awaited>["js"], packet: FlowPacket ): Promise => 
{ - const hits = evaluateClassifiers(packet, classifierConfig); + let smartMoneyEvent: SmartMoneyEvent; + try { + smartMoneyEvent = SmartMoneyEventSchema.parse(buildSmartMoneyEventFromPacket(packet)); + await insertSmartMoneyEvent(clickhouse, smartMoneyEvent); + await publishJson(js, SUBJECT_SMART_MONEY_EVENTS, smartMoneyEvent); + } catch (error) { + if (isExpectedShutdownNatsError(error)) { + return; + } + logger.error("failed to emit smart money event", { + error: error instanceof Error ? error.message : String(error), + packet_id: packet.id + }); + return; + } + + const hits = deriveClassifierHitsFromSmartMoneyEvent(smartMoneyEvent); if (hits.length === 0) { return; } @@ -922,7 +948,7 @@ const emitClassifiers = async ( source_ts: packet.source_ts, ingest_ts: packet.ingest_ts, seq: packet.seq, - trace_id: `alert:${packet.id}`, + trace_id: `alert:${smartMoneyEvent.event_id}`, score, severity, hits: hitEvents.map((hit) => ({ @@ -931,7 +957,11 @@ const emitClassifiers = async ( direction: hit.direction, explanations: hit.explanations })), - evidence_refs: [packet.id, ...packet.members] + evidence_refs: [smartMoneyEvent.event_id, packet.id, ...packet.members], + ...(smartMoneyEvent.primary_profile_id + ? 
{ primary_profile_id: smartMoneyEvent.primary_profile_id } + : {}), + profile_scores: smartMoneyEvent.profile_scores }); try { @@ -1100,6 +1130,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_SMART_MONEY_EVENTS, + subjects: [SUBJECT_SMART_MONEY_EVENTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + await ensureStream(jsm, { name: STREAM_EQUITY_JOINS, subjects: [SUBJECT_EQUITY_JOINS], @@ -1173,6 +1216,7 @@ const run = async () => { await retry("clickhouse table init", 120, 500, async () => { await ensureFlowPacketsTable(clickhouse); + await ensureSmartMoneyEventsTable(clickhouse); await ensureEquityPrintJoinsTable(clickhouse); await ensureInferredDarkTable(clickhouse); await ensureClassifierHitsTable(clickhouse); diff --git a/services/compute/src/parent-events.ts b/services/compute/src/parent-events.ts new file mode 100644 index 0000000..f81c842 --- /dev/null +++ b/services/compute/src/parent-events.ts @@ -0,0 +1,320 @@ +import { + SmartMoneyEventSchema, + type ClassifierHit, + type FlowPacket, + type SmartMoneyDirection, + type SmartMoneyEvent, + type SmartMoneyFeatures, + type SmartMoneyProfileId, + type SmartMoneyProfileScore +} from "@islandflow/types"; +import { parseContractId } from "./contracts"; + +const MS_PER_DAY = 86_400_000; +const SPECIAL_CONDITIONS = new Set(["AUCTION", "CROSS", "OPENING", "CLOSING", "COMPLEX", "SPREAD"]); + +const clamp = (value: number, min = 0, max = 1): number => { + if (!Number.isFinite(value)) { + return min; + } + return Math.max(min, Math.min(max, value)); +}; + +const numberFeature = (packet: FlowPacket, key: string): number => { + const value = packet.features[key]; + return typeof value === "number" && Number.isFinite(value) ? 
value : 0; +}; + +const stringFeature = (packet: FlowPacket, key: string): string => { + const value = packet.features[key]; + return typeof value === "string" ? value : ""; +}; + +const boolFeature = (packet: FlowPacket, key: string): boolean | null => { + const value = packet.features[key]; + return typeof value === "boolean" ? value : null; +}; + +const confidenceBand = (probability: number): SmartMoneyProfileScore["confidence_band"] => { + if (probability >= 0.72) { + return "high"; + } + if (probability >= 0.52) { + return "medium"; + } + return "low"; +}; + +const score = ( + profile_id: SmartMoneyProfileId, + probability: number, + direction: SmartMoneyDirection, + reasons: string[] +): SmartMoneyProfileScore => ({ + profile_id, + probability: clamp(probability), + confidence_band: confidenceBand(probability), + direction, + reasons +}); + +const getReferenceTs = (packet: FlowPacket): number => { + return numberFeature(packet, "end_ts") || packet.source_ts; +}; + +const getDteDays = (packet: FlowPacket): number | null => { + const contract = parseContractId(stringFeature(packet, "option_contract_id")); + if (!contract) { + return null; + } + const expiryTs = Date.parse(`${contract.expiry}T00:00:00Z`); + if (!Number.isFinite(expiryTs)) { + return null; + } + const diff = expiryTs - getReferenceTs(packet); + return diff >= 0 ? Math.ceil(diff / MS_PER_DAY) : null; +}; + +const inferDirection = (packet: FlowPacket): SmartMoneyDirection => { + const structureRights = stringFeature(packet, "structure_rights"); + const optionType = stringFeature(packet, "option_type") || parseContractId(stringFeature(packet, "option_contract_id"))?.right; + const buy = numberFeature(packet, "nbbo_aggressive_buy_ratio"); + const sell = numberFeature(packet, "nbbo_aggressive_sell_ratio"); + const sellDominant = sell >= buy + 0.12; + + if (structureRights === "C") { + return sellDominant ? "bearish" : "bullish"; + } + if (structureRights === "P") { + return sellDominant ? 
"bullish" : "bearish"; + } + if (optionType === "C") { + return sellDominant ? "bearish" : "bullish"; + } + if (optionType === "P") { + return sellDominant ? "bullish" : "bearish"; + } + return "neutral"; +}; + +const buildFeatures = (packet: FlowPacket): SmartMoneyFeatures => { + const contractId = stringFeature(packet, "option_contract_id"); + const contract = parseContractId(contractId); + const underlyingMid = numberFeature(packet, "underlying_mid"); + const quoteAge = numberFeature(packet, "nbbo_age_ms") || numberFeature(packet, "underlying_quote_age_ms"); + const printCount = Math.max(0, Math.round(numberFeature(packet, "count") || packet.members.length)); + const staleCount = numberFeature(packet, "nbbo_stale_count"); + const missingCount = numberFeature(packet, "nbbo_missing_count"); + const structureLegs = Math.max(0, Math.round(numberFeature(packet, "structure_legs"))); + const strikeCount = Math.max(1, Math.round(numberFeature(packet, "structure_strikes") || (contract ? 1 : 0))); + const specialCount = numberFeature(packet, "special_print_count"); + const eventTs = numberFeature(packet, "corporate_event_ts"); + const referenceTs = getReferenceTs(packet); + const expiryTs = contract ? Date.parse(`${contract.expiry}T00:00:00Z`) : Number.NaN; + + const atmProximity = + contract && underlyingMid > 0 ? Math.abs(contract.strike - underlyingMid) / underlyingMid : null; + + return { + contract_count: Math.max(1, structureLegs || 1), + print_count: printCount, + total_size: numberFeature(packet, "total_size"), + total_premium: numberFeature(packet, "total_premium"), + total_notional: numberFeature(packet, "total_notional"), + start_ts: numberFeature(packet, "start_ts") || packet.source_ts, + end_ts: numberFeature(packet, "end_ts") || packet.source_ts, + window_ms: Math.max(0, Math.round(numberFeature(packet, "window_ms"))), + ...(contractId ? { option_contract_id: contractId } : {}), + ...(contract?.right === "C" || contract?.right === "P" ? 
{ option_type: contract.right } : {}), + dte_days: getDteDays(packet), + moneyness: contract && underlyingMid > 0 ? contract.strike / underlyingMid : null, + atm_proximity: atmProximity, + aggressor_buy_ratio: clamp(numberFeature(packet, "nbbo_aggressive_buy_ratio")), + aggressor_sell_ratio: clamp(numberFeature(packet, "nbbo_aggressive_sell_ratio")), + aggressor_ratio: clamp(numberFeature(packet, "nbbo_aggressive_ratio")), + nbbo_coverage_ratio: clamp(numberFeature(packet, "nbbo_coverage_ratio")), + nbbo_inside_ratio: clamp(numberFeature(packet, "nbbo_inside_ratio")), + nbbo_stale_ratio: printCount > 0 ? clamp((staleCount + missingCount) / printCount) : 0, + quote_age_ms: quoteAge > 0 ? quoteAge : null, + venue_count: Math.max(1, Math.round(numberFeature(packet, "venue_count") || 1)), + inter_fill_ms_mean: printCount > 1 ? numberFeature(packet, "window_ms") / Math.max(1, printCount - 1) : null, + strike_count: strikeCount, + strike_concentration: strikeCount > 0 ? clamp(1 / strikeCount) : 0, + ...(stringFeature(packet, "structure_type") ? { structure_type: stringFeature(packet, "structure_type") } : {}), + structure_legs: structureLegs, + same_size_leg_symmetry: clamp(numberFeature(packet, "same_size_leg_symmetry")), + net_directional_bias: clamp( + numberFeature(packet, "nbbo_aggressive_buy_ratio") - numberFeature(packet, "nbbo_aggressive_sell_ratio"), + -1, + 1 + ), + synthetic_iv_shock: numberFeature(packet, "execution_iv_shock") || null, + spread_widening: numberFeature(packet, "nbbo_spread_z") || null, + underlying_move_bps: numberFeature(packet, "underlying_move_bps") || null, + days_to_event: eventTs > 0 ? (eventTs - referenceTs) / MS_PER_DAY : null, + expiry_after_event: eventTs > 0 && Number.isFinite(expiryTs) ? expiryTs >= eventTs : null, + pre_event_concentration: eventTs > 0 && eventTs >= referenceTs ? clamp(1 - (eventTs - referenceTs) / (21 * MS_PER_DAY)) : null, + special_print_ratio: printCount > 0 ? 
clamp(specialCount / printCount) : 0 + }; +}; + +const detectSuppression = (packet: FlowPacket, features: SmartMoneyFeatures): string[] => { + const reasons: string[] = []; + const conditions = String(packet.features.conditions ?? "") + .split(",") + .map((item) => item.trim().toUpperCase()) + .filter(Boolean); + if (conditions.some((condition) => SPECIAL_CONDITIONS.has(condition)) || features.special_print_ratio >= 0.34) { + reasons.push("special_print_or_complex_context"); + } + if (features.nbbo_coverage_ratio < 0.35 || features.nbbo_stale_ratio >= 0.5) { + reasons.push("stale_or_missing_quote_context"); + } + if (features.nbbo_inside_ratio >= 0.7 && features.aggressor_ratio < 0.35) { + reasons.push("inside_market_or_cross_like_execution"); + } + return reasons; +}; + +const evaluateProfiles = ( + packet: FlowPacket, + features: SmartMoneyFeatures, + suppressed: string[] +): SmartMoneyProfileScore[] => { + const direction = inferDirection(packet); + const dte = features.dte_days ?? 999; + const structure = features.structure_type ?? ""; + const isStructure = features.structure_legs >= 2 || Boolean(structure); + const buy = features.aggressor_buy_ratio; + const sell = features.aggressor_sell_ratio; + const premiumFactor = clamp(features.total_premium / 120_000); + const sizeFactor = clamp(features.total_size / 1800); + const burstFactor = clamp(features.print_count / 8); + const quality = clamp(features.nbbo_coverage_ratio - features.nbbo_stale_ratio); + const shortDatedOtm = + dte <= 7 && features.atm_proximity !== null && features.atm_proximity >= 0.05 && features.option_type === "C"; + const nearAtm = features.atm_proximity !== null && features.atm_proximity <= 0.015; + const preEvent = + features.days_to_event !== null && + features.days_to_event >= 0 && + features.days_to_event <= 21 && + features.expiry_after_event === true; + + const scores = [ + score( + "institutional_directional", + suppressed.length > 0 || shortDatedOtm + ? 
0.18 + : 0.2 + premiumFactor * 0.25 + burstFactor * 0.18 + quality * 0.16 + (buy >= 0.58 || sell >= 0.58 ? 0.12 : 0), + direction, + [ + "large_parent_event", + "directional_aggressor_mix", + ...(shortDatedOtm ? ["retail_frenzy_guard"] : []), + ...suppressed + ] + ), + score( + "retail_whale", + 0.12 + + (shortDatedOtm ? 0.28 : 0) + + burstFactor * 0.18 + + clamp(features.synthetic_iv_shock ?? 0, 0, 0.2) + + (features.total_premium < 100_000 ? 0.1 : 0), + direction, + ["short_dated_otm_attention_flow", "burst_print_pattern"] + ), + score( + "event_driven", + 0.12 + (preEvent ? 0.32 : 0) + premiumFactor * 0.14 + clamp(features.spread_widening ?? 0, 0, 0.16), + direction === "unknown" ? "neutral" : direction, + ["event_calendar_alignment", "expiry_after_event", "pre_event_concentration"] + ), + score( + "vol_seller", + 0.12 + (sell >= 0.58 ? 0.24 : 0) + (structure === "straddle" || structure === "strangle" ? 0.2 : 0) + premiumFactor * 0.14, + "neutral", + ["sell_side_premium", "short_vol_structure_evidence"] + ), + score( + "arbitrage", + 0.08 + + (isStructure ? 0.18 : 0) + + (features.same_size_leg_symmetry >= 0.7 ? 0.24 : 0) + + (Math.abs(features.net_directional_bias) <= 0.15 ? 0.18 : 0), + "neutral", + ["matched_leg_symmetry", "near_flat_directional_exposure"] + ), + score( + "hedge_reactive", + 0.1 + + (dte <= 2 && nearAtm ? 0.32 : 0) + + clamp(Math.abs(features.underlying_move_bps ?? 0) / 80, 0, 0.18) + + sizeFactor * 0.12, + direction, + ["short_dated_atm_gamma_context", "underlying_move_linkage"] + ) + ]; + + return scores.sort((a, b) => b.probability - a.probability); +}; + +export const buildSmartMoneyEventFromPacket = (packet: FlowPacket): SmartMoneyEvent => { + const features = buildFeatures(packet); + const suppressed = detectSuppression(packet, features); + const profileScores = evaluateProfiles(packet, features, suppressed); + const primary = profileScores[0] ?? 
null; + const abstained = !primary || primary.probability < 0.42 || suppressed.includes("stale_or_missing_quote_context"); + const underlying = stringFeature(packet, "underlying_id") || parseContractId(features.option_contract_id ?? "")?.root || "UNKNOWN"; + const eventKind = features.structure_legs >= 2 || stringFeature(packet, "packet_kind") === "structure" + ? "multi_leg_event" + : "single_leg_event"; + + return SmartMoneyEventSchema.parse({ + source_ts: packet.source_ts, + ingest_ts: packet.ingest_ts, + seq: packet.seq, + trace_id: `smartmoney:${packet.id}`, + event_id: `smartmoney:${eventKind}:${packet.id}`, + packet_ids: [packet.id], + member_print_ids: packet.members, + underlying_id: underlying, + event_kind: eventKind, + event_window_ms: features.window_ms, + features, + profile_scores: profileScores, + primary_profile_id: abstained ? null : primary?.profile_id ?? null, + primary_direction: abstained ? "unknown" : primary?.direction ?? "unknown", + abstained, + suppressed_reasons: suppressed + }); +}; + +const LEGACY_PROFILE_MAP: Record = { + institutional_directional: "smart_money_institutional_directional", + retail_whale: "smart_money_retail_whale", + event_driven: "smart_money_event_driven", + vol_seller: "smart_money_vol_seller", + arbitrage: "smart_money_arbitrage", + hedge_reactive: "smart_money_hedge_reactive" +}; + +export const deriveClassifierHitsFromSmartMoneyEvent = (event: SmartMoneyEvent): ClassifierHit[] => { + if (event.abstained || !event.primary_profile_id) { + return []; + } + + return event.profile_scores + .filter((entry) => entry.profile_id === event.primary_profile_id || entry.probability >= 0.5) + .slice(0, 3) + .map((entry) => ({ + classifier_id: LEGACY_PROFILE_MAP[entry.profile_id], + confidence: entry.probability, + direction: entry.direction, + explanations: [ + `Profile ${entry.profile_id} probability ${(entry.probability * 100).toFixed(0)}%.`, + ...entry.reasons, + ...event.suppressed_reasons.map((reason) => `Suppression 
guard: ${reason}.`) + ] + })); +}; diff --git a/services/compute/tests/parent-events.test.ts b/services/compute/tests/parent-events.test.ts new file mode 100644 index 0000000..ac0ac81 --- /dev/null +++ b/services/compute/tests/parent-events.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, it } from "bun:test"; +import { + buildSmartMoneyEventFromPacket, + deriveClassifierHitsFromSmartMoneyEvent +} from "../src/parent-events"; +import { buildFlowPacket } from "./helpers"; + +describe("smart money parent events", () => { + it("scores institutional directional parent events and derives legacy hits", () => { + const packet = buildFlowPacket({ + id: "flowpacket:institutional", + source_ts: Date.parse("2025-01-15T15:00:00Z"), + features: { + option_contract_id: "SPY-2025-02-21-450-C", + underlying_id: "SPY", + count: 8, + window_ms: 450, + total_size: 2200, + total_premium: 180_000, + total_notional: 18_000_000, + nbbo_coverage_ratio: 0.92, + nbbo_aggressive_ratio: 0.82, + nbbo_aggressive_buy_ratio: 0.78, + nbbo_aggressive_sell_ratio: 0.04, + nbbo_inside_ratio: 0.08, + underlying_mid: 448 + } + }); + + const event = buildSmartMoneyEventFromPacket(packet); + expect(event.event_kind).toBe("single_leg_event"); + expect(event.primary_profile_id).toBe("institutional_directional"); + expect(event.primary_direction).toBe("bullish"); + + const hits = deriveClassifierHitsFromSmartMoneyEvent(event); + expect(hits[0]?.classifier_id).toBe("smart_money_institutional_directional"); + }); + + it("abstains when quote context is stale or missing", () => { + const packet = buildFlowPacket({ + id: "flowpacket:stale", + features: { + option_contract_id: "SPY-2025-02-21-450-C", + count: 8, + window_ms: 450, + total_size: 2200, + total_premium: 180_000, + nbbo_coverage_ratio: 0.1, + nbbo_missing_count: 8 + } + }); + + const event = buildSmartMoneyEventFromPacket(packet); + expect(event.abstained).toBe(true); + expect(event.primary_profile_id).toBeNull(); + 
expect(event.suppressed_reasons).toContain("stale_or_missing_quote_context"); + }); +});