diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index c755228..5ca3a9f 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -1,3 +1,4 @@ +{"_type":"issue","id":"islandflow-pre","title":"Fix contract-focused options tape hydration","description":"Implement contract-focused options tape hydration so focused contract views preserve the clicked seed row, stop reapplying broad flow filters in the Options pane, and use raw contract-scoped ClickHouse queries consistently across live snapshots, history, and replay. Includes frontend replay source-grouping changes and regression tests for focus seed durability, focused filtering, and contract-scoped API behavior.\n","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-08T03:27:31Z","created_by":"dirtydishes","updated_at":"2026-05-08T03:37:18Z","started_at":"2026-05-08T03:27:35Z","closed_at":"2026-05-08T03:37:18Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-9xs","title":"Fix terminal hydration and virtual row measurement crash","description":"Fix client crash caused by options-support hydration on non-JSON/404 responses and satisfy tanstack virtual measured-row data-index requirement across virtualized tables.\n","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-07T06:14:33Z","created_by":"dirtydishes","updated_at":"2026-05-07T06:17:09Z","started_at":"2026-05-07T06:14:43Z","closed_at":"2026-05-07T06:17:09Z","close_reason":"Completed: added data-index attributes on measured virtual rows, hardened options-support hydration error handling/content-type validation, and guarded trace-id hydration loops against malformed payload entries.","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-35g","title":"Fix Docker deployment workspace lockfile drift","description":"Refresh deployment/docker workspace lockfile for Docker builds, add a drift guard for Docker-built workspaces, and document the separate deployment snapshot so frozen Bun installs cannot fail when repo dependencies change.\n","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-07T06:02:06Z","created_by":"dirtydishes","updated_at":"2026-05-07T06:07:50Z","started_at":"2026-05-07T06:02:15Z","closed_at":"2026-05-07T06:07:50Z","close_reason":"Completed: synced deployment Docker workspace snapshot from repo root, refreshed deployment bun.lock, added sync/check scripts, and documented maintenance workflow. Local docker compose build validation is blocked here because Docker daemon is unavailable.","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-2ij","title":"Harden tape virtualization, scoped focus, and live feed health","description":"Implement the coordinated tape stability plan across web and API.\n\nScope:\n- replace fixed-height tape virtualization with measured virtualization and virtual-end history loading\n- replace scrollHeight anchoring with key-based anchor restore\n- compose canonical tape lists across seed/live/history sources\n- preserve clicked contract/ticker context during scoped focus transitions\n- separate backend hot-channel health from scoped quiet empty states\n- shrink browser hot windows and modestly reduce server cache limits\n- add regression tests and development instrumentation\n\nAcceptance:\n- no giant blank spacer gaps during tape scrolling\n- scroll remains stable while live data and history mutate the list\n- clicked deep-history option/equity rows remain visible immediately after focus\n- narrow scopes do not surface Feed behind unless backend channel health is stale\n","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-07T05:35:18Z","created_by":"dirtydishes","updated_at":"2026-05-07T05:52:14Z","started_at":"2026-05-07T05:35:21Z","closed_at":"2026-05-07T05:52:14Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} diff --git a/apps/web/app/terminal.test.ts b/apps/web/app/terminal.test.ts index e4d9a52..2ada99a 100644 --- a/apps/web/app/terminal.test.ts +++ b/apps/web/app/terminal.test.ts @@ -4,19 +4,23 @@ import { NAV_ITEMS, appendHistoryTail, buildDefaultFlowFilters, + buildOptionTapeQueryParams, classifierToneForFamily, composeTapeItems, deriveAlertDirection, countActiveFlowFilterGroups, + filterOptionTapeItems, findAnchorRestoreIndex, formatCompactUsd, formatOptionContractLabel, flushPausableTapeData, + getEffectiveOptionPrintFilters, getAlertWindowAnchorTs, getHotChannelFeedStatus, getScopedLiveAutoHydrationChannels, getLiveHistoryRetentionCap, getOptionTableSnapshot, + getOptionScope, getLiveFeedStatus, getLiveManifest, getRouteFeatures, @@ -30,6 +34,7 @@ import { shouldIncludeEquitiesForDarkUnderlyingFallback, shouldShowEquitiesSilentFeedWarning, selectPrimaryClassifierHit, + shouldClearOptionFocusSeed, smartMoneyProfileLabel, smartMoneyToneForProfile, statusLabel, @@ -42,6 +47,25 @@ const makeItem = (traceId: string, seq: number, ts: number) => ({ ts }); +const makeOptionPrint = (overrides: Record = {}) => + ({ + trace_id: "opt-1", + seq: 1, + ts: 1_000, + source_ts: 1_000, + ingest_ts: 1_001, + option_contract_id: "AAPL-2025-01-17-200-C", + underlying_id: "AAPL", + option_type: "call", + nbbo_side: "A", + notional: 250_000, + signal_pass: true, + price: 1, + size: 10, + exchange: "X", + ...overrides + }) as any; + const makeAlert = (overrides: Record = {}) => ({ trace_id: "alert-1", @@ -125,6 +149,31 @@ describe("live manifest", () => { expect(equitiesSubscription?.underlying_ids).toEqual(["AAPL"]); }); + it("drops option-print filters for contract-focused options subscriptions but keeps flow filters", () => { + const filters = { + ...buildDefaultFlowFilters(), + minNotional: 500_000, + optionTypes: ["put"] as const + }; + const manifest = getLiveManifest( + "/tape", + "AAPL", + 60000, + filters, + { + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }, + { underlying_ids: ["AAPL"] }, + undefined + ); + const optionsSubscription = manifest.find((subscription) => subscription.channel === "options"); + const flowSubscription = manifest.find((subscription) => subscription.channel === "flow"); + + expect(optionsSubscription?.filters).toBeUndefined(); + expect(flowSubscription?.filters).toBe(filters); + }); + it("scopes /signals subscriptions to signals channels only", () => { const channels = getLiveManifest("/signals", "SPY", 60000, buildDefaultFlowFilters()).map( (subscription) => subscription.channel @@ -154,6 +203,130 @@ describe("live manifest", () => { }); }); +describe("contract-focused option helpers", () => { + it("uses the focused contract underlying for option scope even when ticker input differs", () => { + expect( + getOptionScope(["MSFT"], "AAPL", { + kind: "option-contract", + contractId: "AAPL-2025-01-17-200-C", + underlyingId: "AAPL" + }) + ).toEqual({ + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }); + }); + + it("ignores broad flow filters for focused contract options", () => { + const filters = { + ...buildDefaultFlowFilters(), + minNotional: 500_000 + }; + const items = [ + makeOptionPrint({ + trace_id: "focused-low", + option_contract_id: "AAPL-2025-01-17-200-C", + notional: 100_000, + signal_pass: false + }), + makeOptionPrint({ + trace_id: "focused-high", + seq: 2, + ts: 2_000, + option_contract_id: "AAPL-2025-01-17-200-C", + notional: 750_000 + }), + makeOptionPrint({ + trace_id: "other-contract", + seq: 3, + ts: 3_000, + option_contract_id: "MSFT-2025-01-17-300-C", + underlying_id: "MSFT", + notional: 900_000 + }) + ]; + + expect( + filterOptionTapeItems( + items, + getEffectiveOptionPrintFilters(filters, true), + { + kind: "option-contract", + contractId: "AAPL-2025-01-17-200-C", + underlyingId: "AAPL" + }, + new Set(["MSFT"]), + "AAPL" + ).map((item) => item.trace_id) + ).toEqual(["focused-low", "focused-high"]); + }); + + it("includes option_contract_id and drops broad filters in focused replay query params", () => { + const filters = { + ...buildDefaultFlowFilters(), + minNotional: 500_000, + optionTypes: ["put"] as const + }; + + expect( + buildOptionTapeQueryParams(getEffectiveOptionPrintFilters(filters, true), { + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }) + ).toEqual({ + underlying_ids: "AAPL", + option_contract_id: "AAPL-2025-01-17-200-C" + }); + }); + + it("keeps the focus seed until the matching scoped subscription has loaded it", () => { + const seedItem = makeOptionPrint({ + trace_id: "focused-seed", + option_contract_id: "AAPL-2025-01-17-200-C" + }); + const seed = { + scopeKey: "option-contract:AAPL-2025-01-17-200-C", + subscriptionKey: getLiveSubscriptionKey({ + channel: "options", + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }), + items: [seedItem] + }; + + expect( + shouldClearOptionFocusSeed( + seed, + "option-contract:AAPL-2025-01-17-200-C", + getLiveSubscriptionKey({ + channel: "options", + filters: { + ...buildDefaultFlowFilters(), + minNotional: 500_000 + }, + underlying_ids: ["AAPL"] + }), + [makeOptionPrint({ trace_id: "broad-old" })], + [] + ) + ).toBe(false); + + expect( + shouldClearOptionFocusSeed( + seed, + "option-contract:AAPL-2025-01-17-200-C", + getLiveSubscriptionKey({ + channel: "options", + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }), + [seedItem], + [] + ) + ).toBe(true); + }); +}); + describe("route feature map", () => { it("maps /tape to tape panes and dependencies", () => { const features = getRouteFeatures("/tape"); diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index 01ee884..854ea85 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -350,9 +350,17 @@ type SelectedInstrument = type TapeFocusSeed = { scopeKey: string; + subscriptionKey?: string; items: T[]; }; +type OptionScope = Pick< + Extract, + "underlying_ids" | "option_contract_id" +>; + +type EquityScope = Pick, "underlying_ids">; + const formatIntervalLabel = (intervalMs: number): string => { const match = CANDLE_INTERVALS.find((interval) => interval.ms === intervalMs); if (match) { @@ -1956,6 +1964,13 @@ const useTape = ( const replaySourceKey = config.replaySourceKey ?? null; const onReplaySourceKey = config.onReplaySourceKey; const queryParams = config.queryParams; + const queryKey = useMemo( + () => + JSON.stringify( + Object.entries(queryParams ?? {}).sort(([left], [right]) => left.localeCompare(right)) + ), + [queryParams] + ); const hotWindowLimit = config.hotWindowLimit ?? LIVE_HOT_WINDOW; const [status, setStatus] = useState("connecting"); const [items, setItems] = useState([]); @@ -2046,7 +2061,7 @@ const useTape = ( pendingRef.current = []; pendingCountRef.current = 0; cancelFlush(); - }, [mode, replaySourceKey, cancelFlush]); + }, [mode, replaySourceKey, queryKey, cancelFlush]); useEffect(() => { if (mode !== "replay" || !latestPath) { @@ -2091,7 +2106,7 @@ const useTape = ( return () => { active = false; }; - }, [mode, latestPath, getItemTs, replaySourceKey, queryParams]); + }, [mode, latestPath, getItemTs, replaySourceKey, queryKey, queryParams]); useEffect(() => { if (mode !== "live" || config.liveEnabled === false) { @@ -2242,9 +2257,14 @@ const useTape = ( } } - if (onReplaySourceKey && sourcePrefix && replaySourceNotifiedRef.current !== sourcePrefix) { - replaySourceNotifiedRef.current = sourcePrefix; - onReplaySourceKey(sourcePrefix); + if (onReplaySourceKey) { + if (sourcePrefix && replaySourceNotifiedRef.current !== sourcePrefix) { + replaySourceNotifiedRef.current = sourcePrefix; + onReplaySourceKey(sourcePrefix); + } else if (!sourcePrefix && replaySourceNotifiedRef.current !== null) { + replaySourceNotifiedRef.current = null; + onReplaySourceKey(null); + } } const filtered = sourcePrefix @@ -2330,6 +2350,7 @@ const useTape = ( getReplayKey, replaySourceKey, onReplaySourceKey, + queryKey, queryParams ]); @@ -2784,6 +2805,99 @@ const appendOptionFlowFilters = (params: URLSearchParams, filters: OptionFlowFil } }; +const appendOptionScopeParams = ( + params: URLSearchParams, + optionScope: OptionScope | undefined +): void => { + if (optionScope?.underlying_ids?.length) { + params.set("underlying_ids", optionScope.underlying_ids.join(",")); + } + if (optionScope?.option_contract_id) { + params.set("option_contract_id", optionScope.option_contract_id); + } +}; + +export const getEffectiveOptionPrintFilters = ( + flowFilters: OptionFlowFilters, + isOptionContractFocused: boolean +): OptionFlowFilters | undefined => { + return isOptionContractFocused ? undefined : flowFilters; +}; + +export const getOptionScope = ( + activeTickers: string[], + instrumentUnderlying: string | null, + selectedInstrument: SelectedInstrument +): OptionScope => ({ + underlying_ids: + selectedInstrument?.kind === "option-contract" + ? instrumentUnderlying + ? [instrumentUnderlying] + : undefined + : activeTickers.length > 0 + ? activeTickers + : instrumentUnderlying + ? [instrumentUnderlying] + : undefined, + option_contract_id: + selectedInstrument?.kind === "option-contract" ? selectedInstrument.contractId : undefined +}); + +export const buildOptionTapeQueryParams = ( + filters: OptionFlowFilters | undefined, + optionScope: OptionScope | undefined +): Record => { + const params = new URLSearchParams(); + appendOptionFlowFilters(params, filters); + appendOptionScopeParams(params, optionScope); + return Object.fromEntries(params.entries()); +}; + +export const filterOptionTapeItems = ( + items: OptionPrint[], + filters: OptionFlowFilters | undefined, + selectedInstrument: SelectedInstrument, + tickerSet: Set, + instrumentUnderlying: string | null +): OptionPrint[] => { + return items.filter((print) => { + const contractId = normalizeContractId(print.option_contract_id); + if (selectedInstrument?.kind === "option-contract") { + return contractId === selectedInstrument.contractId; + } + if (!matchesOptionPrintFilters(print, filters)) { + return false; + } + const underlying = extractUnderlying(contractId); + if (tickerSet.size === 0) { + return !instrumentUnderlying || underlying === instrumentUnderlying; + } + return Boolean(underlying) && tickerSet.has(underlying.toUpperCase()); + }); +}; + +export const shouldClearOptionFocusSeed = ( + seed: TapeFocusSeed | null, + optionFocusScopeKey: string | null, + currentOptionSubscriptionKey: string | null, + liveItems: OptionPrint[], + historyItems: OptionPrint[] +): boolean => { + if (!seed) { + return false; + } + if (seed.scopeKey !== optionFocusScopeKey) { + return true; + } + if (seed.subscriptionKey && seed.subscriptionKey !== currentOptionSubscriptionKey) { + return false; + } + const liveKeys = new Set( + composeTapeItems([], liveItems, historyItems).map((item) => getTapeItemKey(item)) + ); + return seed.items.every((item) => liveKeys.has(getTapeItemKey(item))); +}; + const appendLiveScopeParams = (params: URLSearchParams, subscription: LiveSubscription): void => { if ((subscription.channel === "options" || subscription.channel === "equities") && subscription.underlying_ids?.length) { params.set("underlying_ids", subscription.underlying_ids.join(",")); @@ -2810,8 +2924,9 @@ export const getLiveManifest = ( chartTicker: string, chartIntervalMs: number, flowFilters: OptionFlowFilters, - optionScope?: Pick, "underlying_ids" | "option_contract_id">, - equityScope?: Pick, "underlying_ids"> + optionScope?: OptionScope, + equityScope?: EquityScope, + optionPrintFilters?: OptionFlowFilters ): LiveSubscription[] => { const features = getRouteFeatures(pathname); const subscriptions: LiveSubscription[] = []; @@ -2819,7 +2934,10 @@ export const getLiveManifest = ( if (features.options) { subscriptions.push({ channel: "options", - filters: flowFilters, + filters: + optionScope?.option_contract_id && optionPrintFilters === undefined + ? undefined + : optionPrintFilters ?? flowFilters, ...optionScope, snapshot_limit: LIVE_HOT_WINDOW_OPTIONS }); @@ -2868,11 +2986,7 @@ export const getLiveManifest = ( const useLiveSession = ( enabled: boolean, pathname: string, - chartTicker: string, - chartIntervalMs: number, - flowFilters: OptionFlowFilters, - optionScope?: Pick, "underlying_ids" | "option_contract_id">, - equityScope?: Pick, "underlying_ids"> + manifest: LiveSubscription[] ): LiveSessionState => { const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); const [connectedAt, setConnectedAt] = useState(null); @@ -2938,11 +3052,6 @@ const useLiveSession = ( const lastEventAtRef = useRef(null); const subscribedKeysRef = useRef>(new Set()); const subscribedMapRef = useRef>(new Map()); - const manifest = useMemo( - () => getLiveManifest(pathname, chartTicker.toUpperCase(), chartIntervalMs, flowFilters, optionScope, equityScope), - [pathname, chartTicker, chartIntervalMs, flowFilters, optionScope, equityScope] - ); - const replaceArrayState = ( setter: Dispatch>, ref: { current: T[] }, @@ -4857,20 +4966,21 @@ const useTerminalState = () => { }, [filterInput]); const tickerSet = useMemo(() => new Set(activeTickers), [activeTickers]); const instrumentUnderlying = selectedInstrument?.underlyingId.toUpperCase() ?? null; + const isOptionContractFocused = selectedInstrument?.kind === "option-contract"; + const focusedOptionContractId = + selectedInstrument?.kind === "option-contract" ? selectedInstrument.contractId : null; const optionFocusScopeKey = - selectedInstrument?.kind === "option-contract" - ? `option-contract:${selectedInstrument.contractId}` - : null; + focusedOptionContractId ? `option-contract:${focusedOptionContractId}` : null; const equityFocusScopeKey = selectedInstrument?.kind === "equity" ? `equity:${selectedInstrument.underlyingId.toUpperCase()}` : null; + const effectiveOptionPrintFilters = useMemo( + () => getEffectiveOptionPrintFilters(flowFilters, isOptionContractFocused), + [flowFilters, isOptionContractFocused] + ); const optionScope = useMemo( - () => ({ - underlying_ids: activeTickers.length > 0 ? activeTickers : instrumentUnderlying ? [instrumentUnderlying] : undefined, - option_contract_id: - selectedInstrument?.kind === "option-contract" ? selectedInstrument.contractId : undefined - }), + () => getOptionScope(activeTickers, instrumentUnderlying, selectedInstrument), [activeTickers, instrumentUnderlying, selectedInstrument] ); const equityScope = useMemo( @@ -4895,14 +5005,39 @@ const useTerminalState = () => { ? `Contract: ${display.ticker} ${display.expiration} ${display.strike}` : `Contract: ${selectedInstrument.contractId}`; }, [selectedInstrument]); - const liveSession = useLiveSession( - mode === "live", - pathname, - chartTicker, - chartIntervalMs, - flowFilters, - optionScope, - equityScope + const liveManifest = useMemo( + () => + getLiveManifest( + pathname, + chartTicker.toUpperCase(), + chartIntervalMs, + flowFilters, + optionScope, + equityScope, + effectiveOptionPrintFilters + ), + [ + pathname, + chartTicker, + chartIntervalMs, + flowFilters, + optionScope, + equityScope, + effectiveOptionPrintFilters + ] + ); + const liveSession = useLiveSession(mode === "live", pathname, liveManifest); + const currentOptionSubscription = useMemo( + () => + liveManifest.find( + (subscription): subscription is Extract => + subscription.channel === "options" + ) ?? null, + [liveManifest] + ); + const currentOptionSubscriptionKey = useMemo( + () => (currentOptionSubscription ? getLiveSubscriptionKey(currentOptionSubscription) : null), + [currentOptionSubscription] ); const equitiesLiveSubscriptionActive = routeFeatures.equities; @@ -4966,18 +5101,8 @@ const useTerminalState = () => { ); const disableReplayGrouping = useCallback(() => null, []); const optionQueryParams = useMemo>( - () => ({ - view: flowFilters.view ?? "signal", - security: - flowFilters.securityTypes?.length === 1 ? flowFilters.securityTypes[0] : undefined, - side: flowFilters.nbboSides?.length ? flowFilters.nbboSides.join(",") : undefined, - type: flowFilters.optionTypes?.length ? flowFilters.optionTypes.join(",") : undefined, - min_notional: - typeof flowFilters.minNotional === "number" - ? String(flowFilters.minNotional) - : undefined - }), - [flowFilters] + () => buildOptionTapeQueryParams(effectiveOptionPrintFilters, optionScope), + [effectiveOptionPrintFilters, optionScope] ); const options = useTape({ @@ -4992,9 +5117,10 @@ const useTerminalState = () => { pollMs: mode === "replay" ? 200 : undefined, captureScroll: optionsAnchor.capture, onNewItems: optionsScroll.onNewItems, - getReplayKey: extractReplaySource, - onReplaySourceKey: handleReplaySource, - queryParams: optionQueryParams + getReplayKey: isOptionContractFocused ? disableReplayGrouping : extractReplaySource, + onReplaySourceKey: isOptionContractFocused ? undefined : handleReplaySource, + queryParams: optionQueryParams, + replaySourceKey: isOptionContractFocused ? null : replaySource }); const equities = useTape({ @@ -5010,6 +5136,12 @@ const useTerminalState = () => { onNewItems: equitiesScroll.onNewItems }); + useEffect(() => { + if (isOptionContractFocused && replaySource !== null) { + setReplaySource(null); + } + }, [isOptionContractFocused, replaySource]); + const equityJoins = useTape({ mode, liveEnabled: false, @@ -5922,25 +6054,20 @@ const useTerminalState = () => { ); const filteredOptions = useMemo(() => { - return optionsFeed.items.filter((print) => { - if (!matchesOptionPrintFilters(print, flowFilters)) { - return false; - } - if ( - selectedInstrument?.kind === "option-contract" && - normalizeContractId(print.option_contract_id) !== selectedInstrument.contractId - ) { - return false; - } - if (tickerSet.size === 0) { - return ( - !instrumentUnderlying || - extractUnderlying(normalizeContractId(print.option_contract_id)) === instrumentUnderlying - ); - } - return matchesTicker(extractUnderlying(normalizeContractId(print.option_contract_id))); - }); - }, [flowFilters, optionsFeed.items, matchesTicker, tickerSet, selectedInstrument, instrumentUnderlying]); + return filterOptionTapeItems( + optionsFeed.items, + effectiveOptionPrintFilters, + selectedInstrument, + tickerSet, + instrumentUnderlying + ); + }, [ + effectiveOptionPrintFilters, + instrumentUnderlying, + optionsFeed.items, + selectedInstrument, + tickerSet + ]); const filteredEquities = useMemo(() => { if (tickerSet.size === 0) { @@ -5956,16 +6083,24 @@ const useTerminalState = () => { if (!optionFocusSeed) { return; } - if (optionFocusSeed.scopeKey !== optionFocusScopeKey) { - setOptionFocusSeed(null); - return; - } - const composedBaseItems = composeTapeItems([], liveOptions.liveItems ?? [], liveOptions.historyItems ?? []); - const liveKeys = new Set(composedBaseItems.map((item) => getTapeItemKey(item))); - if (optionFocusSeed.items.every((item) => liveKeys.has(getTapeItemKey(item)))) { + if ( + shouldClearOptionFocusSeed( + optionFocusSeed, + optionFocusScopeKey, + currentOptionSubscriptionKey, + liveOptions.liveItems ?? [], + liveOptions.historyItems ?? [] + ) + ) { setOptionFocusSeed(null); } - }, [liveOptions.historyItems, liveOptions.liveItems, optionFocusScopeKey, optionFocusSeed]); + }, [ + currentOptionSubscriptionKey, + liveOptions.historyItems, + liveOptions.liveItems, + optionFocusScopeKey, + optionFocusSeed + ]); useEffect(() => { if (!equityFocusSeed) { @@ -5988,15 +6123,21 @@ const useTerminalState = () => { const parsed = parseOptionContractId(contractId); const underlyingId = (print.underlying_id ?? parsed?.root ?? extractUnderlying(contractId)).toUpperCase(); const scopeKey = `option-contract:${contractId}`; + const subscriptionKey = getLiveSubscriptionKey({ + channel: "options", + underlying_ids: [underlyingId], + option_contract_id: contractId + }); const seedItems = composeTapeItems( [print], filteredOptions.filter((candidate) => normalizeContractId(candidate.option_contract_id) === contractId), [] ); - setOptionFocusSeed({ scopeKey, items: seedItems }); + setOptionFocusSeed({ scopeKey, subscriptionKey, items: seedItems }); bumpTapeDebugMetric("focusSeedRowCount", seedItems.length); logTapeDebug("option focus seed captured", { contract_id: contractId, + subscription_key: subscriptionKey, row_count: seedItems.length }); setSelectedInstrument({ diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 3035897..b7af494 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -82,7 +82,7 @@ import { fetchClassifierHitsByPacketIds, fetchRecentOptionPrints } from "@islandflow/storage"; -import type { EquityPrintQueryFilters, OptionPrintQueryFilters } from "@islandflow/storage"; +import type { EquityPrintQueryFilters } from "@islandflow/storage"; import { AlertEventSchema, ClassifierHitEventSchema, @@ -99,11 +99,6 @@ import { LiveSubscriptionSchema, matchesFlowPacketFilters, matchesOptionPrintFilters, - OptionFlowFilters, - OptionFlowViewSchema, - OptionNbboSideSchema, - OptionSecurityTypeSchema, - OptionTypeSchema, FlowPacketSchema, SmartMoneyEventSchema, OptionNBBOSchema, @@ -113,6 +108,7 @@ import { import { createClient } from "redis"; import { z } from "zod"; import { HOT_LIVE_REDIS_KEYS, LiveStateManager, shouldFanoutLiveEvent } from "./live"; +import { parseOptionPrintQuery } from "./option-queries"; const service = "api"; const logger = createLogger({ service }); @@ -224,33 +220,6 @@ const equityPrintRangeSchema = z.object({ end_ts: z.coerce.number().int().nonnegative(), limit: limitSchema.optional() }); -const optionSideListSchema = z - .string() - .transform((value) => - value - .split(",") - .map((entry) => entry.trim()) - .filter(Boolean) - ) - .pipe(z.array(OptionNbboSideSchema)); -const optionTypeListSchema = z - .string() - .transform((value) => - value - .split(",") - .map((entry) => entry.trim()) - .filter(Boolean) - ) - .pipe(z.array(OptionTypeSchema)); -const optionSecuritySchema = z.enum(["stock", "etf", "all"]); -const optionFilterQuerySchema = z.object({ - view: OptionFlowViewSchema.optional(), - security: optionSecuritySchema.optional(), - side: optionSideListSchema.optional(), - type: optionTypeListSchema.optional(), - min_notional: z.coerce.number().nonnegative().optional() -}); - type Channel = | "options" | "options-nbbo" @@ -351,43 +320,6 @@ const applyDeliverPolicy = ( } }; -const parseOptionPrintFilters = ( - url: URL -): { - view: z.infer; - storageFilters: Parameters[3]; - liveFilters: OptionFlowFilters; -} => { - const parsed = optionFilterQuerySchema.parse({ - view: url.searchParams.get("view") ?? undefined, - security: url.searchParams.get("security") ?? undefined, - side: url.searchParams.get("side") ?? undefined, - type: url.searchParams.get("type") ?? undefined, - min_notional: url.searchParams.get("min_notional") ?? undefined - }); - const view = parsed.view ?? "signal"; - const security = parsed.security ?? (view === "raw" ? "all" : "stock"); - const storageFilters = { - view, - security, - minNotional: parsed.min_notional, - nbboSides: parsed.side, - optionTypes: parsed.type - } as const; - const liveFilters: OptionFlowFilters = { - view, - securityTypes: - security === "all" - ? undefined - : ([security] as Array>), - nbboSides: parsed.side, - optionTypes: parsed.type, - minNotional: parsed.min_notional - }; - - return { view, storageFilters, liveFilters }; -}; - const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit: number } => { const params = replayParamsSchema.parse({ after_ts: url.searchParams.get("after_ts") ?? undefined, @@ -605,15 +537,6 @@ const parseScopeList = (url: URL, ...keys: string[]): string[] | undefined => { return unique.length > 0 ? unique : undefined; }; -const parseLiveOptionPrintFilters = (url: URL): OptionPrintQueryFilters => { - const { storageFilters } = parseOptionPrintFilters(url); - return { - ...storageFilters, - underlyingIds: parseScopeList(url, "underlying_id", "underlying_ids"), - optionContractId: url.searchParams.get("option_contract_id") ?? undefined - }; -}; - const parseLiveEquityPrintFilters = (url: URL): EquityPrintQueryFilters => ({ underlyingIds: parseScopeList(url, "underlying_id", "underlying_ids") }); @@ -1399,7 +1322,7 @@ const run = async () => { try { const limit = parseLimit(url.searchParams.get("limit")); const source = parseReplaySource(url) ?? undefined; - const { storageFilters } = parseOptionPrintFilters(url); + const { storageFilters } = parseOptionPrintQuery(url); const data = await fetchRecentOptionPrints(clickhouse, limit, source, storageFilters); return jsonResponse({ data }); } catch (error) { @@ -1525,7 +1448,7 @@ const run = async () => { try { const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); const source = parseReplaySource(url) ?? undefined; - const storageFilters = parseLiveOptionPrintFilters(url); + const { storageFilters } = parseOptionPrintQuery(url); const data = await fetchOptionPrintsBefore( clickhouse, beforeTs, @@ -1668,7 +1591,7 @@ const run = async () => { try { const { afterTs, afterSeq, limit } = parseReplayParams(url); const source = parseReplaySource(url) ?? undefined; - const { storageFilters } = parseOptionPrintFilters(url); + const { storageFilters } = parseOptionPrintQuery(url); const data = await fetchOptionPrintsAfter( clickhouse, afterTs, diff --git a/services/api/src/live.ts b/services/api/src/live.ts index bd579da..0e2ab1b 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -345,6 +345,30 @@ const snapshotLimitFor = (subscription: LiveSubscription, configuredLimit: numbe return Math.max(1, Math.min(configuredLimit, Math.floor(requested))); }; +export const buildOptionSnapshotFilters = ( + subscription: Extract +): OptionPrintQueryFilters => { + if (subscription.option_contract_id) { + return { + view: "raw", + optionContractId: subscription.option_contract_id + }; + } + + return { + view: subscription.filters?.view ?? "signal", + security: + subscription.filters?.securityTypes?.length === 1 + ? subscription.filters.securityTypes[0] + : "all", + nbboSides: subscription.filters?.nbboSides, + optionTypes: subscription.filters?.optionTypes, + minNotional: subscription.filters?.minNotional, + underlyingIds: subscription.underlying_ids, + optionContractId: subscription.option_contract_id + }; +}; + const candleRedisKey = (underlyingId: string, intervalMs: number): string => `live:equity-candles:${underlyingId}:${intervalMs}`; @@ -489,18 +513,7 @@ export class LiveStateManager { if (subscription.filters?.view === "raw" || scoped) { this.stats.scopedClickHouseSnapshots += 1; const limit = snapshotLimitFor(subscription, this.generic.options.limit); - const storageFilters: OptionPrintQueryFilters = { - view: subscription.filters?.view ?? "signal", - security: - subscription.filters?.securityTypes?.length === 1 - ? subscription.filters.securityTypes[0] - : "all", - nbboSides: subscription.filters?.nbboSides, - optionTypes: subscription.filters?.optionTypes, - minNotional: subscription.filters?.minNotional, - underlyingIds: subscription.underlying_ids, - optionContractId: subscription.option_contract_id - }; + const storageFilters = buildOptionSnapshotFilters(subscription); const items = await fetchRecentOptionPrints( this.clickhouse, limit, diff --git a/services/api/src/option-queries.ts b/services/api/src/option-queries.ts new file mode 100644 index 0000000..193cbb2 --- /dev/null +++ b/services/api/src/option-queries.ts @@ -0,0 +1,107 @@ +import type { OptionPrintQueryFilters } from "@islandflow/storage"; +import { + OptionFlowViewSchema, + OptionNbboSideSchema, + OptionSecurityTypeSchema, + OptionTypeSchema, + type OptionFlowFilters +} from "@islandflow/types"; +import { z } from "zod"; + +const optionSideListSchema = z + .string() + .transform((value) => + value + .split(",") + .map((entry) => entry.trim()) + .filter(Boolean) + ) + .pipe(z.array(OptionNbboSideSchema)); + +const optionTypeListSchema = z + .string() + .transform((value) => + value + .split(",") + .map((entry) => entry.trim()) + .filter(Boolean) + ) + .pipe(z.array(OptionTypeSchema)); + +const optionSecuritySchema = z.enum(["stock", "etf", "all"]); + +const optionFilterQuerySchema = z.object({ + view: OptionFlowViewSchema.optional(), + security: optionSecuritySchema.optional(), + side: optionSideListSchema.optional(), + type: optionTypeListSchema.optional(), + min_notional: z.coerce.number().nonnegative().optional() +}); + +export type ParsedOptionPrintQuery = { + scope: { + underlyingIds?: string[]; + optionContractId?: string; + }; + flowFilters: OptionFlowFilters; + storageFilters: OptionPrintQueryFilters; + isContractDrilldown: boolean; +}; + +const parseScopeList = (url: URL, ...keys: string[]): string[] | undefined => { + const values = keys + .flatMap((key) => url.searchParams.getAll(key)) + .flatMap((value) => value.split(",")) + .map((value) => value.trim().toUpperCase()) + .filter(Boolean); + const unique = Array.from(new Set(values)); + return unique.length > 0 ? unique : undefined; +}; + +export const parseOptionPrintQuery = (url: URL): ParsedOptionPrintQuery => { + const parsed = optionFilterQuerySchema.parse({ + view: url.searchParams.get("view") ?? undefined, + security: url.searchParams.get("security") ?? undefined, + side: url.searchParams.get("side") ?? undefined, + type: url.searchParams.get("type") ?? undefined, + min_notional: url.searchParams.get("min_notional") ?? undefined + }); + const scope = { + underlyingIds: parseScopeList(url, "underlying_id", "underlying_ids"), + optionContractId: url.searchParams.get("option_contract_id") ?? undefined + }; + const view = parsed.view ?? "signal"; + const security = parsed.security ?? (view === "raw" ? "all" : "stock"); + const flowFilters: OptionFlowFilters = { + view, + securityTypes: + security === "all" + ? undefined + : ([security] as Array>), + nbboSides: parsed.side, + optionTypes: parsed.type, + minNotional: parsed.min_notional + }; + const isContractDrilldown = Boolean(scope.optionContractId); + const storageFilters: OptionPrintQueryFilters = isContractDrilldown + ? { + view: "raw", + optionContractId: scope.optionContractId + } + : { + view, + security, + minNotional: parsed.min_notional, + nbboSides: parsed.side, + optionTypes: parsed.type, + underlyingIds: scope.underlyingIds, + optionContractId: scope.optionContractId + }; + + return { + scope, + flowFilters, + storageFilters, + isContractDrilldown + }; +}; diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index 55232cc..3d0aa63 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it } from "bun:test"; import type { ClickHouseClient } from "@islandflow/storage"; import { + buildOptionSnapshotFilters, HOT_LIVE_REDIS_KEYS, LiveStateManager, isLiveItemFresh, @@ -450,6 +451,74 @@ describe("LiveStateManager", () => { expect(isLiveItemFresh("options", snapshot.items[0], now)).toBe(false); }); + it("builds raw contract-only snapshot filters for focused option subscriptions", () => { + expect( + buildOptionSnapshotFilters({ + channel: "options", + filters: { + view: "signal", + minNotional: 500_000, + nbboSides: ["A"], + optionTypes: ["call"], + securityTypes: ["stock"] + }, + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }) + ).toEqual({ + view: "raw", + optionContractId: "AAPL-2025-01-17-200-C" + }); + }); + + it("returns raw contract rows for focused option snapshots even when broad filters would reject them", async () => { + const manager = new LiveStateManager( + makeClickHouse((query) => { + expect(query).toContain("option_contract_id = 'AAPL-2025-01-17-200-C'"); + expect(query).not.toContain("signal_pass = 1"); + expect(query).not.toContain("notional >="); + expect(query).not.toContain("nbbo_side IN"); + expect(query).not.toContain("option_type IN"); + return [ + { + source_ts: 1_000, + ingest_ts: 1_001, + seq: 1, + trace_id: "opt-raw", + ts: 1_000, + option_contract_id: "AAPL-2025-01-17-200-C", + underlying_id: "AAPL", + option_type: "put", + nbbo_side: "B", + notional: 50_000, + signal_pass: false, + price: 1, + size: 5, + exchange: "X" + } + ]; + }), + null + ); + + const snapshot = await manager.getSnapshot({ + channel: "options", + filters: { + view: "signal", + minNotional: 500_000, + nbboSides: ["A"], + optionTypes: ["call"], + securityTypes: ["stock"] + }, + underlying_ids: ["AAPL"], + option_contract_id: "AAPL-2025-01-17-200-C" + }); + + expect((snapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ + "opt-raw" + ]); + }); + it("seeds scoped equity snapshots from clickhouse rows older than 24h", async () => { const now = Date.now(); const staleTs = now - 25 * 60 * 60 * 1000; diff --git a/services/api/tests/option-queries.test.ts b/services/api/tests/option-queries.test.ts new file mode 100644 index 0000000..d189303 --- /dev/null +++ b/services/api/tests/option-queries.test.ts @@ -0,0 +1,59 @@ +import { describe, expect, it } from "bun:test"; +import { parseOptionPrintQuery } from "../src/option-queries"; + +describe("parseOptionPrintQuery", () => { + it("keeps broad option flow filters for non-contract requests", () => { + const url = new URL( + "http://localhost/prints/options?view=signal&security=stock&side=A&type=call&min_notional=500000&underlying_ids=AAPL,MSFT" + ); + + expect(parseOptionPrintQuery(url)).toEqual({ + scope: { + underlyingIds: ["AAPL", "MSFT"], + optionContractId: undefined + }, + flowFilters: { + view: "signal", + securityTypes: ["stock"], + nbboSides: ["A"], + optionTypes: ["call"], + minNotional: 500000 + }, + storageFilters: { + view: "signal", + security: "stock", + nbboSides: ["A"], + optionTypes: ["call"], + minNotional: 500000, + underlyingIds: ["AAPL", "MSFT"], + optionContractId: undefined + }, + isContractDrilldown: false + }); + }); + + it("switches contract requests to raw contract-only storage filters", () => { + const url = new URL( + "http://localhost/replay/options?view=signal&security=stock&side=A&type=call&min_notional=500000&underlying_id=AAPL&option_contract_id=AAPL-2025-01-17-200-C" + ); + + expect(parseOptionPrintQuery(url)).toEqual({ + scope: { + underlyingIds: ["AAPL"], + optionContractId: "AAPL-2025-01-17-200-C" + }, + flowFilters: { + view: "signal", + securityTypes: ["stock"], + nbboSides: ["A"], + optionTypes: ["call"], + minNotional: 500000 + }, + storageFilters: { + view: "raw", + optionContractId: "AAPL-2025-01-17-200-C" + }, + isContractDrilldown: true + }); + }); +});