From 27b0a399e6da51f7512cfbd5593b3b5273938363 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Tue, 28 Apr 2026 16:29:44 -0400 Subject: [PATCH] Add smart-money option signal path and tape filters --- .beads/issues.jsonl | 2 + README.md | 76 ++- apps/web/app/globals.css | 49 ++ apps/web/app/terminal.tsx | 248 ++++++++-- packages/bus/src/subjects.ts | 2 + packages/storage/src/clickhouse.ts | 101 +++- packages/storage/src/option-prints.ts | 26 +- packages/storage/tests/option-prints.test.ts | 27 +- packages/types/src/events.ts | 14 +- packages/types/src/index.ts | 1 + packages/types/src/live.ts | 17 +- packages/types/src/options-flow.ts | 464 ++++++++++++++++++ packages/types/tests/live.test.ts | 18 +- packages/types/tests/options-flow.test.ts | 132 +++++ services/api/src/index.ts | 205 ++++++-- services/api/src/live.ts | 58 ++- services/api/tests/live.test.ts | 77 +++ services/compute/src/index.ts | 44 +- .../ingest-equities/src/adapters/synthetic.ts | 115 +++-- services/ingest-equities/src/index.ts | 12 +- .../ingest-options/src/adapters/synthetic.ts | 203 +++++++- services/ingest-options/src/index.ts | 103 +++- services/replay/src/index.ts | 8 + 23 files changed, 1827 insertions(+), 175 deletions(-) create mode 100644 .beads/issues.jsonl create mode 100644 packages/types/src/options-flow.ts create mode 100644 packages/types/tests/options-flow.test.ts diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl new file mode 100644 index 0000000..9b58daa --- /dev/null +++ b/.beads/issues.jsonl @@ -0,0 +1,2 @@ +{"_type":"issue","id":"islandflow-e4r","title":"Implement smart-money flow filtering and synthetic firehose modes","description":"Implement the approved multi-surface plan for named synthetic market profiles, options raw-vs-signal filtering, live/API filter contracts, Tape page client-side flow filters, firehose-readiness improvements, tests, and README updates.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:10:49Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:29:29Z","started_at":"2026-04-28T20:10:53Z","closed_at":"2026-04-28T20:29:29Z","close_reason":"Implemented synthetic market profiles, options signal-path filtering, signal-aware API/replay contracts, Tape page filters, tests, and README updates. Follow-up tracked in islandflow-biq.","dependency_count":0,"dependent_count":0,"comment_count":0} +{"_type":"issue","id":"islandflow-biq","title":"Finish raw live options delivery and filter/backpressure observability","description":"The smart-money signal path and Tape filters are in place, but the next firehose pass should finish server-side selective raw live delivery for options subscriptions and add explicit filtered-out/backpressure observability for API/web counters. This was discovered while landing islandflow-e4r.\n","status":"open","priority":2,"issue_type":"task","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:28:58Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:28:58Z","dependencies":[{"issue_id":"islandflow-biq","depends_on_id":"islandflow-e4r","type":"discovered-from","created_at":"2026-04-28T16:28:58Z","created_by":"dirtydishes","metadata":"{}"}],"dependency_count":0,"dependent_count":0,"comment_count":0} diff --git a/README.md b/README.md index 314b051..f0a2b60 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,15 @@ All runtime configuration comes from `.env`. - `OPTIONS_INGEST_ADAPTER` (`synthetic` | `alpaca` | `ibkr` | `databento`) - `EQUITIES_INGEST_ADAPTER` (`synthetic` | `alpaca`) - `EMIT_INTERVAL_MS` (synthetic emit cadence) +- `SYNTHETIC_MARKET_MODE` (`realistic` | `active` | `firehose`, default `realistic`) +- `SYNTHETIC_OPTIONS_MODE` (optional per-service override; falls back to `SYNTHETIC_MARKET_MODE`) +- `SYNTHETIC_EQUITIES_MODE` (optional per-service override; falls back to `SYNTHETIC_MARKET_MODE`) + +### Synthetic mode profiles + +- `realistic` is the default local mode. Options produce materially more ordinary prints, fewer repeated bursts, and fewer alert-driving sweeps/spikes. Equities produce smaller batches and less relentless off-exchange activity. +- `active` is a busier demo mode that still leaves meaningful visible history in the UI. +- `firehose` is the stress profile for backpressure, hot-window eviction, and Databento-readiness validation. ### Options adapter settings @@ -142,6 +151,30 @@ All runtime configuration comes from `.env`. - Classifiers: `CLASSIFIER_SWEEP_MIN_PREMIUM`, `CLASSIFIER_SWEEP_MIN_COUNT`, `CLASSIFIER_SWEEP_MIN_PREMIUM_Z`, `CLASSIFIER_SPIKE_MIN_PREMIUM`, `CLASSIFIER_SPIKE_MIN_SIZE`, `CLASSIFIER_SPIKE_MIN_PREMIUM_Z`, `CLASSIFIER_SPIKE_MIN_SIZE_Z`, `CLASSIFIER_Z_MIN_SAMPLES`, `CLASSIFIER_MIN_NBBO_COVERAGE`, `CLASSIFIER_MIN_AGGRESSOR_RATIO`, `CLASSIFIER_0DTE_MAX_ATM_PCT`, `CLASSIFIER_0DTE_MIN_PREMIUM`, `CLASSIFIER_0DTE_MIN_SIZE` - Dark inference: `EQUITY_QUOTE_MAX_AGE_MS`, `DARK_INFER_WINDOW_MS`, `DARK_INFER_COOLDOWN_MS`, `DARK_INFER_MIN_BLOCK_SIZE`, `DARK_INFER_MIN_ACCUM_SIZE`, `DARK_INFER_MIN_ACCUM_COUNT`, `DARK_INFER_MIN_PRINT_SIZE`, `DARK_INFER_MAX_EVIDENCE`, `DARK_INFER_MAX_SPREAD_PCT` +### Options signal filtering + +- `OPTIONS_SIGNAL_MODE` (`smart-money` | `balanced` | `all`, default `smart-money`) +- `OPTIONS_SIGNAL_MIN_NOTIONAL` (default `10000`) +- `OPTIONS_SIGNAL_ETF_MIN_NOTIONAL` (default `50000`) +- `OPTIONS_SIGNAL_BID_SIDE_MIN_NOTIONAL` (default `25000`) +- `OPTIONS_SIGNAL_MID_MIN_NOTIONAL` (default `20000`) +- `OPTIONS_SIGNAL_NBBO_MAX_AGE_MS` (default `1500`) +- `OPTIONS_SIGNAL_ETF_UNDERLYINGS` (default `SPY,QQQ,IWM,DIA,TLT,GLD,SLV,XLF,XLE,XLV,XLI,XLP,XLU,XLY,SMH,ARKK`) + +Default `smart-money` behavior: + +- reject sub-`10k` options prints, +- reject ETF prints below `50k`, +- reject `B` / `BB` prints below `25k`, +- reject non-`SWEEP` / non-`ISO` `MID` prints below `20k`, +- require `50k` when NBBO is missing or stale, +- auto-keep `100k+`, +- keep ask-side `A` / `AA` prints at `10k+`, +- keep `SWEEP` / `ISO` prints at `25k+`, +- keep `500+` contract prints at `10k+`. + +`balanced` uses the same shape with lower thresholds. `all` marks every option print as signal-passing. + ### Candles - `CANDLE_INTERVALS_MS`, `CANDLE_MAX_LATE_MS`, `CANDLE_CACHE_LIMIT`, `CANDLE_DELIVER_POLICY`, `CANDLE_CONSUMER_RESET` @@ -156,6 +189,7 @@ All runtime configuration comes from `.env`. - `NEXT_PUBLIC_LIVE_HOT_WINDOW` (frontend hot live window cap; default `2000`) - `NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS` (pinned evidence TTL; default `1200000`) - `NEXT_PUBLIC_PINNED_EVIDENCE_MAX_ITEMS` (pinned evidence cache guardrail; default `4000`) +- `NEXT_PUBLIC_FLOW_FILTER_PRESET` (`smart-money` | `balanced` | `all`, default `smart-money`) ### Replay service @@ -170,8 +204,48 @@ All runtime configuration comes from `.env`. - Python dependencies are required only for IBKR/Databento sidecars (`services/ingest-options/py/requirements.txt`). - Candle construction is server-side; the client consumes prebuilt OHLC events. +- Option prints now persist as enriched raw rows and can be queried as either: + - `view=signal` — default live/UI path and compute input. + - `view=raw` — audit/debug path that preserves every stored print. +- The default Tape page options/packets posture is now stock-only, hides `B` / `BB`, keeps calls and puts visible, and applies in-memory min-notional controls immediately. - Live retention uses a two-tier model: - API/Redis maintain a bounded hot cache per live generic channel. - - UI keeps a bounded hot window for rendering performance. + - UI keeps a bounded hot window for rendering performance around the signal view rather than raw noise. - Alert/drawer evidence is pinned and hydrated by id/trace so details remain inspectable after hot-window eviction. +- Firehose-readiness strategy: + - preserve raw ingest for storage/replay, + - feed compute and default live UI from the filtered signal path, + - add filterable live subscription contracts now so selective delivery can move server-side without reshaping the protocol later. - This repository is for personal, non-redistributed usage. + +## Useful Examples + +Realistic local demo: + +```bash +SYNTHETIC_MARKET_MODE=realistic \ +OPTIONS_SIGNAL_MODE=smart-money \ +bun run dev +``` + +Active demo: + +```bash +SYNTHETIC_MARKET_MODE=active bun run dev +``` + +Firehose stress test: + +```bash +SYNTHETIC_MARKET_MODE=firehose \ +NEXT_PUBLIC_LIVE_HOT_WINDOW=2000 \ +bun run dev +``` + +Show raw options flow for debugging: + +```text +/prints/options?view=raw&security=all +/history/options?view=raw&security=all&before_ts=&before_seq= +/replay/options?view=raw&security=all&after_ts=&after_seq= +``` diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index e83dce9..ecb69b0 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -419,6 +419,55 @@ h3 { gap: 10px; } +.flow-filter-panel { + display: flex; + flex-wrap: wrap; + justify-content: flex-end; + gap: 10px 16px; + padding: 10px 12px; + border: 1px solid var(--border); + border-radius: 12px; + background: linear-gradient(180deg, rgba(255, 255, 255, 0.04), rgba(255, 255, 255, 0.02)); +} + +.flow-filter-group { + display: flex; + flex-wrap: wrap; + align-items: center; + gap: 8px; +} + +.flow-filter-label { + color: var(--muted); + font-size: 0.72rem; + letter-spacing: 0.08em; + text-transform: uppercase; +} + +.flow-filter-check { + display: inline-flex; + align-items: center; + gap: 6px; + font-size: 0.84rem; + text-transform: uppercase; +} + +.filter-chip { + border: 1px solid var(--border); + border-radius: 999px; + background: rgba(255, 255, 255, 0.03); + color: var(--text); + padding: 6px 10px; + font: inherit; + cursor: pointer; +} + +.filter-chip.is-active { + border-color: rgba(127, 234, 170, 0.6); + background: rgba(127, 234, 170, 0.14); + color: var(--accent-strong); +} + .overview-strip, .replay-matrix { display: grid; diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index 2eb5da5..dedf475 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -24,9 +24,18 @@ import type { InferredDarkEvent, LiveServerMessage, LiveSubscription, + OptionFlowFilters, + OptionNbboSide, + OptionSecurityType, + OptionType, OptionNBBO, OptionPrint } from "@islandflow/types"; +import { + getSubscriptionKey as getLiveSubscriptionKey, + matchesFlowPacketFilters, + matchesOptionPrintFilters +} from "@islandflow/types"; import { createChart, type IChartApi, type SeriesMarker, type UTCTimestamp } from "lightweight-charts"; const parseBoundedInt = ( @@ -61,6 +70,7 @@ const PINNED_EVIDENCE_MAX_ITEMS = parseBoundedInt( const NBBO_MAX_AGE_MS = Number(process.env.NEXT_PUBLIC_NBBO_MAX_AGE_MS); const NBBO_MAX_AGE_MS_SAFE = Number.isFinite(NBBO_MAX_AGE_MS) && NBBO_MAX_AGE_MS > 0 ? NBBO_MAX_AGE_MS : 1000; +const FLOW_FILTER_PRESET = process.env.NEXT_PUBLIC_FLOW_FILTER_PRESET ?? "smart-money"; const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); const CANDLE_INTERVALS = [ { label: "1m", ms: 60000 }, @@ -614,6 +624,33 @@ const getJoinBoolean = (join: EquityPrintJoin, key: string): boolean => { type NbboSide = "AA" | "A" | "B" | "BB"; +const DEFAULT_FLOW_SIDES: OptionNbboSide[] = ["AA", "A", "MID"]; +const DEFAULT_FLOW_OPTION_TYPES: OptionType[] = ["call", "put"]; +const DEFAULT_FLOW_SECURITY_TYPES: OptionSecurityType[] = ["stock"]; + +const buildDefaultFlowFilters = (): OptionFlowFilters => ({ + view: "signal", + securityTypes: DEFAULT_FLOW_SECURITY_TYPES, + nbboSides: DEFAULT_FLOW_SIDES, + optionTypes: DEFAULT_FLOW_OPTION_TYPES, + minNotional: + FLOW_FILTER_PRESET === "all" + ? undefined + : FLOW_FILTER_PRESET === "balanced" + ? 5_000 + : undefined +}); + +const toggleFilterValue = (values: T[] | undefined, value: T, enabled: boolean): T[] => { + const current = new Set(values ?? []); + if (enabled) { + current.add(value); + } else { + current.delete(value); + } + return [...current].sort(); +}; + const classifyNbboSide = (price: number, quote: OptionNBBO | null | undefined): NbboSide | null => { if (!quote || !Number.isFinite(price)) { return null; @@ -935,6 +972,7 @@ type TapeConfig = { getReplayKey?: (item: T) => string | null; replaySourceKey?: string | null; onReplaySourceKey?: (key: string | null) => void; + queryParams?: Record; }; const useTape = ( @@ -947,6 +985,7 @@ const useTape = ( const getReplayKey = config.getReplayKey ?? extractTracePrefix; const replaySourceKey = config.replaySourceKey ?? null; const onReplaySourceKey = config.onReplaySourceKey; + const queryParams = config.queryParams; const [status, setStatus] = useState("connecting"); const [items, setItems] = useState([]); const [lastUpdate, setLastUpdate] = useState(null); @@ -1053,6 +1092,11 @@ const useTape = ( try { const url = new URL(buildApiUrl(latestPath)); url.searchParams.set("limit", "1"); + for (const [key, value] of Object.entries(queryParams ?? {})) { + if (value) { + url.searchParams.set(key, value); + } + } if (replaySourceKey) { url.searchParams.set("source", replaySourceKey); } @@ -1076,7 +1120,7 @@ const useTape = ( return () => { active = false; }; - }, [mode, latestPath, getItemTs, replaySourceKey]); + }, [mode, latestPath, getItemTs, replaySourceKey, queryParams]); useEffect(() => { if (mode !== "live" || config.liveEnabled === false) { @@ -1196,6 +1240,11 @@ const useTape = ( url.searchParams.set("after_ts", cursor.ts.toString()); url.searchParams.set("after_seq", cursor.seq.toString()); url.searchParams.set("limit", batchSize.toString()); + for (const [key, value] of Object.entries(queryParams ?? {})) { + if (value) { + url.searchParams.set(key, value); + } + } const desiredSource = replaySourceKey ?? replaySourceRef.current; if (desiredSource) { url.searchParams.set("source", desiredSource); @@ -1309,7 +1358,8 @@ const useTape = ( getItemTs, getReplayKey, replaySourceKey, - onReplaySourceKey + onReplaySourceKey, + queryParams ]); return { @@ -1589,21 +1639,11 @@ type LiveSessionState = { chartOverlay: EquityPrint[]; }; -const getLiveSubscriptionKey = (subscription: LiveSubscription): string => { - switch (subscription.channel) { - case "equity-candles": - return `${subscription.channel}|${subscription.underlying_id}|${subscription.interval_ms}`; - case "equity-overlay": - return `${subscription.channel}|${subscription.underlying_id}`; - default: - return subscription.channel; - } -}; - const getLiveManifest = ( pathname: string, chartTicker: string, - chartIntervalMs: number + chartIntervalMs: number, + flowFilters: OptionFlowFilters ): LiveSubscription[] => { const chartSubs: LiveSubscription[] = [ { channel: "equity-candles", underlying_id: chartTicker, interval_ms: chartIntervalMs }, @@ -1612,10 +1652,10 @@ const getLiveManifest = ( if (pathname === "/tape") { return [ - { channel: "options" }, + { channel: "options", filters: flowFilters }, { channel: "nbbo" }, { channel: "equities" }, - { channel: "flow" } + { channel: "flow", filters: flowFilters } ]; } @@ -1645,7 +1685,8 @@ const useLiveSession = ( enabled: boolean, pathname: string, chartTicker: string, - chartIntervalMs: number + chartIntervalMs: number, + flowFilters: OptionFlowFilters ): LiveSessionState => { const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); const [lastUpdate, setLastUpdate] = useState(null); @@ -1664,8 +1705,8 @@ const useLiveSession = ( const subscribedKeysRef = useRef>(new Set()); const subscribedMapRef = useRef>(new Map()); const manifest = useMemo( - () => getLiveManifest(pathname, chartTicker.toUpperCase(), chartIntervalMs), - [pathname, chartTicker, chartIntervalMs] + () => getLiveManifest(pathname, chartTicker.toUpperCase(), chartIntervalMs, flowFilters), + [pathname, chartTicker, chartIntervalMs, flowFilters] ); useEffect(() => { @@ -3079,6 +3120,7 @@ const useTerminalState = () => { const [selectedDarkEvent, setSelectedDarkEvent] = useState(null); const [selectedClassifierHit, setSelectedClassifierHit] = useState(null); const [filterInput, setFilterInput] = useState(""); + const [flowFilters, setFlowFilters] = useState(() => buildDefaultFlowFilters()); const [chartIntervalMs, setChartIntervalMs] = useState(CANDLE_INTERVALS[0].ms); const activeTickers = useMemo(() => { const parts = filterInput @@ -3089,7 +3131,13 @@ const useTerminalState = () => { }, [filterInput]); const tickerSet = useMemo(() => new Set(activeTickers), [activeTickers]); const chartTicker = useMemo(() => activeTickers[0] ?? "SPY", [activeTickers]); - const liveSession = useLiveSession(mode === "live", pathname, chartTicker, chartIntervalMs); + const liveSession = useLiveSession( + mode === "live", + pathname, + chartTicker, + chartIntervalMs, + flowFilters + ); const handleReplaySource = useCallback((value: string | null) => { setReplaySource(value); @@ -3115,6 +3163,20 @@ const useTerminalState = () => { classifierScroll.isAtTopRef ); const disableReplayGrouping = useCallback(() => null, []); + const optionQueryParams = useMemo>( + () => ({ + view: flowFilters.view ?? "signal", + security: + flowFilters.securityTypes?.length === 1 ? flowFilters.securityTypes[0] : undefined, + side: flowFilters.nbboSides?.length ? flowFilters.nbboSides.join(",") : undefined, + type: flowFilters.optionTypes?.length ? flowFilters.optionTypes.join(",") : undefined, + min_notional: + typeof flowFilters.minNotional === "number" + ? String(flowFilters.minNotional) + : undefined + }), + [flowFilters] + ); const options = useTape({ mode, @@ -3128,7 +3190,8 @@ const useTerminalState = () => { captureScroll: optionsAnchor.capture, onNewItems: optionsScroll.onNewItems, getReplayKey: extractReplaySource, - onReplaySourceKey: handleReplaySource + onReplaySourceKey: handleReplaySource, + queryParams: optionQueryParams }); const equities = useTape({ @@ -3672,13 +3735,16 @@ const useTerminalState = () => { ); const filteredOptions = useMemo(() => { - if (tickerSet.size === 0) { - return optionsFeed.items; - } - return optionsFeed.items.filter((print) => - matchesTicker(extractUnderlying(normalizeContractId(print.option_contract_id))) - ); - }, [optionsFeed.items, matchesTicker, tickerSet]); + return optionsFeed.items.filter((print) => { + if (!matchesOptionPrintFilters(print, flowFilters)) { + return false; + } + if (tickerSet.size === 0) { + return true; + } + return matchesTicker(extractUnderlying(normalizeContractId(print.option_contract_id))); + }); + }, [flowFilters, optionsFeed.items, matchesTicker, tickerSet]); const filteredEquities = useMemo(() => { if (tickerSet.size === 0) { @@ -3698,13 +3764,16 @@ const useTerminalState = () => { }, [resolvedEquityJoinMap, equityPrintMap, inferredDarkFeed.items, matchesTicker, tickerSet]); const filteredFlow = useMemo(() => { - if (tickerSet.size === 0) { - return flowFeed.items; - } - return flowFeed.items.filter((packet) => - matchesTicker(extractUnderlying(extractPacketContract(packet))) - ); - }, [flowFeed.items, extractPacketContract, matchesTicker, tickerSet]); + return flowFeed.items.filter((packet) => { + if (!matchesFlowPacketFilters(packet, flowFilters)) { + return false; + } + if (tickerSet.size === 0) { + return true; + } + return matchesTicker(extractUnderlying(extractPacketContract(packet))); + }); + }, [flowFeed.items, flowFilters, extractPacketContract, matchesTicker, tickerSet]); const filteredAlerts = useMemo(() => { if (tickerSet.size === 0) { @@ -4000,6 +4069,8 @@ const useTerminalState = () => { setSelectedClassifierHit, filterInput, setFilterInput, + flowFilters, + setFlowFilters, chartIntervalMs, setChartIntervalMs, optionsScroll, @@ -4088,6 +4159,101 @@ const PageFrame = ({ title, actions, children }: PageFrameProps) => { ); }; +const FlowFilterControls = () => { + const state = useTerminal(); + const filters = state.flowFilters; + + const toggleSecurity = (value: OptionSecurityType, enabled: boolean) => { + state.setFlowFilters((prev) => ({ + ...prev, + securityTypes: toggleFilterValue(prev.securityTypes, value, enabled) + })); + }; + + const toggleSide = (value: OptionNbboSide, enabled: boolean) => { + state.setFlowFilters((prev) => ({ + ...prev, + nbboSides: toggleFilterValue(prev.nbboSides, value, enabled) + })); + }; + + const toggleOptionType = (value: OptionType, enabled: boolean) => { + state.setFlowFilters((prev) => ({ + ...prev, + optionTypes: toggleFilterValue(prev.optionTypes, value, enabled) + })); + }; + + const applyMinNotional = (value: number | undefined) => { + state.setFlowFilters((prev) => ({ + ...prev, + minNotional: value + })); + }; + + return ( +
+
+ Security + {(["stock", "etf"] as OptionSecurityType[]).map((value) => ( + + ))} +
+
+ Side + {(["AA", "A", "MID", "B", "BB"] as OptionNbboSide[]).map((value) => ( + + ))} +
+
+ Type + {(["call", "put"] as OptionType[]).map((value) => ( + + ))} +
+
+ Min Notional + {[ + { label: "All signal", value: undefined }, + { label: ">= 25k", value: 25_000 }, + { label: ">= 50k", value: 50_000 }, + { label: ">= 100k", value: 100_000 } + ].map((preset) => ( + + ))} +
+
+ ); +}; + type PaneProps = { title: string; status?: ReactNode; @@ -4250,8 +4416,8 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => { const nbboAge = quote ? Math.abs(print.ts - quote.ts) : null; const nbboStale = nbboAge !== null && nbboAge > NBBO_MAX_AGE_MS_SAFE; const nbboMid = quote ? (quote.bid + quote.ask) / 2 : null; - const nbboSide = classifyNbboSide(print.price, quote); - const notional = print.price * print.size * 100; + const nbboSide = print.nbbo_side ?? classifyNbboSide(print.price, quote); + const notional = print.notional ?? print.price * print.size * 100; return (
@@ -4295,11 +4461,13 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => { ) : null} - {nbboStale ? Stale : null} + {print.nbbo_side === "STALE" || nbboStale ? Stale : null}
) : (
- NBBO missing + + {print.nbbo_side === "STALE" ? "NBBO stale" : "NBBO missing"} +
)} @@ -5051,7 +5219,7 @@ export function OverviewRoute() { export function TapeRoute() { return ( - + }>
diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 82274c4..24fc427 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -1,5 +1,7 @@ export const STREAM_OPTION_PRINTS = "OPTIONS_PRINTS"; export const SUBJECT_OPTION_PRINTS = "options.prints"; +export const STREAM_OPTION_SIGNAL_PRINTS = "OPTIONS_SIGNAL_PRINTS"; +export const SUBJECT_OPTION_SIGNAL_PRINTS = "options.prints.signal"; export const STREAM_OPTION_NBBO = "OPTIONS_NBBO"; export const SUBJECT_OPTION_NBBO = "options.nbbo"; export const STREAM_EQUITY_PRINTS = "EQUITY_PRINTS"; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 44e04ed..5656214 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -20,11 +20,14 @@ import type { InferredDarkEvent, FlowPacket, OptionNBBO, - OptionPrint + OptionPrint, + OptionFlowFilters, + OptionFlowView } from "@islandflow/types"; import { normalizeOptionPrint, optionPrintsTableDDL, + optionPrintsTableMigrations, OPTION_PRINTS_TABLE } from "./option-prints"; import { normalizeOptionNBBO, optionNBBOTableDDL, OPTION_NBBO_TABLE } from "./option-nbbo"; @@ -221,6 +224,9 @@ export const ensureOptionPrintsTable = async ( await client.exec({ query: optionPrintsTableDDL() }); + for (const query of optionPrintsTableMigrations()) { + await client.exec({ query }); + } }; export const ensureOptionNBBOTable = async ( @@ -499,19 +505,78 @@ const normalizeNumericFields = ( const normalizeOptionRow = (row: unknown): unknown => { if (row && typeof row === "object") { - return normalizeNumericFields(row as Record, [ + const record = normalizeNumericFields(row as Record, [ "source_ts", "ingest_ts", "seq", "ts", "price", - "size" + "size", + "notional" ]); + + if ("is_etf" in record) { + record.is_etf = Boolean(record.is_etf); + } + if ("signal_pass" in record) { + record.signal_pass = Boolean(record.signal_pass); + } + if (record.signal_reasons == null) { + record.signal_reasons = []; + } + return record; } return row; }; +export type OptionPrintQueryFilters = { + view?: OptionFlowView; + minNotional?: number; + security?: "stock" | "etf" | "all"; + optionTypes?: string[]; + nbboSides?: string[]; +}; + +const buildOptionPrintFilterConditions = ( + filters: OptionPrintQueryFilters | undefined, + tracePrefix: string | undefined +): string[] => { + const conditions: string[] = []; + const traceCondition = buildTracePrefixCondition(tracePrefix); + if (traceCondition) { + conditions.push(traceCondition); + } + + if (!filters) { + return conditions; + } + + if ((filters.view ?? "signal") === "signal") { + conditions.push("signal_pass = 1"); + } + + if (typeof filters.minNotional === "number" && Number.isFinite(filters.minNotional)) { + conditions.push(`notional >= ${filters.minNotional}`); + } + + if (filters.security === "stock") { + conditions.push("(is_etf = 0 OR is_etf IS NULL)"); + } else if (filters.security === "etf") { + conditions.push("is_etf = 1"); + } + + if (filters.optionTypes && filters.optionTypes.length > 0) { + conditions.push(`option_type IN (${buildStringList(filters.optionTypes)})`); + } + + if (filters.nbboSides && filters.nbboSides.length > 0) { + conditions.push(`nbbo_side IN (${buildStringList(filters.nbboSides)})`); + } + + return conditions; +}; + const normalizeOptionNbboRow = (row: unknown): unknown => { if (row && typeof row === "object") { return normalizeNumericFields(row as Record, [ @@ -683,11 +748,12 @@ const normalizeAlertRow = (row: unknown): AlertRecord | null => { export const fetchRecentOptionPrints = async ( client: ClickHouseClient, limit: number, - tracePrefix?: string + tracePrefix?: string, + filters?: OptionPrintQueryFilters ): Promise => { const safeLimit = clampLimit(limit); - const condition = buildTracePrefixCondition(tracePrefix); - const whereClause = condition ? ` WHERE ${condition}` : ""; + const conditions = buildOptionPrintFilterConditions(filters, tracePrefix); + const whereClause = conditions.length > 0 ? ` WHERE ${conditions.join(" AND ")}` : ""; const result = await client.query({ query: `SELECT * FROM ${OPTION_PRINTS_TABLE}${whereClause} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, format: "JSONEachRow" @@ -855,16 +921,19 @@ export const fetchOptionPrintsAfter = async ( afterTs: number, afterSeq: number, limit: number, - tracePrefix?: string + tracePrefix?: string, + filters?: OptionPrintQueryFilters ): Promise => { const safeLimit = clampLimit(limit); const safeAfterTs = clampCursor(afterTs); const safeAfterSeq = clampCursor(afterSeq); - const traceCondition = buildTracePrefixCondition(tracePrefix); - const traceClause = traceCondition ? ` AND ${traceCondition}` : ""; + const conditions = [ + `((ts, seq) > (${safeAfterTs}, ${safeAfterSeq}))`, + ...buildOptionPrintFilterConditions(filters, tracePrefix) + ]; const result = await client.query({ - query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq})${traceClause} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, format: "JSONEachRow" }); @@ -1122,14 +1191,14 @@ export const fetchOptionPrintsBefore = async ( beforeTs: number, beforeSeq: number, limit: number, - tracePrefix?: string + tracePrefix?: string, + filters?: OptionPrintQueryFilters ): Promise => { const safeLimit = clampLimit(limit); - const conditions = [buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq)]; - const traceCondition = buildTracePrefixCondition(tracePrefix); - if (traceCondition) { - conditions.push(traceCondition); - } + const conditions = [ + buildBeforeTupleCondition("ts", "seq", beforeTs, beforeSeq), + ...buildOptionPrintFilterConditions(filters, tracePrefix) + ]; const result = await client.query({ query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE ${conditions.join(" AND ")} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`, diff --git a/packages/storage/src/option-prints.ts b/packages/storage/src/option-prints.ts index 525038e..7d9c983 100644 --- a/packages/storage/src/option-prints.ts +++ b/packages/storage/src/option-prints.ts @@ -14,16 +14,38 @@ CREATE TABLE IF NOT EXISTS ${OPTION_PRINTS_TABLE} ( price Float64, size UInt32, exchange String, - conditions Array(String) + conditions Array(String), + underlying_id Nullable(String), + option_type Nullable(String), + notional Nullable(Float64), + nbbo_side Nullable(String), + is_etf Nullable(Bool), + signal_pass Nullable(Bool), + signal_reasons Array(String) DEFAULT [], + signal_profile Nullable(String) ) ENGINE = MergeTree ORDER BY (ts, option_contract_id) `; }; +export const optionPrintsTableMigrations = (): string[] => { + return [ + `ALTER TABLE ${OPTION_PRINTS_TABLE} ADD COLUMN IF NOT EXISTS underlying_id Nullable(String)`, + `ALTER TABLE ${OPTION_PRINTS_TABLE} ADD COLUMN IF NOT EXISTS option_type Nullable(String)`, + `ALTER TABLE ${OPTION_PRINTS_TABLE} ADD COLUMN IF NOT EXISTS notional Nullable(Float64)`, + `ALTER TABLE ${OPTION_PRINTS_TABLE} ADD COLUMN IF NOT EXISTS nbbo_side Nullable(String)`, + `ALTER TABLE ${OPTION_PRINTS_TABLE} ADD COLUMN IF NOT EXISTS is_etf Nullable(Bool)`, + `ALTER TABLE ${OPTION_PRINTS_TABLE} ADD COLUMN IF NOT EXISTS signal_pass Nullable(Bool)`, + `ALTER TABLE ${OPTION_PRINTS_TABLE} ADD COLUMN IF NOT EXISTS signal_reasons Array(String) DEFAULT []`, + `ALTER TABLE ${OPTION_PRINTS_TABLE} ADD COLUMN IF NOT EXISTS signal_profile Nullable(String)` + ]; +}; + export const normalizeOptionPrint = (print: OptionPrint): OptionPrint => { return { ...print, - conditions: print.conditions ?? [] + conditions: print.conditions ?? [], + signal_reasons: print.signal_reasons ?? [] }; }; diff --git a/packages/storage/tests/option-prints.test.ts b/packages/storage/tests/option-prints.test.ts index 81c50c2..7643eeb 100644 --- a/packages/storage/tests/option-prints.test.ts +++ b/packages/storage/tests/option-prints.test.ts @@ -1,5 +1,10 @@ import { describe, expect, it } from "bun:test"; -import { createClickHouseClient, fetchOptionPrintsBefore, fetchOptionPrintsByTraceIds } from "../src/clickhouse"; +import { + createClickHouseClient, + fetchOptionPrintsBefore, + fetchOptionPrintsByTraceIds, + fetchRecentOptionPrints +} from "../src/clickhouse"; import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "../src/option-prints"; const basePrint = { @@ -38,12 +43,24 @@ describe("option-prints storage helpers", () => { }; }; + await fetchRecentOptionPrints(client, 25, undefined, { + view: "signal", + security: "stock", + nbboSides: ["AA", "A"], + optionTypes: ["call"], + minNotional: 25_000 + }); await fetchOptionPrintsBefore(client, 100, 5, 20, "alpaca"); await fetchOptionPrintsByTraceIds(client, ["trace-1", "trace-2"]); - expect(queries[0]).toContain("(ts, seq) < (100, 5)"); - expect(queries[0]).toContain("startsWith(trace_id, 'alpaca')"); - expect(queries[0]).toContain("ORDER BY ts DESC, seq DESC LIMIT 20"); - expect(queries[1]).toContain("trace_id IN ('trace-1', 'trace-2')"); + expect(queries[0]).toContain("signal_pass = 1"); + expect(queries[0]).toContain("(is_etf = 0 OR is_etf IS NULL)"); + expect(queries[0]).toContain("nbbo_side IN ('AA', 'A')"); + expect(queries[0]).toContain("option_type IN ('call')"); + expect(queries[0]).toContain("notional >= 25000"); + expect(queries[1]).toContain("(ts, seq) < (100, 5)"); + expect(queries[1]).toContain("startsWith(trace_id, 'alpaca')"); + expect(queries[1]).toContain("ORDER BY ts DESC, seq DESC LIMIT 20"); + expect(queries[2]).toContain("trace_id IN ('trace-1', 'trace-2')"); }); }); diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index b27a45f..072e427 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -1,4 +1,5 @@ import { z } from "zod"; +import { OptionNbboSideSchema, OptionTypeSchema, OptionsSignalModeSchema } from "./options-flow"; export const EventMetaSchema = z.object({ source_ts: z.number().int().nonnegative(), @@ -16,7 +17,18 @@ export const OptionPrintSchema = EventMetaSchema.merge( price: z.number().nonnegative(), size: z.number().int().positive(), exchange: z.string().min(1), - conditions: z.array(z.string().min(1)).optional() + conditions: z.array(z.string().min(1)).optional(), + underlying_id: z.preprocess((value) => (value === null ? undefined : value), z.string().min(1).optional()), + option_type: z.preprocess((value) => (value === null ? undefined : value), OptionTypeSchema.optional()), + notional: z.preprocess((value) => (value === null ? undefined : value), z.number().nonnegative().optional()), + nbbo_side: z.preprocess((value) => (value === null ? undefined : value), OptionNbboSideSchema.optional()), + is_etf: z.preprocess((value) => (value === null ? undefined : value), z.boolean().optional()), + signal_pass: z.preprocess((value) => (value === null ? undefined : value), z.boolean().optional()), + signal_reasons: z.array(z.string().min(1)).optional(), + signal_profile: z.preprocess( + (value) => (value === null ? undefined : value), + OptionsSignalModeSchema.optional() + ) }) ); diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 44f18f5..ce55e57 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -1,3 +1,4 @@ export * from "./events"; export * from "./live"; +export * from "./options-flow"; export * from "./sp500"; diff --git a/packages/types/src/live.ts b/packages/types/src/live.ts index c5fc399..3d86883 100644 --- a/packages/types/src/live.ts +++ b/packages/types/src/live.ts @@ -10,6 +10,10 @@ import { OptionNBBOSchema, OptionPrintSchema } from "./events"; +import { + OptionFlowFiltersSchema, + optionFlowFilterKey +} from "./options-flow"; export const CursorSchema = z.object({ ts: z.number().int().nonnegative(), @@ -47,7 +51,15 @@ export type LiveGenericChannel = z.infer; export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [ z.object({ - channel: LiveGenericChannelSchema + channel: z.literal("options"), + filters: OptionFlowFiltersSchema.optional() + }), + z.object({ + channel: z.literal("flow"), + filters: OptionFlowFiltersSchema.optional() + }), + z.object({ + channel: z.enum(["nbbo", "equities", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]) }), z.object({ channel: z.literal("equity-candles"), @@ -165,6 +177,9 @@ export type LiveServerMessage = z.infer; export const getSubscriptionKey = (subscription: LiveSubscription): string => { switch (subscription.channel) { + case "options": + case "flow": + return `${subscription.channel}|${optionFlowFilterKey(subscription.filters)}`; case "equity-candles": return `${subscription.channel}|${subscription.underlying_id}|${subscription.interval_ms}`; case "equity-overlay": diff --git a/packages/types/src/options-flow.ts b/packages/types/src/options-flow.ts new file mode 100644 index 0000000..75dd581 --- /dev/null +++ b/packages/types/src/options-flow.ts @@ -0,0 +1,464 @@ +import { z } from "zod"; +import type { FlowPacket, OptionNBBO, OptionPrint } from "./events"; + +export const SyntheticMarketModeSchema = z.enum(["realistic", "active", "firehose"]); +export type SyntheticMarketMode = z.infer; + +export const OptionTypeSchema = z.enum(["call", "put"]); +export type OptionType = z.infer; + +export const OptionNbboSideSchema = z.enum(["AA", "A", "MID", "B", "BB", "MISSING", "STALE"]); +export type OptionNbboSide = z.infer; + +export const OptionFlowViewSchema = z.enum(["signal", "raw"]); +export type OptionFlowView = z.infer; + +export const OptionSecurityTypeSchema = z.enum(["stock", "etf"]); +export type OptionSecurityType = z.infer; + +export const OptionsSignalModeSchema = z.enum(["smart-money", "balanced", "all"]); +export type OptionsSignalMode = z.infer; + +export const OptionFlowFiltersSchema = z.object({ + view: OptionFlowViewSchema.optional(), + securityTypes: z.array(OptionSecurityTypeSchema).optional(), + nbboSides: z.array(OptionNbboSideSchema).optional(), + optionTypes: z.array(OptionTypeSchema).optional(), + minNotional: z.number().nonnegative().optional() +}); + +export type OptionFlowFilters = z.infer; + +export type ParsedOptionContract = { + root: string; + expiry: string; + strike: number; + right: "C" | "P"; +}; + +export type SyntheticModeResolution = { + market: SyntheticMarketMode; + options: SyntheticMarketMode; + equities: SyntheticMarketMode; +}; + +export type OptionsSignalConfig = { + mode: OptionsSignalMode; + minNotional: number; + etfMinNotional: number; + bidSideMinNotional: number; + midMinNotional: number; + missingNbboMinNotional: number; + largePrintMinSize: number; + largePrintMinNotional: number; + sweepMinNotional: number; + autoKeepMinNotional: number; + nbboMaxAgeMs: number; + etfUnderlyings: Set; +}; + +export type DerivedOptionPrintMetadata = { + underlying_id?: string; + option_type?: OptionType; + notional?: number; + nbbo_side?: OptionNbboSide; + is_etf?: boolean; +}; + +export type OptionSignalDecision = { + signalPass: boolean; + signalReasons: string[]; + signalProfile: OptionsSignalMode; +}; + +const parseDashedContract = (value: string): ParsedOptionContract | null => { + const parts = value.split("-"); + if (parts.length < 6) { + return null; + } + + const rightRaw = parts.at(-1) ?? ""; + if (rightRaw !== "C" && rightRaw !== "P") { + return null; + } + + const strikeRaw = parts.at(-2) ?? ""; + const strike = Number(strikeRaw); + const expiryParts = parts.slice(-5, -2); + const expiry = expiryParts.join("-"); + const root = parts.slice(0, -5).join("-"); + + if (!root || !expiry || !Number.isFinite(strike)) { + return null; + } + + return { + root, + expiry, + strike, + right: rightRaw + }; +}; + +const parseOccContract = (value: string): ParsedOptionContract | null => { + if (value.length < 15) { + return null; + } + + const tail = value.slice(-15); + const root = value.slice(0, -15).trim(); + const expiryRaw = tail.slice(0, 6); + const right = tail.slice(6, 7); + const strikeRaw = tail.slice(7); + + if (!/^\d{6}$/.test(expiryRaw) || !/^\d{8}$/.test(strikeRaw)) { + return null; + } + + if (right !== "C" && right !== "P") { + return null; + } + + const year = 2000 + Number(expiryRaw.slice(0, 2)); + const month = Number(expiryRaw.slice(2, 4)) - 1; + const day = Number(expiryRaw.slice(4, 6)); + const expiryDate = new Date(Date.UTC(year, month, day)); + const expiry = expiryDate.toISOString().slice(0, 10); + const strike = Number(strikeRaw) / 1000; + + if (!root || !Number.isFinite(strike)) { + return null; + } + + return { + root, + expiry, + strike, + right + }; +}; + +export const parseOptionContractId = (value: string | undefined): ParsedOptionContract | null => { + if (!value) { + return null; + } + + return parseDashedContract(value) ?? parseOccContract(value); +}; + +export const resolveSyntheticMarketModes = (input: { + syntheticMarketMode?: string | null | undefined; + syntheticOptionsMode?: string | null | undefined; + syntheticEquitiesMode?: string | null | undefined; +}): SyntheticModeResolution => { + const market = SyntheticMarketModeSchema.catch("realistic").parse( + input.syntheticMarketMode ?? "realistic" + ); + const options = SyntheticMarketModeSchema.catch(market).parse( + input.syntheticOptionsMode ?? market + ); + const equities = SyntheticMarketModeSchema.catch(market).parse( + input.syntheticEquitiesMode ?? market + ); + + return { market, options, equities }; +}; + +export const classifyOptionNbboSide = ( + price: number, + quote: Pick | null | undefined, + tradeTs: number, + maxAgeMs: number +): OptionNbboSide => { + if (!quote || !Number.isFinite(price)) { + return "MISSING"; + } + + const bid = quote.bid; + const ask = quote.ask; + if (!Number.isFinite(bid) || !Number.isFinite(ask) || ask <= 0) { + return "MISSING"; + } + + const ageMs = Math.abs(tradeTs - quote.ts); + if (ageMs > maxAgeMs) { + return "STALE"; + } + + const spread = Math.max(0, ask - bid); + const epsilon = Math.max(0.01, spread * 0.05); + + if (price > ask + epsilon) { + return "AA"; + } + if (price >= ask - epsilon) { + return "A"; + } + if (price < bid - epsilon) { + return "BB"; + } + if (price <= bid + epsilon) { + return "B"; + } + + return "MID"; +}; + +export const deriveOptionPrintMetadata = ( + print: Pick, + quote: Pick | null | undefined, + config: Pick +): DerivedOptionPrintMetadata => { + const parsed = parseOptionContractId(print.option_contract_id); + const underlying = parsed?.root?.toUpperCase(); + const optionType = parsed?.right === "C" ? "call" : parsed?.right === "P" ? "put" : undefined; + const notional = Number.isFinite(print.price) && Number.isFinite(print.size) + ? Number((print.price * print.size * 100).toFixed(2)) + : undefined; + + return { + underlying_id: underlying, + option_type: optionType, + notional, + nbbo_side: classifyOptionNbboSide(print.price, quote, print.ts, config.nbboMaxAgeMs), + is_etf: underlying ? config.etfUnderlyings.has(underlying) : undefined + }; +}; + +const hasCondition = (conditions: string[] | undefined, expected: string): boolean => { + return (conditions ?? []).some((condition) => condition.toUpperCase() === expected); +}; + +const balancedThresholds = (config: OptionsSignalConfig): OptionsSignalConfig => ({ + ...config, + minNotional: Math.min(config.minNotional, 5_000), + etfMinNotional: Math.min(config.etfMinNotional, 25_000), + bidSideMinNotional: Math.min(config.bidSideMinNotional, 15_000), + midMinNotional: Math.min(config.midMinNotional, 12_500), + missingNbboMinNotional: Math.min(config.missingNbboMinNotional, 25_000), + sweepMinNotional: Math.min(config.sweepMinNotional, 15_000), + autoKeepMinNotional: Math.min(config.autoKeepMinNotional, 75_000) +}); + +export const evaluateOptionSignal = ( + print: Pick< + OptionPrint, + "size" | "conditions" | "signal_profile" | "underlying_id" | "option_type" | "notional" | "nbbo_side" | "is_etf" + >, + baseConfig: OptionsSignalConfig +): OptionSignalDecision => { + const mode = print.signal_profile ?? baseConfig.mode; + if (mode === "all") { + return { + signalPass: true, + signalReasons: ["mode:all"], + signalProfile: "all" + }; + } + + const config = mode === "balanced" ? balancedThresholds(baseConfig) : baseConfig; + const reasons: string[] = []; + const notional = print.notional ?? 0; + const side = print.nbbo_side ?? "MISSING"; + const isSweepOrIso = hasCondition(print.conditions, "SWEEP") || hasCondition(print.conditions, "ISO"); + + if (notional < config.minNotional) { + return { + signalPass: false, + signalReasons: ["reject:min-notional"], + signalProfile: mode + }; + } + + if (notional >= config.autoKeepMinNotional) { + reasons.push("keep:auto-large"); + } + + if (print.is_etf && notional < config.etfMinNotional) { + return { + signalPass: false, + signalReasons: ["reject:etf-min-notional"], + signalProfile: mode + }; + } + + if ((side === "B" || side === "BB") && notional < config.bidSideMinNotional) { + return { + signalPass: false, + signalReasons: ["reject:bid-side-min-notional"], + signalProfile: mode + }; + } + + if (side === "MID" && !isSweepOrIso && notional < config.midMinNotional) { + return { + signalPass: false, + signalReasons: ["reject:mid-min-notional"], + signalProfile: mode + }; + } + + if ((side === "MISSING" || side === "STALE") && notional < config.missingNbboMinNotional) { + return { + signalPass: false, + signalReasons: ["reject:missing-nbbo-min-notional"], + signalProfile: mode + }; + } + + if ((side === "A" || side === "AA") && notional >= config.minNotional) { + reasons.push("keep:ask-side"); + } + + if (isSweepOrIso && notional >= config.sweepMinNotional) { + reasons.push("keep:sweep-or-iso"); + } + + if (print.size >= config.largePrintMinSize && notional >= config.largePrintMinNotional) { + reasons.push("keep:large-contract-count"); + } + + if (reasons.length === 0) { + return { + signalPass: false, + signalReasons: ["reject:no-signal-rule"], + signalProfile: mode + }; + } + + return { + signalPass: true, + signalReasons: reasons, + signalProfile: mode + }; +}; + +const sortStrings = (values: string[] | undefined): string[] | undefined => { + if (!values || values.length === 0) { + return undefined; + } + return [...new Set(values)].sort(); +}; + +export const normalizeOptionFlowFilters = ( + filters: OptionFlowFilters | undefined +): OptionFlowFilters | undefined => { + if (!filters) { + return undefined; + } + + return { + view: filters.view, + securityTypes: sortStrings(filters.securityTypes) as OptionSecurityType[] | undefined, + nbboSides: sortStrings(filters.nbboSides) as OptionNbboSide[] | undefined, + optionTypes: sortStrings(filters.optionTypes) as OptionType[] | undefined, + minNotional: + typeof filters.minNotional === "number" && Number.isFinite(filters.minNotional) + ? filters.minNotional + : undefined + }; +}; + +export const optionFlowFilterKey = (filters: OptionFlowFilters | undefined): string => { + return JSON.stringify(normalizeOptionFlowFilters(filters) ?? {}); +}; + +export const matchesOptionPrintFilters = ( + print: Pick, + filters: OptionFlowFilters | undefined +): boolean => { + if (!filters) { + return true; + } + + const view = filters.view ?? "signal"; + if (view === "signal" && print.signal_pass === false) { + return false; + } + + if (filters.securityTypes?.length) { + const securityType: OptionSecurityType = print.is_etf ? "etf" : "stock"; + if (!filters.securityTypes.includes(securityType)) { + return false; + } + } + + if (filters.nbboSides?.length) { + const side = print.nbbo_side ?? "MISSING"; + if (!filters.nbboSides.includes(side)) { + return false; + } + } + + if (filters.optionTypes?.length) { + const optionType = print.option_type; + if (!optionType || !filters.optionTypes.includes(optionType)) { + return false; + } + } + + if (typeof filters.minNotional === "number" && (print.notional ?? 0) < filters.minNotional) { + return false; + } + + return true; +}; + +export const matchesFlowPacketFilters = ( + packet: FlowPacket, + filters: OptionFlowFilters | undefined +): boolean => { + if (!filters) { + return true; + } + + const features = packet.features ?? {}; + const totalNotional = typeof features.total_notional === "number" ? features.total_notional : Number(features.total_notional ?? 0); + if (typeof filters.minNotional === "number" && (!Number.isFinite(totalNotional) || totalNotional < filters.minNotional)) { + return false; + } + + if (filters.securityTypes?.length) { + const isEtf = typeof features.is_etf === "boolean" ? features.is_etf : features.is_etf === 1; + const securityType: OptionSecurityType = isEtf ? "etf" : "stock"; + if (!filters.securityTypes.includes(securityType)) { + return false; + } + } + + if (filters.optionTypes?.length) { + const optionType = + typeof features.option_type === "string" + ? features.option_type + : typeof features.structure_rights === "string" + ? features.structure_rights.toLowerCase() + : null; + if ( + !optionType || + !filters.optionTypes.some((selected) => optionType.includes(selected)) + ) { + return false; + } + } + + if (filters.nbboSides?.length) { + const sideToFeature: Record = { + AA: "nbbo_aa_count", + A: "nbbo_a_count", + MID: "nbbo_mid_count", + B: "nbbo_b_count", + BB: "nbbo_bb_count", + MISSING: "nbbo_missing_count", + STALE: "nbbo_stale_count" + }; + const matchesSide = filters.nbboSides.some((side) => { + const value = features[sideToFeature[side]]; + return typeof value === "number" ? value > 0 : Number(value ?? 0) > 0; + }); + if (!matchesSide) { + return false; + } + } + + return true; +}; diff --git a/packages/types/tests/live.test.ts b/packages/types/tests/live.test.ts index e53929b..88dd080 100644 --- a/packages/types/tests/live.test.ts +++ b/packages/types/tests/live.test.ts @@ -8,7 +8,21 @@ import { describe("live protocol types", () => { it("builds stable keys for generic and parameterized subscriptions", () => { - expect(getSubscriptionKey({ channel: "flow" })).toBe("flow"); + expect(getSubscriptionKey({ channel: "flow" })).toBe("flow|{}"); + expect( + getSubscriptionKey({ + channel: "options", + filters: { + view: "signal", + securityTypes: ["stock"], + nbboSides: ["A", "AA"], + optionTypes: ["call", "put"], + minNotional: 25000 + } + }) + ).toBe( + 'options|{"view":"signal","securityTypes":["stock"],"nbboSides":["A","AA"],"optionTypes":["call","put"],"minNotional":25000}' + ); expect( getSubscriptionKey({ channel: "equity-candles", @@ -25,7 +39,7 @@ describe("live protocol types", () => { const parsed = LiveClientMessageSchema.parse({ op: "subscribe", subscriptions: [ - { channel: "flow" }, + { channel: "flow", filters: { nbboSides: ["AA", "A"], minNotional: 50000 } }, { channel: "equity-candles", underlying_id: "SPY", interval_ms: 60000 } ] }); diff --git a/packages/types/tests/options-flow.test.ts b/packages/types/tests/options-flow.test.ts new file mode 100644 index 0000000..801b378 --- /dev/null +++ b/packages/types/tests/options-flow.test.ts @@ -0,0 +1,132 @@ +import { describe, expect, it } from "bun:test"; +import { + deriveOptionPrintMetadata, + evaluateOptionSignal, + resolveSyntheticMarketModes, + type OptionsSignalConfig +} from "../src/options-flow"; + +const baseConfig: OptionsSignalConfig = { + mode: "smart-money", + minNotional: 10_000, + etfMinNotional: 50_000, + bidSideMinNotional: 25_000, + midMinNotional: 20_000, + missingNbboMinNotional: 50_000, + largePrintMinSize: 500, + largePrintMinNotional: 10_000, + sweepMinNotional: 25_000, + autoKeepMinNotional: 100_000, + nbboMaxAgeMs: 1_500, + etfUnderlyings: new Set(["SPY", "QQQ"]) +}; + +describe("options-flow helpers", () => { + it("resolves synthetic modes with per-service overrides", () => { + expect( + resolveSyntheticMarketModes({ + syntheticMarketMode: "active", + syntheticOptionsMode: "firehose" + }) + ).toEqual({ + market: "active", + options: "firehose", + equities: "active" + }); + }); + + it("derives underlying, notional, nbbo side, and etf metadata", () => { + const metadata = deriveOptionPrintMetadata( + { + option_contract_id: "SPY-2025-01-17-450-C", + price: 2.5, + size: 100, + ts: 5_000 + }, + { + bid: 2.3, + ask: 2.5, + ts: 4_500 + }, + baseConfig + ); + + expect(metadata.underlying_id).toBe("SPY"); + expect(metadata.option_type).toBe("call"); + expect(metadata.notional).toBe(25_000); + expect(metadata.nbbo_side).toBe("A"); + expect(metadata.is_etf).toBe(true); + }); + + it("accepts and rejects smart-money thresholds at boundaries", () => { + const acceptedAsk = evaluateOptionSignal( + { + size: 100, + conditions: [], + underlying_id: "AAPL", + option_type: "call", + notional: 10_000, + nbbo_side: "A", + is_etf: false + }, + baseConfig + ); + expect(acceptedAsk.signalPass).toBe(true); + + const rejectedLow = evaluateOptionSignal( + { + size: 100, + conditions: [], + underlying_id: "AAPL", + option_type: "call", + notional: 9_999, + nbbo_side: "A", + is_etf: false + }, + baseConfig + ); + expect(rejectedLow.signalPass).toBe(false); + + const rejectedBid = evaluateOptionSignal( + { + size: 100, + conditions: [], + underlying_id: "AAPL", + option_type: "put", + notional: 24_999, + nbbo_side: "B", + is_etf: false + }, + baseConfig + ); + expect(rejectedBid.signalPass).toBe(false); + + const acceptedSweep = evaluateOptionSignal( + { + size: 100, + conditions: ["SWEEP"], + underlying_id: "AAPL", + option_type: "call", + notional: 25_000, + nbbo_side: "MID", + is_etf: false + }, + baseConfig + ); + expect(acceptedSweep.signalPass).toBe(true); + + const rejectedEtf = evaluateOptionSignal( + { + size: 100, + conditions: [], + underlying_id: "SPY", + option_type: "call", + notional: 49_999, + nbbo_side: "A", + is_etf: true + }, + baseConfig + ); + expect(rejectedEtf.signalPass).toBe(false); + }); +}); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 090c641..c0bb2b5 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -10,7 +10,7 @@ import { SUBJECT_INFERRED_DARK, SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_NBBO, - SUBJECT_OPTION_PRINTS, + SUBJECT_OPTION_SIGNAL_PRINTS, STREAM_ALERTS, STREAM_CLASSIFIER_HITS, STREAM_EQUITY_CANDLES, @@ -20,7 +20,7 @@ import { STREAM_INFERRED_DARK, STREAM_FLOW_PACKETS, STREAM_OPTION_NBBO, - STREAM_OPTION_PRINTS, + STREAM_OPTION_SIGNAL_PRINTS, buildDurableConsumer, connectJetStreamWithRetry, ensureStream, @@ -85,6 +85,13 @@ import { LiveServerMessage, LiveSubscription, LiveSubscriptionSchema, + matchesFlowPacketFilters, + matchesOptionPrintFilters, + OptionFlowFilters, + OptionFlowViewSchema, + OptionNbboSideSchema, + OptionSecurityTypeSchema, + OptionTypeSchema, FlowPacketSchema, OptionNBBOSchema, OptionPrintSchema, @@ -199,6 +206,32 @@ const equityPrintRangeSchema = z.object({ end_ts: z.coerce.number().int().nonnegative(), limit: limitSchema.optional() }); +const optionSideListSchema = z + .string() + .transform((value) => + value + .split(",") + .map((entry) => entry.trim()) + .filter(Boolean) + ) + .pipe(z.array(OptionNbboSideSchema)); +const optionTypeListSchema = z + .string() + .transform((value) => + value + .split(",") + .map((entry) => entry.trim()) + .filter(Boolean) + ) + .pipe(z.array(OptionTypeSchema)); +const optionSecuritySchema = z.enum(["stock", "etf", "all"]); +const optionFilterQuerySchema = z.object({ + view: OptionFlowViewSchema.optional(), + security: optionSecuritySchema.optional(), + side: optionSideListSchema.optional(), + type: optionTypeListSchema.optional(), + min_notional: z.coerce.number().nonnegative().optional() +}); type Channel = | "options" @@ -235,6 +268,7 @@ const classifierHitSockets = new Set(); const alertSockets = new Set(); const liveSocketSubscriptions = new Map>(); const subscriptionSockets = new Map>(); +const subscriptionDefinitions = new Map(); const liveHeartbeats = new Map>(); const jsonResponse = (body: unknown, status = 200): Response => { @@ -254,6 +288,43 @@ const parseLimit = (value: string | null): number => { return limitSchema.parse(value); }; +const parseOptionPrintFilters = ( + url: URL +): { + view: z.infer; + storageFilters: Parameters[3]; + liveFilters: OptionFlowFilters; +} => { + const parsed = optionFilterQuerySchema.parse({ + view: url.searchParams.get("view") ?? undefined, + security: url.searchParams.get("security") ?? undefined, + side: url.searchParams.get("side") ?? undefined, + type: url.searchParams.get("type") ?? undefined, + min_notional: url.searchParams.get("min_notional") ?? undefined + }); + const view = parsed.view ?? "signal"; + const security = parsed.security ?? (view === "raw" ? "all" : "stock"); + const storageFilters = { + view, + security, + minNotional: parsed.min_notional, + nbboSides: parsed.side, + optionTypes: parsed.type + } as const; + const liveFilters: OptionFlowFilters = { + view, + securityTypes: + security === "all" + ? undefined + : ([security] as Array>), + nbboSides: parsed.side, + optionTypes: parsed.type, + minNotional: parsed.min_notional + }; + + return { view, storageFilters, liveFilters }; +}; + const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit: number } => { const params = replayParamsSchema.parse({ after_ts: url.searchParams.get("after_ts") ?? undefined, @@ -412,6 +483,7 @@ const subscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): vo const sockets = subscriptionSockets.get(key) ?? new Set(); sockets.add(socket); subscriptionSockets.set(key, sockets); + subscriptionDefinitions.set(key, subscription); }; const unsubscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): void => { @@ -425,6 +497,7 @@ const unsubscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): sockets.delete(socket); if (sockets.size === 0) { subscriptionSockets.delete(key); + subscriptionDefinitions.delete(key); } }; @@ -436,6 +509,7 @@ const cleanupLiveSocket = (socket: LiveSocket): void => { sockets?.delete(socket); if (sockets && sockets.size === 0) { subscriptionSockets.delete(key); + subscriptionDefinitions.delete(key); } } } @@ -504,8 +578,8 @@ const run = async () => { ); await ensureStream(jsm, { - name: STREAM_OPTION_PRINTS, - subjects: [SUBJECT_OPTION_PRINTS], + name: STREAM_OPTION_SIGNAL_PRINTS, + subjects: [SUBJECT_OPTION_SIGNAL_PRINTS], retention: "limits", storage: "file", discard: "old", @@ -722,8 +796,8 @@ const run = async () => { }; const optionSubscription = await subscribeWithReset( - SUBJECT_OPTION_PRINTS, - STREAM_OPTION_PRINTS, + SUBJECT_OPTION_SIGNAL_PRINTS, + STREAM_OPTION_SIGNAL_PRINTS, "api-option-prints" ); @@ -786,20 +860,44 @@ const run = async () => { item: unknown, ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" ) => { - const key = getSubscriptionKey(subscription); - const sockets = subscriptionSockets.get(key); const watermark = await liveState.ingest(ingestChannel, item); - if (!sockets || sockets.size === 0) { + const matchingSubscriptions = + subscription.channel === "options" || subscription.channel === "flow" + ? [...subscriptionDefinitions.entries()].filter(([, candidate]) => candidate.channel === subscription.channel) + : [[getSubscriptionKey(subscription), subscription] as const]; + + if (matchingSubscriptions.length === 0) { return; } - for (const socket of sockets) { - sendLiveMessage(socket, { - op: "event", - subscription, - item, - watermark - }); + for (const [key, candidate] of matchingSubscriptions) { + const sockets = subscriptionSockets.get(key); + if (!sockets || sockets.size === 0) { + continue; + } + + if ( + candidate.channel === "options" && + !matchesOptionPrintFilters(OptionPrintSchema.parse(item), candidate.filters) + ) { + continue; + } + + if ( + candidate.channel === "flow" && + !matchesFlowPacketFilters(FlowPacketSchema.parse(item), candidate.filters) + ) { + continue; + } + + for (const socket of sockets) { + sendLiveMessage(socket, { + op: "event", + subscription: candidate, + item, + watermark + }); + } } }; @@ -996,10 +1094,21 @@ const run = async () => { } if (req.method === "GET" && url.pathname === "/prints/options") { - const limit = parseLimit(url.searchParams.get("limit")); - const source = parseReplaySource(url) ?? undefined; - const data = await fetchRecentOptionPrints(clickhouse, limit, source); - return jsonResponse({ data }); + try { + const limit = parseLimit(url.searchParams.get("limit")); + const source = parseReplaySource(url) ?? undefined; + const { storageFilters } = parseOptionPrintFilters(url); + const data = await fetchRecentOptionPrints(clickhouse, limit, source, storageFilters); + return jsonResponse({ data }); + } catch (error) { + return jsonResponse( + { + error: "invalid options query", + detail: error instanceof Error ? error.message : String(error) + }, + 400 + ); + } } if (req.method === "GET" && url.pathname === "/nbbo/options") { @@ -1105,10 +1214,28 @@ const run = async () => { } if (req.method === "GET" && url.pathname === "/history/options") { - const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); - const source = parseReplaySource(url) ?? undefined; - const data = await fetchOptionPrintsBefore(clickhouse, beforeTs, beforeSeq, limit, source); - return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq }))); + try { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const source = parseReplaySource(url) ?? undefined; + const { storageFilters } = parseOptionPrintFilters(url); + const data = await fetchOptionPrintsBefore( + clickhouse, + beforeTs, + beforeSeq, + limit, + source, + storageFilters + ); + return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq }))); + } catch (error) { + return jsonResponse( + { + error: "invalid options history query", + detail: error instanceof Error ? error.message : String(error) + }, + 400 + ); + } } if (req.method === "GET" && url.pathname === "/history/nbbo") { @@ -1183,12 +1310,30 @@ const run = async () => { } if (req.method === "GET" && url.pathname === "/replay/options") { - const { afterTs, afterSeq, limit } = parseReplayParams(url); - const source = parseReplaySource(url) ?? undefined; - const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit, source); - const last = data.at(-1); - const next = last ? { ts: last.ts, seq: last.seq } : null; - return jsonResponse({ data, next }); + try { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const source = parseReplaySource(url) ?? undefined; + const { storageFilters } = parseOptionPrintFilters(url); + const data = await fetchOptionPrintsAfter( + clickhouse, + afterTs, + afterSeq, + limit, + source, + storageFilters + ); + const last = data.at(-1); + const next = last ? { ts: last.ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } catch (error) { + return jsonResponse( + { + error: "invalid options replay query", + detail: error instanceof Error ? error.message : String(error) + }, + 400 + ); + } } if (req.method === "GET" && url.pathname === "/replay/nbbo") { diff --git a/services/api/src/live.ts b/services/api/src/live.ts index d170b69..77d9dc5 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -1,4 +1,5 @@ import { + fetchRecentOptionPrints, fetchRecentAlerts, fetchRecentClassifierHits, fetchRecentEquityCandles, @@ -7,9 +8,9 @@ import { fetchRecentFlowPackets, fetchRecentInferredDark, fetchRecentOptionNBBO, - fetchRecentOptionPrints, type ClickHouseClient } from "@islandflow/storage"; +import type { OptionPrintQueryFilters } from "@islandflow/storage"; import { AlertEventSchema, ClassifierHitEventSchema, @@ -22,8 +23,11 @@ import { InferredDarkEventSchema, LiveGenericChannel, LiveSubscription, + matchesFlowPacketFilters, + matchesOptionPrintFilters, OptionNBBOSchema, OptionPrintSchema, + type OptionFlowFilters, type Cursor, type EquityCandle, type EquityPrint, @@ -124,7 +128,8 @@ const getGenericConfig = (limits: GenericLiveLimits): { limit: limits.options, parse: (value) => OptionPrintSchema.parse(value), cursor: (item) => ({ ts: item.ts, seq: item.seq }), - fetchRecent: fetchRecentOptionPrints + fetchRecent: (clickhouse, limit) => + fetchRecentOptionPrints(clickhouse, limit, undefined, { view: "signal" }) }, nbbo: { redisKey: "live:nbbo", @@ -279,6 +284,55 @@ export class LiveStateManager { async getSnapshot(subscription: LiveSubscription): Promise> { switch (subscription.channel) { + case "options": { + if (subscription.filters?.view === "raw") { + const storageFilters: OptionPrintQueryFilters = { + view: "raw", + security: + subscription.filters.securityTypes?.length === 1 + ? subscription.filters.securityTypes[0] + : "all", + nbboSides: subscription.filters.nbboSides, + optionTypes: subscription.filters.optionTypes, + minNotional: subscription.filters.minNotional + }; + const items = await fetchRecentOptionPrints( + this.clickhouse, + this.generic.options.limit, + undefined, + storageFilters + ); + return { + subscription, + items, + watermark: items[0] ? { ts: items[0].ts, seq: items[0].seq } : null, + next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq })) + }; + } + + const config = this.generic.options; + const items = (this.genericItems.get("options") ?? []).filter((item) => + matchesOptionPrintFilters(item, subscription.filters) + ); + return { + subscription, + items, + watermark: this.genericCursors.get(config.cursorField) ?? null, + next_before: nextBeforeForItems(items, config.cursor) + }; + } + case "flow": { + const config = this.generic.flow; + const items = (this.genericItems.get("flow") ?? []).filter((item) => + matchesFlowPacketFilters(item, subscription.filters) + ); + return { + subscription, + items, + watermark: this.genericCursors.get(config.cursorField) ?? null, + next_before: nextBeforeForItems(items, config.cursor) + }; + } case "equity-candles": { const key = candleRedisKey(subscription.underlying_id, subscription.interval_ms); const cursorField = candleCursorField(subscription.underlying_id, subscription.interval_ms); diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index 037da47..05e99e9 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -196,4 +196,81 @@ describe("LiveStateManager", () => { expect(stats.trimOperations).toBeGreaterThan(0); expect(stats.cacheDepthByKey["live:flow"]).toBe(2); }); + + it("filters option and flow snapshots using subscription filters", async () => { + const manager = new LiveStateManager(makeClickHouse(), null); + + await manager.ingest("options", { + source_ts: 100, + ingest_ts: 101, + seq: 1, + trace_id: "opt-1", + ts: 100, + option_contract_id: "AAPL-2025-01-17-200-C", + price: 1, + size: 100, + exchange: "X", + underlying_id: "AAPL", + option_type: "call", + notional: 10000, + nbbo_side: "A", + is_etf: false, + signal_pass: true, + signal_reasons: ["keep:ask-side"], + signal_profile: "smart-money" + }); + await manager.ingest("options", { + source_ts: 110, + ingest_ts: 111, + seq: 2, + trace_id: "opt-2", + ts: 110, + option_contract_id: "SPY-2025-01-17-500-P", + price: 1, + size: 100, + exchange: "X", + underlying_id: "SPY", + option_type: "put", + notional: 10000, + nbbo_side: "B", + is_etf: true, + signal_pass: true, + signal_reasons: ["keep:ask-side"], + signal_profile: "smart-money" + }); + await manager.ingest("flow", { + source_ts: 120, + ingest_ts: 121, + seq: 3, + trace_id: "flow-1", + id: "flow-1", + members: ["opt-1"], + features: { + option_contract_id: "AAPL-2025-01-17-200-C", + total_notional: 10000, + is_etf: false, + option_type: "call", + nbbo_a_count: 1, + nbbo_aa_count: 0, + nbbo_mid_count: 0, + nbbo_b_count: 0, + nbbo_bb_count: 0, + nbbo_missing_count: 0, + nbbo_stale_count: 0 + }, + join_quality: {} + }); + + const optionSnapshot = await manager.getSnapshot({ + channel: "options", + filters: { securityTypes: ["stock"], nbboSides: ["A"], optionTypes: ["call"] } + }); + const flowSnapshot = await manager.getSnapshot({ + channel: "flow", + filters: { securityTypes: ["stock"], nbboSides: ["A"], optionTypes: ["call"] } + }); + + expect(optionSnapshot.items).toHaveLength(1); + expect(flowSnapshot.items).toHaveLength(1); + }); }); diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 5ed60e3..8dc6c64 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -9,7 +9,7 @@ import { SUBJECT_INFERRED_DARK, SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_NBBO, - SUBJECT_OPTION_PRINTS, + SUBJECT_OPTION_SIGNAL_PRINTS, STREAM_ALERTS, STREAM_CLASSIFIER_HITS, STREAM_EQUITY_JOINS, @@ -18,7 +18,7 @@ import { STREAM_INFERRED_DARK, STREAM_FLOW_PACKETS, STREAM_OPTION_NBBO, - STREAM_OPTION_PRINTS, + STREAM_OPTION_SIGNAL_PRINTS, buildDurableConsumer, connectJetStreamWithRetry, ensureStream, @@ -231,6 +231,9 @@ type NbboPlacementCounts = { type ClusterState = { contractId: string; + underlyingId: string | null; + optionType: string | null; + isEtf: boolean | null; startTs: number; endTs: number; startSourceTs: number; @@ -530,6 +533,9 @@ const buildCluster = (print: OptionPrint): ClusterState => { recordPlacement(placements, classifyPlacement(print.price, selectNbbo(print.option_contract_id, print.ts))); return { contractId: print.option_contract_id, + underlyingId: print.underlying_id ?? null, + optionType: print.option_type ?? null, + isEtf: typeof print.is_etf === "boolean" ? print.is_etf : null, startTs: print.ts, endTs: print.ts, startSourceTs: print.source_ts, @@ -546,6 +552,15 @@ const buildCluster = (print: OptionPrint): ClusterState => { }; const updateCluster = (cluster: ClusterState, print: OptionPrint): ClusterState => { + if (!cluster.underlyingId && print.underlying_id) { + cluster.underlyingId = print.underlying_id; + } + if (!cluster.optionType && print.option_type) { + cluster.optionType = print.option_type; + } + if (cluster.isEtf === null && typeof print.is_etf === "boolean") { + cluster.isEtf = print.is_etf; + } cluster.endTs = Math.max(cluster.endTs, print.ts); cluster.endIngestTs = Math.max(cluster.endIngestTs, print.ingest_ts); cluster.endSeq = Math.max(cluster.endSeq, print.seq); @@ -705,6 +720,15 @@ const flushCluster = async ( } } } + if (cluster.underlyingId) { + features.underlying_id = cluster.underlyingId; + } + if (cluster.optionType) { + features.option_type = cluster.optionType; + } + if (cluster.isEtf !== null) { + features.is_etf = cluster.isEtf; + } const placementTotal = cluster.placements.aa + @@ -1012,8 +1036,8 @@ const run = async () => { ); await ensureStream(jsm, { - name: STREAM_OPTION_PRINTS, - subjects: [SUBJECT_OPTION_PRINTS], + name: STREAM_OPTION_SIGNAL_PRINTS, + subjects: [SUBJECT_OPTION_SIGNAL_PRINTS], retention: "limits", storage: "file", discard: "old", @@ -1162,7 +1186,7 @@ const run = async () => { if (env.COMPUTE_CONSUMER_RESET) { try { - await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); + await jsm.consumers.delete(STREAM_OPTION_SIGNAL_PRINTS, durableName); logger.warn("reset jetstream consumer", { durable: durableName }); } catch (error) { const message = error instanceof Error ? error.message : String(error); @@ -1172,14 +1196,14 @@ const run = async () => { } } else { try { - const info = await jsm.consumers.info(STREAM_OPTION_PRINTS, durableName); + const info = await jsm.consumers.info(STREAM_OPTION_SIGNAL_PRINTS, durableName); if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) { logger.warn("resetting consumer due to deliver policy change", { durable: durableName, current: info.config.deliver_policy, desired: env.COMPUTE_DELIVER_POLICY }); - await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); + await jsm.consumers.delete(STREAM_OPTION_SIGNAL_PRINTS, durableName); } } catch (error) { const message = error instanceof Error ? error.message : String(error); @@ -1292,7 +1316,7 @@ const run = async () => { const opts = buildDurableConsumer(durableName); applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY); try { - return await subscribeJson(js, SUBJECT_OPTION_PRINTS, opts); + return await subscribeJson(js, SUBJECT_OPTION_SIGNAL_PRINTS, opts); } catch (error) { const message = error instanceof Error ? error.message : String(error); const shouldReset = @@ -1307,7 +1331,7 @@ const run = async () => { logger.warn("resetting jetstream consumer", { durable: durableName, error: message }); try { - await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); + await jsm.consumers.delete(STREAM_OPTION_SIGNAL_PRINTS, durableName); } catch (deleteError) { const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError); if (!deleteMessage.includes("not found")) { @@ -1320,7 +1344,7 @@ const run = async () => { const resetOpts = buildDurableConsumer(durableName); applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY); - return await subscribeJson(js, SUBJECT_OPTION_PRINTS, resetOpts); + return await subscribeJson(js, SUBJECT_OPTION_SIGNAL_PRINTS, resetOpts); } })(); diff --git a/services/ingest-equities/src/adapters/synthetic.ts b/services/ingest-equities/src/adapters/synthetic.ts index 6aa9f16..01a2de3 100644 --- a/services/ingest-equities/src/adapters/synthetic.ts +++ b/services/ingest-equities/src/adapters/synthetic.ts @@ -1,8 +1,14 @@ -import { SP500_SYMBOLS, type EquityPrint, type EquityQuote } from "@islandflow/types"; +import { + SP500_SYMBOLS, + type EquityPrint, + type EquityQuote, + type SyntheticMarketMode +} from "@islandflow/types"; import type { EquityIngestAdapter, EquityIngestHandlers } from "./types"; type SyntheticEquitiesAdapterConfig = { emitIntervalMs: number; + mode: SyntheticMarketMode; }; const EXCHANGES = ["NYSE", "NASDAQ", "ARCA", "BATS", "IEX", "TEST"]; @@ -22,10 +28,7 @@ const DARK_SEQUENCE: DarkScenario[] = [ "sell", "sell" ]; -const SYNTHETIC_SYMBOLS = [ - "SPY", - ...SP500_SYMBOLS.filter((symbol) => symbol !== "SPY") -]; +const SYNTHETIC_SYMBOLS = ["SPY", ...(SP500_SYMBOLS as readonly string[])]; const hashSymbol = (value: string): number => { let hash = 0; @@ -124,6 +127,30 @@ const priceForPlacement = ( export const createSyntheticEquitiesAdapter = ( config: SyntheticEquitiesAdapterConfig ): EquityIngestAdapter => { + const profile = + config.mode === "firehose" + ? { + batchSize: 10, + darkEvery: true, + offExchangeMod: 2, + litSizeBase: 40, + litSizeRange: 1400 + } + : config.mode === "active" + ? { + batchSize: 5, + darkEvery: true, + offExchangeMod: 4, + litSizeBase: 20, + litSizeRange: 900 + } + : { + batchSize: 2, + darkEvery: false, + offExchangeMod: 8, + litSizeBase: 10, + litSizeRange: 300 + }; return { name: "synthetic", start: (handlers: EquityIngestHandlers) => { @@ -140,7 +167,7 @@ export const createSyntheticEquitiesAdapter = ( } const now = Date.now(); - const batchSize = 3; + const batchSize = profile.batchSize; const darkSymbol = SYNTHETIC_SYMBOLS[darkSymbolIndex % SYNTHETIC_SYMBOLS.length]; const darkHash = hashSymbol(darkSymbol); @@ -151,44 +178,46 @@ export const createSyntheticEquitiesAdapter = ( const scenario = DARK_SEQUENCE[darkStep % DARK_SEQUENCE.length]; const darkTs = now; - if (handlers.onQuote) { - quoteSeq += 1; - const quoteEvent = buildSyntheticQuote( - quoteSeq, - darkTs - 2, + if (profile.darkEvery) { + if (handlers.onQuote) { + quoteSeq += 1; + const quoteEvent = buildSyntheticQuote( + quoteSeq, + darkTs - 2, + darkSymbol, + darkQuote.bid, + darkQuote.ask + ); + void handlers.onQuote(quoteEvent); + } + + seq += 1; + let darkPlacement: PricePlacement = "MID"; + let darkSize = config.mode === "firehose" ? 4000 : 2600; + if (scenario === "buy") { + darkPlacement = darkStep % 2 === 0 ? "A" : "AA"; + darkSize = config.mode === "firehose" ? 1500 : 800; + } else if (scenario === "sell") { + darkPlacement = darkStep % 2 === 0 ? "B" : "BB"; + darkSize = config.mode === "firehose" ? 1500 : 800; + } + const darkPrice = priceForPlacement(darkMid, darkQuote, darkPlacement); + const darkPrint = buildSyntheticPrint( + seq, + darkTs, darkSymbol, - darkQuote.bid, - darkQuote.ask + darkPrice, + darkSize, + DARK_EXCHANGE, + true ); - void handlers.onQuote(quoteEvent); - } + void handlers.onTrade(darkPrint); - seq += 1; - let darkPlacement: PricePlacement = "MID"; - let darkSize = 2600; - if (scenario === "buy") { - darkPlacement = darkStep % 2 === 0 ? "A" : "AA"; - darkSize = 800; - } else if (scenario === "sell") { - darkPlacement = darkStep % 2 === 0 ? "B" : "BB"; - darkSize = 800; - } - const darkPrice = priceForPlacement(darkMid, darkQuote, darkPlacement); - const darkPrint = buildSyntheticPrint( - seq, - darkTs, - darkSymbol, - darkPrice, - darkSize, - DARK_EXCHANGE, - true - ); - void handlers.onTrade(darkPrint); - - darkStep += 1; - if (darkStep >= DARK_SEQUENCE.length) { - darkStep = 0; - darkSymbolIndex += 1; + darkStep += 1; + if (darkStep >= DARK_SEQUENCE.length) { + darkStep = 0; + darkSymbolIndex += 1; + } } for (let i = 0; i < batchSize; i += 1) { @@ -201,9 +230,9 @@ export const createSyntheticEquitiesAdapter = ( const placement: PricePlacement = seq % 11 === 0 ? "A" : seq % 13 === 0 ? "B" : "MID"; const price = priceForPlacement(mid, quote, placement); - const size = 10 + (seq % 600); + const size = profile.litSizeBase + (seq % profile.litSizeRange); const exchange = EXCHANGES[(seq + symbolHash) % EXCHANGES.length]; - const offExchangeFlag = (seq + i) % 6 === 0; + const offExchangeFlag = (seq + i) % profile.offExchangeMod === 0; const eventTs = now + i * 4; if (handlers.onQuote) { diff --git a/services/ingest-equities/src/index.ts b/services/ingest-equities/src/index.ts index 6b87b3f..588d855 100644 --- a/services/ingest-equities/src/index.ts +++ b/services/ingest-equities/src/index.ts @@ -19,6 +19,7 @@ import { import { EquityPrintSchema, EquityQuoteSchema, + resolveSyntheticMarketModes, type EquityPrint, type EquityQuote } from "@islandflow/types"; @@ -36,6 +37,8 @@ const envSchema = z.object({ CLICKHOUSE_DATABASE: z.string().default("default"), EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"), EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000), + SYNTHETIC_MARKET_MODE: z.string().default("realistic"), + SYNTHETIC_EQUITIES_MODE: z.string().default(""), // Alpaca (equities) ALPACA_KEY_ID: z.string().default(""), @@ -63,6 +66,10 @@ const envSchema = z.object({ }); const env = readEnv(envSchema); +const syntheticModes = resolveSyntheticMarketModes({ + syntheticMarketMode: env.SYNTHETIC_MARKET_MODE, + syntheticEquitiesMode: env.SYNTHETIC_EQUITIES_MODE +}); const state = { shuttingDown: false, @@ -153,7 +160,10 @@ const parseSymbolList = (value: string): string[] => { const selectAdapter = (name: string): EquityIngestAdapter => { if (name === "synthetic") { - return createSyntheticEquitiesAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS }); + return createSyntheticEquitiesAdapter({ + emitIntervalMs: env.EMIT_INTERVAL_MS, + mode: syntheticModes.equities + }); } if (name === "alpaca") { diff --git a/services/ingest-options/src/adapters/synthetic.ts b/services/ingest-options/src/adapters/synthetic.ts index a5cdf41..fbdf3d6 100644 --- a/services/ingest-options/src/adapters/synthetic.ts +++ b/services/ingest-options/src/adapters/synthetic.ts @@ -1,8 +1,14 @@ -import { SP500_SYMBOLS, type OptionNBBO, type OptionPrint } from "@islandflow/types"; +import { + SP500_SYMBOLS, + type OptionNBBO, + type OptionPrint, + type SyntheticMarketMode +} from "@islandflow/types"; import type { OptionIngestAdapter, OptionIngestHandlers } from "./types"; type SyntheticOptionsAdapterConfig = { emitIntervalMs: number; + mode: SyntheticMarketMode; }; type Burst = { @@ -17,17 +23,18 @@ type Burst = { seed: number; }; -const SYNTHETIC_SYMBOLS = [ - "SPY", - ...SP500_SYMBOLS.filter((symbol) => symbol !== "SPY") -]; +const SYNTHETIC_SYMBOLS = ["SPY", ...(SP500_SYMBOLS as readonly string[])]; const MS_PER_DAY = 24 * 60 * 60 * 1000; const EXPIRY_OFFSETS = [0, 1, 7, 14, 28, 45, 60, 90]; const EXCHANGES = ["CBOE", "PHLX", "ISE", "ARCA", "BOX", "MIAX"]; const CONDITIONS = ["SWEEP", "ISO", "FILL", "TEST"]; -const BURST_RUN_RANGE: [number, number] = [2, 4]; +type SyntheticOptionsProfile = { + burstRunRange: [number, number]; + scenarios: Scenario[]; + pricePlacements: Record[]>; +}; -type PricePlacement = "AA" | "A" | "B" | "BB"; +type PricePlacement = "AA" | "A" | "MID" | "B" | "BB"; type WeightedValue = { value: T; @@ -45,7 +52,70 @@ type Scenario = { conditions?: string[]; }; -const SCENARIOS: Scenario[] = [ +const REALISTIC_SCENARIOS: Scenario[] = [ + { + id: "ask_lift", + weight: 18, + right: "either", + countRange: [1, 2], + sizeRange: [30, 180], + premiumRange: [9_000, 35_000], + priceTrend: "flat", + conditions: ["FILL"] + }, + { + id: "mid_block", + weight: 14, + right: "either", + countRange: [1, 2], + sizeRange: [120, 480], + premiumRange: [12_000, 45_000], + priceTrend: "flat", + conditions: ["FILL"] + }, + { + id: "bullish_sweep", + weight: 8, + right: "C", + countRange: [2, 3], + sizeRange: [180, 520], + premiumRange: [25_000, 90_000], + priceTrend: "up", + conditions: ["SWEEP"] + }, + { + id: "bearish_sweep", + weight: 8, + right: "P", + countRange: [2, 3], + sizeRange: [180, 520], + premiumRange: [25_000, 90_000], + priceTrend: "up", + conditions: ["SWEEP"] + }, + { + id: "contract_spike", + weight: 6, + right: "either", + countRange: [2, 3], + sizeRange: [500, 900], + premiumRange: [18_000, 70_000], + priceTrend: "flat", + conditions: ["ISO"] + }, + { + id: "noise", + weight: 46, + right: "either", + countRange: [1, 2], + sizeRange: [5, 60], + premiumRange: [500, 6_000], + priceTrend: "flat", + conditions: ["FILL"] + } +]; + +const ACTIVE_SCENARIOS: Scenario[] = [ { id: "bullish_sweep", weight: 35, @@ -88,7 +158,50 @@ const SCENARIOS: Scenario[] = [ } ]; -const PRICE_PLACEMENTS: Record[]> = { +const REALISTIC_PRICE_PLACEMENTS: Record[]> = { + ask_lift: [ + { value: "A", weight: 45 }, + { value: "AA", weight: 20 }, + { value: "MID", weight: 25 }, + { value: "B", weight: 8 }, + { value: "BB", weight: 2 } + ], + mid_block: [ + { value: "MID", weight: 60 }, + { value: "A", weight: 20 }, + { value: "B", weight: 20 } + ], + bullish_sweep: [ + { value: "AA", weight: 20 }, + { value: "A", weight: 50 }, + { value: "MID", weight: 15 }, + { value: "B", weight: 10 }, + { value: "BB", weight: 5 } + ], + bearish_sweep: [ + { value: "AA", weight: 10 }, + { value: "A", weight: 20 }, + { value: "MID", weight: 15 }, + { value: "B", weight: 35 }, + { value: "BB", weight: 20 } + ], + contract_spike: [ + { value: "A", weight: 25 }, + { value: "MID", weight: 40 }, + { value: "B", weight: 25 }, + { value: "AA", weight: 5 }, + { value: "BB", weight: 5 } + ], + noise: [ + { value: "MID", weight: 40 }, + { value: "A", weight: 20 }, + { value: "B", weight: 20 }, + { value: "AA", weight: 10 }, + { value: "BB", weight: 10 } + ] +}; + +const ACTIVE_PRICE_PLACEMENTS: Record[]> = { bullish_sweep: [ { value: "AA", weight: 25 }, { value: "A", weight: 40 }, @@ -115,7 +228,52 @@ const PRICE_PLACEMENTS: Record[]> = { ] }; -const PLACEMENT_PATTERN: PricePlacement[] = ["A", "AA", "B", "BB"]; +const FIREHOSE_PRICE_PLACEMENTS: Record[]> = { + ...ACTIVE_PRICE_PLACEMENTS, + noise: [ + { value: "A", weight: 20 }, + { value: "AA", weight: 20 }, + { value: "MID", weight: 20 }, + { value: "B", weight: 20 }, + { value: "BB", weight: 20 } + ] +}; + +const PLACEMENT_PATTERN: PricePlacement[] = ["A", "AA", "MID", "B", "BB"]; + +const SYNTHETIC_PROFILES: Record = { + realistic: { + burstRunRange: [1, 2], + scenarios: REALISTIC_SCENARIOS, + pricePlacements: REALISTIC_PRICE_PLACEMENTS + }, + active: { + burstRunRange: [2, 4], + scenarios: ACTIVE_SCENARIOS, + pricePlacements: ACTIVE_PRICE_PLACEMENTS + }, + firehose: { + burstRunRange: [4, 7], + scenarios: ACTIVE_SCENARIOS.map((scenario): Scenario => + scenario.id === "noise" + ? { + ...scenario, + weight: 20, + countRange: [5, 8], + sizeRange: [20, 300], + premiumRange: [800, 12_000] + } + : { + ...scenario, + weight: scenario.weight + 10, + countRange: [scenario.countRange[0] + 2, scenario.countRange[1] + 3], + sizeRange: [scenario.sizeRange[0], scenario.sizeRange[1] * 2], + premiumRange: [scenario.premiumRange[0], scenario.premiumRange[1] * 1.5] + } + ), + pricePlacements: FIREHOSE_PRICE_PLACEMENTS + } +}; const pick = (items: T[], seed: number): T => { return items[Math.abs(seed) % items.length]; @@ -153,8 +311,12 @@ const pickWeightedValue = (items: WeightedValue[], seed: number): T => { return pickWeighted(items, seed).value; }; -const pickPlacement = (burst: Burst, index: number): PricePlacement => { - const placementOptions = PRICE_PLACEMENTS[burst.scenarioId] ?? PRICE_PLACEMENTS.noise; +const pickPlacement = ( + burst: Burst, + index: number, + profile: SyntheticOptionsProfile +): PricePlacement => { + const placementOptions = profile.pricePlacements[burst.scenarioId] ?? profile.pricePlacements.noise; const offset = Math.abs(burst.seed) % PLACEMENT_PATTERN.length; if (index < PLACEMENT_PATTERN.length) { return PLACEMENT_PATTERN[(offset + index) % PLACEMENT_PATTERN.length]; @@ -180,11 +342,11 @@ const formatExpiry = (now: number, offsetDays: number): string => { return expiryDate.toISOString().slice(0, 10); }; -const buildBurst = (burstIndex: number, now: number): Burst => { +const buildBurst = (burstIndex: number, now: number, profile: SyntheticOptionsProfile): Burst => { const symbol = SYNTHETIC_SYMBOLS[burstIndex % SYNTHETIC_SYMBOLS.length]; const symbolHash = hashSymbol(symbol); const seed = symbolHash + burstIndex * 7; - const scenario = pickWeighted(SCENARIOS, seed); + const scenario = pickWeighted(profile.scenarios, seed); const baseUnderlying = 30 + (symbolHash % 470); const expiryOffset = pick(EXPIRY_OFFSETS, symbolHash + burstIndex); const expiry = formatExpiry(now, expiryOffset); @@ -231,6 +393,7 @@ const buildBurst = (burstIndex: number, now: number): Burst => { export const createSyntheticOptionsAdapter = ( config: SyntheticOptionsAdapterConfig ): OptionIngestAdapter => { + const profile = SYNTHETIC_PROFILES[config.mode]; return { name: "synthetic", start: (handlers: OptionIngestHandlers) => { @@ -250,8 +413,12 @@ export const createSyntheticOptionsAdapter = ( const now = Date.now(); if (!currentBurst || remainingRuns <= 0) { burstIndex += 1; - currentBurst = buildBurst(burstIndex, now); - remainingRuns = pickInt(BURST_RUN_RANGE[0], BURST_RUN_RANGE[1], burstIndex * 23); + currentBurst = buildBurst(burstIndex, now, profile); + remainingRuns = pickInt( + profile.burstRunRange[0], + profile.burstRunRange[1], + burstIndex * 23 + ); } const burst = currentBurst; @@ -267,13 +434,15 @@ export const createSyntheticOptionsAdapter = ( const bid = Math.max(0.01, Number((mid - spread / 2).toFixed(2))); const ask = Math.max(bid + 0.01, Number((mid + spread / 2).toFixed(2))); const tick = Math.max(0.01, Number((spread * 0.25).toFixed(2))); - const placement = pickPlacement(burst, i); + const placement = pickPlacement(burst, i, profile); let tradePrice = mid; if (placement === "AA") { tradePrice = ask + tick; } else if (placement === "A") { tradePrice = ask; + } else if (placement === "MID") { + tradePrice = mid; } else if (placement === "BB") { tradePrice = Math.max(0.01, bid - tick); } else { diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index 15b49dd..4c8010c 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -3,8 +3,10 @@ import { createLogger } from "@islandflow/observability"; import { SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, + SUBJECT_OPTION_SIGNAL_PRINTS, STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, + STREAM_OPTION_SIGNAL_PRINTS, connectJetStreamWithRetry, ensureStream, publishJson @@ -16,7 +18,16 @@ import { insertOptionNBBO, insertOptionPrint } from "@islandflow/storage"; -import { OptionNBBOSchema, OptionPrintSchema, type OptionNBBO, type OptionPrint } from "@islandflow/types"; +import { + OptionNBBOSchema, + OptionPrintSchema, + evaluateOptionSignal, + deriveOptionPrintMetadata, + resolveSyntheticMarketModes, + type OptionNBBO, + type OptionPrint, + type OptionsSignalConfig +} from "@islandflow/types"; import { createAlpacaOptionsAdapter } from "./adapters/alpaca"; import { createDatabentoOptionsAdapter } from "./adapters/databento"; import { createIbkrOptionsAdapter } from "./adapters/ibkr"; @@ -68,6 +79,17 @@ const envSchema = z.object({ IBKR_CURRENCY: z.string().min(1).default("USD"), IBKR_PYTHON_BIN: z.string().min(1).default("python3"), EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000), + SYNTHETIC_MARKET_MODE: z.string().default("realistic"), + SYNTHETIC_OPTIONS_MODE: z.string().default(""), + OPTIONS_SIGNAL_MODE: z.enum(["smart-money", "balanced", "all"]).default("smart-money"), + OPTIONS_SIGNAL_MIN_NOTIONAL: z.coerce.number().nonnegative().default(10_000), + OPTIONS_SIGNAL_ETF_MIN_NOTIONAL: z.coerce.number().nonnegative().default(50_000), + OPTIONS_SIGNAL_BID_SIDE_MIN_NOTIONAL: z.coerce.number().nonnegative().default(25_000), + OPTIONS_SIGNAL_MID_MIN_NOTIONAL: z.coerce.number().nonnegative().default(20_000), + OPTIONS_SIGNAL_NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1500), + OPTIONS_SIGNAL_ETF_UNDERLYINGS: z + .string() + .default("SPY,QQQ,IWM,DIA,TLT,GLD,SLV,XLF,XLE,XLV,XLI,XLP,XLU,XLY,SMH,ARKK"), TESTING_MODE: z .preprocess((value) => { if (typeof value === "string") { @@ -86,11 +108,34 @@ const envSchema = z.object({ }); const env = readEnv(envSchema); +const syntheticModes = resolveSyntheticMarketModes({ + syntheticMarketMode: env.SYNTHETIC_MARKET_MODE, + syntheticOptionsMode: env.SYNTHETIC_OPTIONS_MODE +}); +const optionsSignalConfig: OptionsSignalConfig = { + mode: env.OPTIONS_SIGNAL_MODE, + minNotional: env.OPTIONS_SIGNAL_MIN_NOTIONAL, + etfMinNotional: env.OPTIONS_SIGNAL_ETF_MIN_NOTIONAL, + bidSideMinNotional: env.OPTIONS_SIGNAL_BID_SIDE_MIN_NOTIONAL, + midMinNotional: env.OPTIONS_SIGNAL_MID_MIN_NOTIONAL, + missingNbboMinNotional: 50_000, + largePrintMinSize: 500, + largePrintMinNotional: env.OPTIONS_SIGNAL_MIN_NOTIONAL, + sweepMinNotional: env.OPTIONS_SIGNAL_BID_SIDE_MIN_NOTIONAL, + autoKeepMinNotional: 100_000, + nbboMaxAgeMs: env.OPTIONS_SIGNAL_NBBO_MAX_AGE_MS, + etfUnderlyings: new Set( + env.OPTIONS_SIGNAL_ETF_UNDERLYINGS.split(",") + .map((value) => value.trim().toUpperCase()) + .filter(Boolean) + ) +}; const state = { shuttingDown: false, shutdownPromise: null as Promise | null }; +const latestNbboByContract = new Map(); const getErrorMessage = (error: unknown): string => { return error instanceof Error ? error.message : String(error); @@ -169,7 +214,10 @@ const retry = async ( const selectAdapter = (name: string): OptionIngestAdapter => { if (name === "synthetic") { - return createSyntheticOptionsAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS }); + return createSyntheticOptionsAdapter({ + emitIntervalMs: env.EMIT_INTERVAL_MS, + mode: syntheticModes.options + }); } if (name === "alpaca") { @@ -277,6 +325,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_OPTION_SIGNAL_PRINTS, + subjects: [SUBJECT_OPTION_SIGNAL_PRINTS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + const clickhouse = createClickHouseClient({ url: env.CLICKHOUSE_URL, database: env.CLICKHOUSE_DATABASE @@ -303,15 +364,41 @@ const run = async () => { return; } - const print = OptionPrintSchema.parse(candidate); + const rawPrint = OptionPrintSchema.parse(candidate); + const derived = deriveOptionPrintMetadata( + rawPrint, + latestNbboByContract.get(rawPrint.option_contract_id), + optionsSignalConfig + ); + const signalDecision = evaluateOptionSignal( + { + ...rawPrint, + ...derived, + signal_profile: optionsSignalConfig.mode + }, + optionsSignalConfig + ); + const print = OptionPrintSchema.parse({ + ...rawPrint, + ...derived, + signal_pass: signalDecision.signalPass, + signal_reasons: signalDecision.signalReasons, + signal_profile: signalDecision.signalProfile + }); try { await insertOptionPrint(clickhouse, print); await publishJson(js, SUBJECT_OPTION_PRINTS, print); + if (print.signal_pass) { + await publishJson(js, SUBJECT_OPTION_SIGNAL_PRINTS, print); + } logger.info("published option print", { trace_id: print.trace_id, seq: print.seq, - option_contract_id: print.option_contract_id + option_contract_id: print.option_contract_id, + signal_pass: print.signal_pass, + nbbo_side: print.nbbo_side, + notional: print.notional }); } catch (error) { if (isExpectedShutdownError(error)) { @@ -335,6 +422,14 @@ const run = async () => { } const nbbo = OptionNBBOSchema.parse(candidate); + const existing = latestNbboByContract.get(nbbo.option_contract_id); + if ( + !existing || + nbbo.ts > existing.ts || + (nbbo.ts === existing.ts && nbbo.seq >= existing.seq) + ) { + latestNbboByContract.set(nbbo.option_contract_id, nbbo); + } try { await insertOptionNBBO(clickhouse, nbbo); diff --git a/services/replay/src/index.ts b/services/replay/src/index.ts index 9de942b..1ba8342 100644 --- a/services/replay/src/index.ts +++ b/services/replay/src/index.ts @@ -5,10 +5,12 @@ import { SUBJECT_EQUITY_QUOTES, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, + SUBJECT_OPTION_SIGNAL_PRINTS, STREAM_EQUITY_PRINTS, STREAM_EQUITY_QUOTES, STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, + STREAM_OPTION_SIGNAL_PRINTS, connectJetStreamWithRetry, ensureStream, publishJson @@ -304,6 +306,9 @@ const run = async () => { const def = STREAM_DEFS[kind]; await ensureStream(jsm, buildStreamConfig(def.streamName, def.subject)); } + if (streamKinds.includes("options")) { + await ensureStream(jsm, buildStreamConfig(STREAM_OPTION_SIGNAL_PRINTS, SUBJECT_OPTION_SIGNAL_PRINTS)); + } const clickhouse = createClickHouseClient({ url: env.CLICKHOUSE_URL, @@ -411,6 +416,9 @@ const run = async () => { try { await publishJson(js, stream.subject, event); + if (stream.kind === "options" && (event as OptionPrint).signal_pass) { + await publishJson(js, SUBJECT_OPTION_SIGNAL_PRINTS, event as OptionPrint); + } } catch (error) { logger.error("failed to publish replay event", { error: error instanceof Error ? error.message : String(error),