From 75fc6f93737d2195f24aa69843c022dd89a76279 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Tue, 28 Apr 2026 17:13:46 -0400 Subject: [PATCH] Fix live tape freshness and filter UX --- .beads/issues.jsonl | 1 + apps/web/app/globals.css | 174 +++++- apps/web/app/terminal.test.ts | 78 +++ apps/web/app/terminal.tsx | 554 +++++++++++++++--- services/api/src/live.ts | 120 +++- services/api/tests/live.test.ts | 240 +++++++- .../ingest-options/src/adapters/synthetic.ts | 53 +- .../ingest-options/tests/synthetic.test.ts | 26 + 8 files changed, 1087 insertions(+), 159 deletions(-) create mode 100644 apps/web/app/terminal.test.ts create mode 100644 services/ingest-options/tests/synthetic.test.ts diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 9b58daa..d5a4458 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -1,2 +1,3 @@ +{"_type":"issue","id":"islandflow-0v6","title":"Fix tape freshness, NBBO coverage, pause controls, and filter popup","description":"Implement the tape fixes requested for synthetic options notional sizing, strict live freshness, live-mode pause/resume behavior, stronger NBBO snapshot coverage, and moving flow filters behind a popup. Includes server-side live cache changes, web terminal state/UI changes, and tests for synthetic pricing, live snapshot freshness/NBBO retention, and live pause/filter interactions.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:02:52Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:13:38Z","started_at":"2026-04-28T21:02:57Z","closed_at":"2026-04-28T21:13:38Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-e4r","title":"Implement smart-money flow filtering and synthetic firehose modes","description":"Implement the approved multi-surface plan for named synthetic market profiles, options raw-vs-signal filtering, live/API filter contracts, Tape page client-side flow filters, firehose-readiness improvements, tests, and README updates.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:10:49Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:29:29Z","started_at":"2026-04-28T20:10:53Z","closed_at":"2026-04-28T20:29:29Z","close_reason":"Implemented synthetic market profiles, options signal-path filtering, signal-aware API/replay contracts, Tape page filters, tests, and README updates. Follow-up tracked in islandflow-biq.","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-biq","title":"Finish raw live options delivery and filter/backpressure observability","description":"The smart-money signal path and Tape filters are in place, but the next firehose pass should finish server-side selective raw live delivery for options subscriptions and add explicit filtered-out/backpressure observability for API/web counters. This was discovered while landing islandflow-e4r.\n","status":"open","priority":2,"issue_type":"task","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:28:58Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:28:58Z","dependencies":[{"issue_id":"islandflow-biq","depends_on_id":"islandflow-e4r","type":"discovered-from","created_at":"2026-04-28T16:28:58Z","created_by":"dirtydishes","metadata":"{}"}],"dependency_count":0,"dependent_count":0,"comment_count":0} diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index ecb69b0..0910153 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -220,6 +220,13 @@ input { animation: pulse 1.3s ease-in-out infinite; } +.feed-status-stale .feed-status-dot, +.status-stale .status-dot, +.chart-status-stale .chart-dot { + background: var(--accent); + box-shadow: 0 0 0 4px rgba(245, 166, 35, 0.18); +} + .feed-status-disconnected .feed-status-dot, .status-disconnected .status-dot, .chart-status-disconnected .chart-dot { @@ -417,55 +424,144 @@ h3 { display: flex; align-items: center; gap: 10px; + position: relative; } -.flow-filter-panel { - display: flex; - flex-wrap: wrap; - justify-content: flex-end; - gap: 10px 16px; - padding: 10px 12px; - border: 1px solid var(--border); - border-radius: 12px; - background: linear-gradient(180deg, rgba(255, 255, 255, 0.04), rgba(255, 255, 255, 0.02)); +.flow-filter-popover { + position: relative; } -.flow-filter-group { - display: flex; - flex-wrap: wrap; +.flow-filter-trigger { + display: inline-flex; align-items: center; gap: 8px; } -.flow-filter-label { - color: var(--muted); - font-size: 0.72rem; - letter-spacing: 0.08em; +.flow-filter-trigger.is-active { + border-color: rgba(245, 166, 35, 0.55); + background: linear-gradient(180deg, rgba(245, 166, 35, 0.18), rgba(245, 166, 35, 0.07)); +} + +.flow-filter-badge { + min-width: 22px; + padding: 2px 6px; + border-radius: 999px; + background: rgba(245, 166, 35, 0.22); + color: #ffe4b3; + font-family: var(--font-mono), monospace; + font-size: 0.7rem; + text-align: center; +} + +.flow-filter-popover-panel { + position: absolute; + top: calc(100% + 12px); + right: 0; + z-index: 30; + width: min(420px, calc(100vw - 72px)); + max-height: min(70vh, 560px); + overflow: auto; + border: 1px solid rgba(245, 166, 35, 0.24); + border-radius: 18px; + background: + linear-gradient(180deg, rgba(255, 255, 255, 0.06), rgba(255, 255, 255, 0.02)), + rgba(11, 16, 22, 0.92); + box-shadow: + 0 24px 60px rgba(0, 0, 0, 0.42), + inset 0 1px 0 rgba(255, 255, 255, 0.04); + backdrop-filter: blur(18px); +} + +.flow-filter-popover-head { + display: flex; + align-items: flex-start; + justify-content: space-between; + gap: 12px; + padding: 16px 16px 14px; + border-bottom: 1px solid rgba(255, 255, 255, 0.07); +} + +.flow-filter-popover-title { + font-family: var(--font-display), sans-serif; + font-size: 0.9rem; + letter-spacing: 0.12em; text-transform: uppercase; } +.flow-filter-popover-copy { + margin-top: 6px; + color: var(--text-dim); + font-size: 0.78rem; +} + +.flow-filter-popover-body { + display: grid; + gap: 12px; + padding: 14px; +} + +.flow-filter-section { + display: grid; + gap: 10px; + padding: 12px; + border: 1px solid rgba(255, 255, 255, 0.06); + border-radius: 14px; + background: linear-gradient(180deg, rgba(255, 255, 255, 0.045), rgba(0, 0, 0, 0.1)); +} + +.flow-filter-section-title { + color: #ffd89a; + font-size: 0.72rem; + letter-spacing: 0.18em; + text-transform: uppercase; +} + +.flow-filter-checkbox-grid, +.flow-filter-chip-grid { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 8px; +} + +.flow-filter-checkbox-grid-wide { + grid-template-columns: repeat(3, minmax(0, 1fr)); +} + .flow-filter-check { display: inline-flex; align-items: center; - gap: 6px; - font-size: 0.84rem; + gap: 8px; + min-height: 42px; + padding: 10px 12px; + border: 1px solid rgba(255, 255, 255, 0.06); + border-radius: 12px; + background: rgba(255, 255, 255, 0.02); + font-size: 0.82rem; text-transform: uppercase; + cursor: pointer; +} + +.flow-filter-check input { + margin: 0; + accent-color: var(--accent); } .filter-chip { border: 1px solid var(--border); - border-radius: 999px; + border-radius: 12px; background: rgba(255, 255, 255, 0.03); color: var(--text); - padding: 6px 10px; + min-height: 42px; + padding: 8px 12px; font: inherit; cursor: pointer; + text-transform: uppercase; } .filter-chip.is-active { - border-color: rgba(127, 234, 170, 0.6); - background: rgba(127, 234, 170, 0.14); - color: var(--accent-strong); + border-color: rgba(245, 166, 35, 0.45); + background: linear-gradient(180deg, rgba(245, 166, 35, 0.18), rgba(245, 166, 35, 0.07)); + color: #ffe4b3; } .overview-strip, @@ -1099,6 +1195,10 @@ h3 { .terminal-topbar-controls { flex: 1 1 auto; } + + .flow-filter-popover-panel { + width: min(420px, calc(100vw - 56px)); + } } @media (max-width: 720px) { @@ -1144,6 +1244,34 @@ h3 { width: 100%; } + .page-actions { + width: 100%; + } + + .flow-filter-popover { + width: 100%; + } + + .flow-filter-trigger { + width: 100%; + justify-content: space-between; + } + + .flow-filter-popover-panel { + position: fixed; + top: calc(var(--topbar-height) + 26px); + left: 14px; + right: 14px; + width: auto; + max-height: min(68vh, 560px); + } + + .flow-filter-checkbox-grid, + .flow-filter-checkbox-grid-wide, + .flow-filter-chip-grid { + grid-template-columns: minmax(0, 1fr); + } + .row { flex-direction: column; align-items: flex-start; diff --git a/apps/web/app/terminal.test.ts b/apps/web/app/terminal.test.ts new file mode 100644 index 0000000..1b353b2 --- /dev/null +++ b/apps/web/app/terminal.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, it } from "bun:test"; +import { + buildDefaultFlowFilters, + countActiveFlowFilterGroups, + flushPausableTapeData, + getLiveFeedStatus, + nextFlowFilterPopoverState, + reducePausableTapeData, + toggleFilterValue +} from "./terminal"; + +const makeItem = (traceId: string, seq: number, ts: number) => ({ + trace_id: traceId, + seq, + ts +}); + +describe("live tape pausable helpers", () => { + it("queues new items while paused and flushes them on resume", () => { + let state = reducePausableTapeData( + { visible: [], queued: [], seenKeys: new Set(), dropped: 0 }, + [makeItem("a", 1, 100), makeItem("b", 2, 200)], + false + ); + + expect(state.visible.map((item) => item.trace_id)).toEqual(["b", "a"]); + expect(state.dropped).toBe(0); + + state = reducePausableTapeData(state, [makeItem("c", 3, 300)], true); + expect(state.visible.map((item) => item.trace_id)).toEqual(["b", "a"]); + expect(state.queued.map((item) => item.trace_id)).toEqual(["c"]); + expect(state.dropped).toBe(1); + + state = flushPausableTapeData(state); + expect(state.visible.map((item) => item.trace_id)).toEqual(["c", "b", "a"]); + expect(state.queued).toHaveLength(0); + expect(state.dropped).toBe(0); + }); + + it("does not duplicate unchanged arrays", () => { + let state = reducePausableTapeData( + { visible: [], queued: [], seenKeys: new Set(), dropped: 0 }, + [makeItem("a", 1, 100)], + false + ); + + state = reducePausableTapeData(state, [makeItem("a", 1, 100)], false); + expect(state.visible.map((item) => item.trace_id)).toEqual(["a"]); + }); + + it("marks connected feeds stale once their freshest event ages past the threshold", () => { + expect(getLiveFeedStatus("connected", 1000, 500, 1400)).toBe("connected"); + expect(getLiveFeedStatus("connected", 1000, 500, 1601)).toBe("stale"); + expect(getLiveFeedStatus("disconnected", 1000, 500, 1601)).toBe("disconnected"); + }); +}); + +describe("flow filter popup helpers", () => { + it("opens and closes the popup via toggle and dismiss actions", () => { + expect(nextFlowFilterPopoverState(false, "toggle")).toBe(true); + expect(nextFlowFilterPopoverState(true, "toggle")).toBe(false); + expect(nextFlowFilterPopoverState(true, "dismiss")).toBe(false); + }); + + it("tracks active filter groups and resets to defaults", () => { + const defaults = buildDefaultFlowFilters(); + const next = { + ...defaults, + securityTypes: toggleFilterValue(defaults.securityTypes, "etf", true), + nbboSides: toggleFilterValue(defaults.nbboSides, "B", true), + minNotional: 25_000 + }; + + expect(countActiveFlowFilterGroups(defaults)).toBe(0); + expect(countActiveFlowFilterGroups(next)).toBe(3); + expect(buildDefaultFlowFilters()).toEqual(defaults); + }); +}); diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index dedf475..c39d418 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -11,7 +11,9 @@ import { useMemo, useRef, useState, - type ReactNode + type Dispatch, + type ReactNode, + type SetStateAction } from "react"; import type { AlertEvent, @@ -55,6 +57,10 @@ const parseBoundedInt = ( }; const LIVE_HOT_WINDOW = parseBoundedInt(process.env.NEXT_PUBLIC_LIVE_HOT_WINDOW, 2000, 100, 100000); +const LIVE_OPTIONS_STALE_MS = 15_000; +const LIVE_NBBO_STALE_MS = 15_000; +const LIVE_EQUITIES_STALE_MS = 15_000; +const LIVE_FLOW_STALE_MS = 30_000; const PINNED_EVIDENCE_TTL_MS = parseBoundedInt( process.env.NEXT_PUBLIC_PINNED_EVIDENCE_TTL_MS, 20 * 60 * 1000, @@ -192,7 +198,7 @@ const readErrorDetail = async (response: Response): Promise => { } }; -type WsStatus = "connecting" | "connected" | "disconnected"; +type WsStatus = "connecting" | "connected" | "disconnected" | "stale"; type TapeMode = "live" | "replay"; @@ -352,6 +358,103 @@ const mergeNewest = ( return deduped.slice(0, safeLimit); }; +const getTapeItemKey = (item: SortableItem): string => { + return buildItemKey(item) ?? `${extractSortTs(item)}:${extractSortSeq(item)}`; +}; + +type PausableTapeData = { + visible: T[]; + queued: T[]; + seenKeys: Set; + dropped: number; +}; + +export const reducePausableTapeData = ( + current: PausableTapeData, + incoming: T[], + paused: boolean +): PausableTapeData => { + if (incoming.length === 0) { + return current; + } + + const nextSeenKeys = new Set(current.seenKeys); + const unseen: T[] = []; + + for (const item of incoming) { + const key = getTapeItemKey(item); + if (nextSeenKeys.has(key)) { + continue; + } + nextSeenKeys.add(key); + unseen.push(item); + } + + if (unseen.length === 0) { + return current; + } + + if (paused) { + return { + visible: current.visible, + queued: mergeNewest(unseen, current.queued, LIVE_HOT_WINDOW, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ), + seenKeys: nextSeenKeys, + dropped: current.dropped + unseen.length + }; + } + + const nextBatch = current.queued.length > 0 ? [...current.queued, ...unseen] : unseen; + return { + visible: mergeNewest(nextBatch, current.visible, LIVE_HOT_WINDOW, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ), + queued: [], + seenKeys: nextSeenKeys, + dropped: 0 + }; +}; + +export const flushPausableTapeData = ( + current: PausableTapeData +): PausableTapeData => { + if (current.queued.length === 0) { + return current.dropped === 0 ? current : { ...current, dropped: 0 }; + } + + return { + visible: mergeNewest(current.queued, current.visible, LIVE_HOT_WINDOW, (evicted) => + incrementRetentionMetric("hotWindowEvictions", evicted) + ), + queued: [], + seenKeys: current.seenKeys, + dropped: 0 + }; +}; + +const EMPTY_PAUSABLE_TAPE = { + visible: [], + queued: [], + seenKeys: new Set(), + dropped: 0 +}; + +export const getLiveFeedStatus = ( + sourceStatus: WsStatus, + freshestTs: number | null, + thresholdMs: number, + now = Date.now() +): WsStatus => { + if (sourceStatus !== "connected") { + return sourceStatus; + } + if (freshestTs === null) { + return "connected"; + } + return isFreshLiveItem(freshestTs, thresholdMs, now) ? "connected" : "stale"; +}; + type TapeState = { status: WsStatus; items: T[]; @@ -628,7 +731,7 @@ const DEFAULT_FLOW_SIDES: OptionNbboSide[] = ["AA", "A", "MID"]; const DEFAULT_FLOW_OPTION_TYPES: OptionType[] = ["call", "put"]; const DEFAULT_FLOW_SECURITY_TYPES: OptionSecurityType[] = ["stock"]; -const buildDefaultFlowFilters = (): OptionFlowFilters => ({ +export const buildDefaultFlowFilters = (): OptionFlowFilters => ({ view: "signal", securityTypes: DEFAULT_FLOW_SECURITY_TYPES, nbboSides: DEFAULT_FLOW_SIDES, @@ -637,11 +740,53 @@ const buildDefaultFlowFilters = (): OptionFlowFilters => ({ FLOW_FILTER_PRESET === "all" ? undefined : FLOW_FILTER_PRESET === "balanced" - ? 5_000 - : undefined + ? 5_000 + : undefined }); -const toggleFilterValue = (values: T[] | undefined, value: T, enabled: boolean): T[] => { +const sameFilterValues = (left: T[] | undefined, right: T[] | undefined): boolean => { + const leftValues = [...(left ?? [])].sort(); + const rightValues = [...(right ?? [])].sort(); + if (leftValues.length !== rightValues.length) { + return false; + } + return leftValues.every((value, index) => value === rightValues[index]); +}; + +export const countActiveFlowFilterGroups = (filters: OptionFlowFilters): number => { + const defaults = buildDefaultFlowFilters(); + let count = 0; + + if (!sameFilterValues(filters.securityTypes, defaults.securityTypes)) { + count += 1; + } + if (!sameFilterValues(filters.nbboSides, defaults.nbboSides)) { + count += 1; + } + if (!sameFilterValues(filters.optionTypes, defaults.optionTypes)) { + count += 1; + } + if ((filters.minNotional ?? undefined) !== (defaults.minNotional ?? undefined)) { + count += 1; + } + + return count; +}; + +const isFreshLiveItem = (ts: number, thresholdMs: number, now = Date.now()): boolean => now - ts <= thresholdMs; + +const filterFreshLiveItems = ( + items: T[], + thresholdMs: number, + getItemTs: (item: T) => number = extractSortTs, + now = Date.now() +): T[] => items.filter((item) => isFreshLiveItem(getItemTs(item), thresholdMs, now)); + +export const toggleFilterValue = ( + values: T[] | undefined, + value: T, + enabled: boolean +): T[] => { const current = new Set(values ?? []); if (enabled) { current.add(value); @@ -651,6 +796,13 @@ const toggleFilterValue = (values: T[] | undefined, value: T, return [...current].sort(); }; +export const nextFlowFilterPopoverState = ( + current: boolean, + action: "toggle" | "dismiss" +): boolean => { + return action === "toggle" ? !current : false; +}; + const classifyNbboSide = (price: number, quote: OptionNBBO | null | undefined): NbboSide | null => { if (!quote || !Number.isFinite(price)) { return null; @@ -949,6 +1101,8 @@ const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string switch (status) { case "connected": return "Live"; + case "stale": + return "Live feed behind"; case "connecting": return "Connecting"; case "disconnected": @@ -1389,6 +1543,115 @@ const toStaticTapeState = ( togglePause: () => {} }); +type PausableTapeViewConfig = { + enabled: boolean; + sourceStatus: WsStatus; + sourceItems: T[]; + lastUpdate: number | null; + freshnessMs: number; + onNewItems?: (count: number) => void; + captureScroll?: () => void; + getItemTs?: (item: T) => number; +}; + +const usePausableTapeView = ( + config: PausableTapeViewConfig +): TapeState => { + const [paused, setPaused] = useState(false); + const [data, setData] = useState>(EMPTY_PAUSABLE_TAPE); + const [clock, setClock] = useState(() => Date.now()); + + useEffect(() => { + const handle = window.setInterval(() => { + setClock(Date.now()); + }, 1000); + + return () => { + window.clearInterval(handle); + }; + }, []); + + useEffect(() => { + if (!config.enabled) { + setPaused(false); + setData(EMPTY_PAUSABLE_TAPE); + return; + } + + setData((current) => { + const next = reducePausableTapeData(current, config.sourceItems, paused); + if (next === current) { + return current; + } + + const unseenCount = next.seenKeys.size - current.seenKeys.size; + if (!paused && unseenCount > 0) { + config.onNewItems?.(unseenCount); + config.captureScroll?.(); + } + + return next; + }); + }, [config.enabled, config.sourceItems, config.onNewItems, config.captureScroll, paused]); + + useEffect(() => { + if (!config.enabled || paused) { + return; + } + + setData((current) => { + const next = flushPausableTapeData(current); + if (next === current) { + return current; + } + + if (current.queued.length > 0) { + config.onNewItems?.(current.queued.length); + config.captureScroll?.(); + } + + return next; + }); + }, [config.enabled, config.onNewItems, config.captureScroll, paused]); + + const togglePause = useCallback(() => { + setPaused((current) => !current); + }, []); + + const getItemTs = config.getItemTs ?? extractSortTs; + const freshestTs = useMemo(() => { + if (config.sourceItems.length === 0) { + return null; + } + + let newest = Number.NEGATIVE_INFINITY; + for (const item of config.sourceItems) { + newest = Math.max(newest, getItemTs(item)); + } + + return Number.isFinite(newest) ? newest : null; + }, [config.sourceItems, getItemTs]); + + const status = config.enabled + ? getLiveFeedStatus(config.sourceStatus, freshestTs, config.freshnessMs, clock) + : "disconnected"; + const items = + status === "stale" + ? [] + : filterFreshLiveItems(data.visible, config.freshnessMs, getItemTs, clock); + + return { + status, + items, + lastUpdate: status === "stale" ? null : config.lastUpdate, + replayTime: null, + replayComplete: false, + paused, + dropped: data.dropped, + togglePause + }; +}; + const useLiveStream = ( config: { enabled: boolean; @@ -3286,22 +3549,44 @@ const useTerminalState = () => { getReplayKey: disableReplayGrouping }); - const optionsFeed = - mode === "live" - ? toStaticTapeState(liveSession.status, liveSession.options, liveSession.lastUpdate) - : options; + const liveOptions = usePausableTapeView({ + enabled: mode === "live", + sourceStatus: liveSession.status, + sourceItems: liveSession.options, + lastUpdate: liveSession.lastUpdate, + freshnessMs: LIVE_OPTIONS_STALE_MS, + captureScroll: optionsAnchor.capture, + onNewItems: optionsScroll.onNewItems + }); + const liveEquities = usePausableTapeView({ + enabled: mode === "live", + sourceStatus: liveSession.status, + sourceItems: liveSession.equities, + lastUpdate: liveSession.lastUpdate, + freshnessMs: LIVE_EQUITIES_STALE_MS, + captureScroll: equitiesAnchor.capture, + onNewItems: equitiesScroll.onNewItems + }); + const liveFlow = usePausableTapeView({ + enabled: mode === "live", + sourceStatus: liveSession.status, + sourceItems: liveSession.flow, + lastUpdate: liveSession.lastUpdate, + freshnessMs: LIVE_FLOW_STALE_MS, + captureScroll: flowAnchor.capture, + onNewItems: flowScroll.onNewItems, + getItemTs: (item) => item.source_ts + }); + + const optionsFeed = mode === "live" ? liveOptions : options; const nbboFeed = mode === "live" ? toStaticTapeState(liveSession.status, liveSession.nbbo, liveSession.lastUpdate) : nbbo; - const equitiesFeed = - mode === "live" - ? toStaticTapeState(liveSession.status, liveSession.equities, liveSession.lastUpdate) - : equities; + const equitiesFeed = mode === "live" ? liveEquities : equities; const equityJoinsFeed = mode === "live" ? toStaticTapeState(liveSession.status, liveSession.equityJoins, liveSession.lastUpdate) : equityJoins; - const flowFeed = - mode === "live" ? toStaticTapeState(liveSession.status, liveSession.flow, liveSession.lastUpdate) : flow; + const flowFeed = mode === "live" ? liveFlow : flow; const alertsFeed = mode === "live" ? toStaticTapeState(liveSession.status, liveSession.alerts, liveSession.lastUpdate) : alerts; const classifierHitsFeed = @@ -4159,101 +4444,196 @@ const PageFrame = ({ title, actions, children }: PageFrameProps) => { ); }; -const FlowFilterControls = () => { - const state = useTerminal(); - const filters = state.flowFilters; +type FlowFilterPopoverProps = { + filters: OptionFlowFilters; + onChange: Dispatch>; +}; + +const FlowFilterSection = ({ + title, + children +}: { + title: string; + children: ReactNode; +}) => { + return ( +
+
{title}
+ {children} +
+ ); +}; + +export const FlowFilterPopover = ({ filters, onChange }: FlowFilterPopoverProps) => { + const [open, setOpen] = useState(false); + const rootRef = useRef(null); + const activeCount = countActiveFlowFilterGroups(filters); const toggleSecurity = (value: OptionSecurityType, enabled: boolean) => { - state.setFlowFilters((prev) => ({ + onChange((prev) => ({ ...prev, securityTypes: toggleFilterValue(prev.securityTypes, value, enabled) })); }; const toggleSide = (value: OptionNbboSide, enabled: boolean) => { - state.setFlowFilters((prev) => ({ + onChange((prev) => ({ ...prev, nbboSides: toggleFilterValue(prev.nbboSides, value, enabled) })); }; const toggleOptionType = (value: OptionType, enabled: boolean) => { - state.setFlowFilters((prev) => ({ + onChange((prev) => ({ ...prev, optionTypes: toggleFilterValue(prev.optionTypes, value, enabled) })); }; const applyMinNotional = (value: number | undefined) => { - state.setFlowFilters((prev) => ({ + onChange((prev) => ({ ...prev, minNotional: value })); }; + useEffect(() => { + if (!open) { + return; + } + + const handlePointerDown = (event: MouseEvent) => { + if (!rootRef.current?.contains(event.target as Node)) { + setOpen((current) => nextFlowFilterPopoverState(current, "dismiss")); + } + }; + + const handleKeyDown = (event: KeyboardEvent) => { + if (event.key === "Escape") { + setOpen((current) => nextFlowFilterPopoverState(current, "dismiss")); + } + }; + + document.addEventListener("mousedown", handlePointerDown); + document.addEventListener("keydown", handleKeyDown); + + return () => { + document.removeEventListener("mousedown", handlePointerDown); + document.removeEventListener("keydown", handleKeyDown); + }; + }, [open]); + return ( -
-
- Security - {(["stock", "etf"] as OptionSecurityType[]).map((value) => ( - - ))} -
-
- Side - {(["AA", "A", "MID", "B", "BB"] as OptionNbboSide[]).map((value) => ( - - ))} -
-
- Type - {(["call", "put"] as OptionType[]).map((value) => ( - - ))} -
-
- Min Notional - {[ - { label: "All signal", value: undefined }, - { label: ">= 25k", value: 25_000 }, - { label: ">= 50k", value: 50_000 }, - { label: ">= 100k", value: 100_000 } - ].map((preset) => ( - - ))} -
+
+ + + {open ? ( +
+
+
+
Flow Filters
+
Changes apply immediately.
+
+ +
+ +
+ +
+ {(["stock", "etf"] as OptionSecurityType[]).map((value) => ( + + ))} +
+
+ + +
+ {(["AA", "A", "MID", "B", "BB"] as OptionNbboSide[]).map((value) => ( + + ))} +
+
+ + +
+ {(["call", "put"] as OptionType[]).map((value) => ( + + ))} +
+
+ + +
+ {[ + { label: "All signal", value: undefined }, + { label: ">= 25k", value: 25_000 }, + { label: ">= 50k", value: 50_000 }, + { label: ">= 100k", value: 100_000 } + ].map((preset) => ( + + ))} +
+
+
+
+ ) : null}
); }; +const FlowFilterControls = () => { + const state = useTerminal(); + + return ; +}; + type PaneProps = { title: string; status?: ReactNode; @@ -4402,7 +4782,9 @@ const OptionsPane = ({ limit }: OptionsPaneProps) => { {state.tickerSet.size > 0 ? "No option prints match the current filter." : state.mode === "live" - ? "No option prints yet. Start ingest-options." + ? state.options.status === "stale" + ? "Live feed behind. Waiting for fresh option prints." + : "No option prints yet. Start ingest-options." : "Replay queue empty. Ensure ClickHouse has data."}
) : ( @@ -4524,7 +4906,9 @@ const EquitiesPane = ({ limit }: EquitiesPaneProps) => { {state.tickerSet.size > 0 ? "No equity prints match the current filter." : state.mode === "live" - ? "No equity prints yet. Start ingest-equities." + ? state.equities.status === "stale" + ? "Live feed behind. Waiting for fresh equity prints." + : "No equity prints yet. Start ingest-equities." : "Replay queue empty. Ensure ClickHouse has data."} ) : ( @@ -4600,7 +4984,9 @@ const FlowPane = ({ limit, title = "Flow" }: FlowPaneProps) => { {state.tickerSet.size > 0 ? "No flow packets match the current filter." : state.mode === "live" - ? "No flow packets yet. Start compute." + ? state.flow.status === "stale" + ? "Live feed behind. Waiting for fresh flow packets." + : "No flow packets yet. Start compute." : "Replay queue empty. Ensure ClickHouse has data."} ) : ( diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 77d9dc5..81234c0 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -65,6 +65,13 @@ type GenericFeedConfig = { fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise; }; +const LIVE_FRESHNESS_THRESHOLDS: Partial> = { + options: 15_000, + nbbo: 15_000, + equities: 15_000, + flow: 30_000 +}; + export type GenericLiveLimits = Record; const parseGenericLimit = ( @@ -201,6 +208,76 @@ const parseJsonList = (payloads: string[], parse: (value: unknown) => T): T[] return items; }; +const compareCursors = (a: Cursor, b: Cursor): number => (b.ts - a.ts) || (b.seq - a.seq); + +const sortGenericItems = (items: T[], cursorOf: (item: T) => Cursor): T[] => + [...items].sort((a, b) => compareCursors(cursorOf(a), cursorOf(b))); + +const keepNewestNbboByContract = ( + items: T[], + cursorOf: (item: T) => Cursor, + limit: number +): T[] => { + const latestByContract = new Map(); + + for (const item of items) { + const existing = latestByContract.get(item.option_contract_id); + if (!existing || compareCursors(cursorOf(item), cursorOf(existing)) < 0) { + latestByContract.set(item.option_contract_id, item); + } + } + + return sortGenericItems(Array.from(latestByContract.values()), cursorOf).slice(0, limit); +}; + +const normalizeGenericItems = ( + channel: LiveGenericChannel, + items: T[], + config: GenericFeedConfig +): T[] => { + if (channel === "nbbo") { + return keepNewestNbboByContract( + items as Array, + config.cursor, + config.limit + ); + } + + return sortGenericItems(items, config.cursor).slice(0, config.limit); +}; + +const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | null => { + switch (channel) { + case "options": + case "nbbo": + case "equities": + return typeof item.ts === "number" ? item.ts : null; + case "flow": + return typeof item.source_ts === "number" ? item.source_ts : null; + default: + return null; + } +}; + +const filterFreshGenericItems = ( + channel: LiveGenericChannel, + items: T[], + now = Date.now() +): T[] => { + const thresholdMs = LIVE_FRESHNESS_THRESHOLDS[channel]; + if (!thresholdMs) { + return items; + } + + return items.filter((item) => { + const ts = extractFreshnessTs(channel, item); + if (ts === null) { + return false; + } + return now - ts <= thresholdMs; + }); +}; + const nextBeforeForItems = (items: T[], cursorOf: (item: T) => Cursor): Cursor | null => { const last = items.at(-1); return last ? cursorOf(last) : null; @@ -263,17 +340,24 @@ export class LiveStateManager { const config = this.generic[channel]; if (this.redis?.isOpen) { const payloads = await this.redis.lRange(config.redisKey, 0, config.limit - 1); - const cached = parseJsonList(payloads, config.parse); + const cached = normalizeGenericItems(channel, parseJsonList(payloads, config.parse), config); if (cached.length > 0) { this.genericItems.set(channel, cached); this.stats.genericHydrateFromRedis += 1; this.stats.cacheDepthByKey.set(config.redisKey, cached.length); this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField))); + await this.persistList( + config.redisKey, + config.cursorField, + cached, + config.limit, + this.genericCursors.get(config.cursorField) ?? null + ); return; } } - const fresh = await config.fetchRecent(this.clickhouse, config.limit); + const fresh = normalizeGenericItems(channel, await config.fetchRecent(this.clickhouse, config.limit), config); this.stats.genericHydrateFromClickHouse += 1; this.stats.cacheDepthByKey.set(config.redisKey, fresh.length); this.genericItems.set(channel, fresh); @@ -302,17 +386,21 @@ export class LiveStateManager { undefined, storageFilters ); + const freshItems = filterFreshGenericItems("options", items); return { subscription, - items, + items: freshItems, watermark: items[0] ? { ts: items[0].ts, seq: items[0].seq } : null, - next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq })) + next_before: nextBeforeForItems(freshItems, (item) => ({ ts: item.ts, seq: item.seq })) }; } const config = this.generic.options; - const items = (this.genericItems.get("options") ?? []).filter((item) => - matchesOptionPrintFilters(item, subscription.filters) + const items = filterFreshGenericItems( + "options", + (this.genericItems.get("options") ?? []).filter((item) => + matchesOptionPrintFilters(item, subscription.filters) + ) ); return { subscription, @@ -323,8 +411,11 @@ export class LiveStateManager { } case "flow": { const config = this.generic.flow; - const items = (this.genericItems.get("flow") ?? []).filter((item) => - matchesFlowPacketFilters(item, subscription.filters) + const items = filterFreshGenericItems( + "flow", + (this.genericItems.get("flow") ?? []).filter((item) => + matchesFlowPacketFilters(item, subscription.filters) + ) ); return { subscription, @@ -363,7 +454,10 @@ export class LiveStateManager { } default: { const config = this.generic[subscription.channel]; - const items = this.genericItems.get(subscription.channel) ?? []; + const items = filterFreshGenericItems( + subscription.channel, + this.genericItems.get(subscription.channel) ?? [] + ); return { subscription, items, @@ -410,13 +504,7 @@ export class LiveStateManager { const config = this.generic[channel]; const parsed = config.parse(item); const items = this.genericItems.get(channel) ?? []; - const next = [parsed, ...items] - .sort((a, b) => { - const aCursor = config.cursor(a); - const bCursor = config.cursor(b); - return (bCursor.ts - aCursor.ts) || (bCursor.seq - aCursor.seq); - }) - .slice(0, config.limit); + const next = normalizeGenericItems(channel, [parsed, ...items], config); this.genericItems.set(channel, next); this.stats.cacheDepthByKey.set(config.redisKey, next.length); const cursor = config.cursor(parsed); diff --git a/services/api/tests/live.test.ts b/services/api/tests/live.test.ts index 05e99e9..f40eb1f 100644 --- a/services/api/tests/live.test.ts +++ b/services/api/tests/live.test.ts @@ -63,11 +63,12 @@ describe("LiveStateManager", () => { it("hydrates snapshots from redis generic windows", async () => { const redis = makeRedis(); + const now = Date.now(); await redis.lPush( "live:flow", JSON.stringify({ - source_ts: 100, - ingest_ts: 101, + source_ts: now, + ingest_ts: now + 1, seq: 1, trace_id: "flow-1", id: "flow-1", @@ -76,15 +77,15 @@ describe("LiveStateManager", () => { join_quality: {} }) ); - await redis.hSet("live:cursors", "flow", JSON.stringify({ ts: 100, seq: 1 })); + await redis.hSet("live:cursors", "flow", JSON.stringify({ ts: now, seq: 1 })); const manager = new LiveStateManager(makeClickHouse(), redis as never); await manager.hydrate(); const snapshot = await manager.getSnapshot({ channel: "flow" }); expect(snapshot.items).toHaveLength(1); - expect(snapshot.watermark).toEqual({ ts: 100, seq: 1 }); - expect(snapshot.next_before).toEqual({ ts: 100, seq: 1 }); + expect(snapshot.watermark).toEqual({ ts: now, seq: 1 }); + expect(snapshot.next_before).toEqual({ ts: now, seq: 1 }); }); it("persists parameterized candle and overlay caches on ingest", async () => { @@ -136,6 +137,7 @@ describe("LiveStateManager", () => { it("trims generic windows to configured per-channel limits", async () => { const redis = makeRedis(); + const now = Date.now(); const manager = new LiveStateManager( makeClickHouse(), redis as never, @@ -152,8 +154,8 @@ describe("LiveStateManager", () => { ); await manager.ingest("flow", { - source_ts: 100, - ingest_ts: 101, + source_ts: now, + ingest_ts: now + 1, seq: 1, trace_id: "flow-1", id: "flow-1", @@ -162,8 +164,8 @@ describe("LiveStateManager", () => { join_quality: {} }); await manager.ingest("flow", { - source_ts: 110, - ingest_ts: 111, + source_ts: now + 10, + ingest_ts: now + 11, seq: 2, trace_id: "flow-2", id: "flow-2", @@ -172,8 +174,8 @@ describe("LiveStateManager", () => { join_quality: {} }); await manager.ingest("flow", { - source_ts: 120, - ingest_ts: 121, + source_ts: now + 20, + ingest_ts: now + 21, seq: 3, trace_id: "flow-3", id: "flow-3", @@ -199,13 +201,14 @@ describe("LiveStateManager", () => { it("filters option and flow snapshots using subscription filters", async () => { const manager = new LiveStateManager(makeClickHouse(), null); + const now = Date.now(); await manager.ingest("options", { - source_ts: 100, - ingest_ts: 101, + source_ts: now, + ingest_ts: now + 1, seq: 1, trace_id: "opt-1", - ts: 100, + ts: now, option_contract_id: "AAPL-2025-01-17-200-C", price: 1, size: 100, @@ -220,11 +223,11 @@ describe("LiveStateManager", () => { signal_profile: "smart-money" }); await manager.ingest("options", { - source_ts: 110, - ingest_ts: 111, + source_ts: now + 10, + ingest_ts: now + 11, seq: 2, trace_id: "opt-2", - ts: 110, + ts: now + 10, option_contract_id: "SPY-2025-01-17-500-P", price: 1, size: 100, @@ -239,8 +242,8 @@ describe("LiveStateManager", () => { signal_profile: "smart-money" }); await manager.ingest("flow", { - source_ts: 120, - ingest_ts: 121, + source_ts: now + 20, + ingest_ts: now + 21, seq: 3, trace_id: "flow-1", id: "flow-1", @@ -273,4 +276,203 @@ describe("LiveStateManager", () => { expect(optionSnapshot.items).toHaveLength(1); expect(flowSnapshot.items).toHaveLength(1); }); + + it("suppresses stale items from live snapshots while preserving fresh ones", async () => { + const manager = new LiveStateManager(makeClickHouse(), null); + const now = Date.now(); + + await manager.ingest("options", { + source_ts: now - 20_000, + ingest_ts: now - 19_999, + seq: 1, + trace_id: "opt-stale", + ts: now - 20_000, + option_contract_id: "AAPL-2025-01-17-200-C", + price: 1, + size: 10, + exchange: "X" + }); + await manager.ingest("options", { + source_ts: now - 5_000, + ingest_ts: now - 4_999, + seq: 2, + trace_id: "opt-fresh", + ts: now - 5_000, + option_contract_id: "AAPL-2025-01-17-205-C", + price: 1, + size: 10, + exchange: "X" + }); + + await manager.ingest("nbbo", { + source_ts: now - 20_000, + ingest_ts: now - 19_999, + seq: 1, + trace_id: "nbbo-stale", + ts: now - 20_000, + option_contract_id: "AAPL-2025-01-17-200-C", + bid: 1, + ask: 1.1, + bidSize: 10, + askSize: 10 + }); + await manager.ingest("nbbo", { + source_ts: now - 5_000, + ingest_ts: now - 4_999, + seq: 2, + trace_id: "nbbo-fresh", + ts: now - 5_000, + option_contract_id: "AAPL-2025-01-17-205-C", + bid: 1, + ask: 1.1, + bidSize: 10, + askSize: 10 + }); + + await manager.ingest("equities", { + source_ts: now - 20_000, + ingest_ts: now - 19_999, + seq: 1, + trace_id: "eq-stale", + ts: now - 20_000, + underlying_id: "AAPL", + price: 100, + size: 10, + exchange: "X", + offExchangeFlag: false + }); + await manager.ingest("equities", { + source_ts: now - 5_000, + ingest_ts: now - 4_999, + seq: 2, + trace_id: "eq-fresh", + ts: now - 5_000, + underlying_id: "AAPL", + price: 101, + size: 10, + exchange: "X", + offExchangeFlag: false + }); + + await manager.ingest("flow", { + source_ts: now - 40_000, + ingest_ts: now - 39_999, + seq: 1, + trace_id: "flow-stale", + id: "flow-stale", + members: ["opt-stale"], + features: {}, + join_quality: {} + }); + await manager.ingest("flow", { + source_ts: now - 5_000, + ingest_ts: now - 4_999, + seq: 2, + trace_id: "flow-fresh", + id: "flow-fresh", + members: ["opt-fresh"], + features: {}, + join_quality: {} + }); + + const [optionsSnapshot, nbboSnapshot, equitiesSnapshot, flowSnapshot] = await Promise.all([ + manager.getSnapshot({ channel: "options" }), + manager.getSnapshot({ channel: "nbbo" }), + manager.getSnapshot({ channel: "equities" }), + manager.getSnapshot({ channel: "flow" }) + ]); + + expect((optionsSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ + "opt-fresh" + ]); + expect((nbboSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ + "nbbo-fresh" + ]); + expect((equitiesSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([ + "eq-fresh" + ]); + expect((flowSnapshot.items as Array<{ id: string }>).map((item) => item.id)).toEqual([ + "flow-fresh" + ]); + }); + + it("keeps only the newest NBBO quote per contract across hydrate and ingest", async () => { + const redis = makeRedis(); + const now = Date.now(); + + await redis.lPush( + "live:nbbo", + JSON.stringify({ + source_ts: now - 2_000, + ingest_ts: now - 1_999, + seq: 1, + trace_id: "nbbo-old", + ts: now - 2_000, + option_contract_id: "AAPL-2025-01-17-200-C", + bid: 1, + ask: 1.1, + bidSize: 10, + askSize: 10 + }) + ); + await redis.lPush( + "live:nbbo", + JSON.stringify({ + source_ts: now - 1_000, + ingest_ts: now - 999, + seq: 2, + trace_id: "nbbo-new", + ts: now - 1_000, + option_contract_id: "AAPL-2025-01-17-200-C", + bid: 1.2, + ask: 1.3, + bidSize: 12, + askSize: 12 + }) + ); + await redis.lPush( + "live:nbbo", + JSON.stringify({ + source_ts: now - 500, + ingest_ts: now - 499, + seq: 3, + trace_id: "nbbo-other", + ts: now - 500, + option_contract_id: "MSFT-2025-01-17-300-C", + bid: 2, + ask: 2.1, + bidSize: 15, + askSize: 15 + }) + ); + await redis.hSet("live:cursors", "nbbo", JSON.stringify({ ts: now - 500, seq: 3 })); + + const manager = new LiveStateManager(makeClickHouse(), redis as never); + await manager.hydrate(); + + await manager.ingest("nbbo", { + source_ts: now - 250, + ingest_ts: now - 249, + seq: 4, + trace_id: "nbbo-latest", + ts: now - 250, + option_contract_id: "AAPL-2025-01-17-200-C", + bid: 1.4, + ask: 1.5, + bidSize: 14, + askSize: 14 + }); + + const snapshot = await manager.getSnapshot({ channel: "nbbo" }); + expect(snapshot.items).toHaveLength(2); + expect( + (snapshot.items as Array<{ option_contract_id: string; trace_id: string }>).map((item) => [ + item.option_contract_id, + item.trace_id + ]) + ).toEqual([ + ["AAPL-2025-01-17-200-C", "nbbo-latest"], + ["MSFT-2025-01-17-300-C", "nbbo-other"] + ]); + }); }); diff --git a/services/ingest-options/src/adapters/synthetic.ts b/services/ingest-options/src/adapters/synthetic.ts index fbdf3d6..003d70c 100644 --- a/services/ingest-options/src/adapters/synthetic.ts +++ b/services/ingest-options/src/adapters/synthetic.ts @@ -23,6 +23,8 @@ type Burst = { seed: number; }; +const OPTION_CONTRACT_MULTIPLIER = 100; + const SYNTHETIC_SYMBOLS = ["SPY", ...(SP500_SYMBOLS as readonly string[])]; const MS_PER_DAY = 24 * 60 * 60 * 1000; const EXPIRY_OFFSETS = [0, 1, 7, 14, 28, 45, 60, 90]; @@ -47,7 +49,7 @@ type Scenario = { right: "C" | "P" | "either"; countRange: [number, number]; sizeRange: [number, number]; - premiumRange: [number, number]; + targetNotionalRange: [number, number]; priceTrend: "up" | "down" | "flat"; conditions?: string[]; }; @@ -59,7 +61,7 @@ const REALISTIC_SCENARIOS: Scenario[] = [ right: "either", countRange: [1, 2], sizeRange: [30, 180], - premiumRange: [9_000, 35_000], + targetNotionalRange: [9_000, 35_000], priceTrend: "flat", conditions: ["FILL"] }, @@ -69,7 +71,7 @@ const REALISTIC_SCENARIOS: Scenario[] = [ right: "either", countRange: [1, 2], sizeRange: [120, 480], - premiumRange: [12_000, 45_000], + targetNotionalRange: [12_000, 45_000], priceTrend: "flat", conditions: ["FILL"] }, @@ -79,7 +81,7 @@ const REALISTIC_SCENARIOS: Scenario[] = [ right: "C", countRange: [2, 3], sizeRange: [180, 520], - premiumRange: [25_000, 90_000], + targetNotionalRange: [25_000, 90_000], priceTrend: "up", conditions: ["SWEEP"] }, @@ -89,7 +91,7 @@ const REALISTIC_SCENARIOS: Scenario[] = [ right: "P", countRange: [2, 3], sizeRange: [180, 520], - premiumRange: [25_000, 90_000], + targetNotionalRange: [25_000, 90_000], priceTrend: "up", conditions: ["SWEEP"] }, @@ -99,7 +101,7 @@ const REALISTIC_SCENARIOS: Scenario[] = [ right: "either", countRange: [2, 3], sizeRange: [500, 900], - premiumRange: [18_000, 70_000], + targetNotionalRange: [18_000, 70_000], priceTrend: "flat", conditions: ["ISO"] }, @@ -109,7 +111,7 @@ const REALISTIC_SCENARIOS: Scenario[] = [ right: "either", countRange: [1, 2], sizeRange: [5, 60], - premiumRange: [500, 6_000], + targetNotionalRange: [500, 6_000], priceTrend: "flat", conditions: ["FILL"] } @@ -122,7 +124,7 @@ const ACTIVE_SCENARIOS: Scenario[] = [ right: "C", countRange: [7, 10], sizeRange: [600, 1800], - premiumRange: [120_000, 240_000], + targetNotionalRange: [120_000, 240_000], priceTrend: "up", conditions: ["SWEEP"] }, @@ -132,7 +134,7 @@ const ACTIVE_SCENARIOS: Scenario[] = [ right: "P", countRange: [7, 10], sizeRange: [600, 1800], - premiumRange: [120_000, 240_000], + targetNotionalRange: [120_000, 240_000], priceTrend: "up", conditions: ["SWEEP"] }, @@ -142,7 +144,7 @@ const ACTIVE_SCENARIOS: Scenario[] = [ right: "either", countRange: [5, 8], sizeRange: [1200, 3200], - premiumRange: [60_000, 140_000], + targetNotionalRange: [60_000, 140_000], priceTrend: "flat", conditions: ["ISO"] }, @@ -152,7 +154,7 @@ const ACTIVE_SCENARIOS: Scenario[] = [ right: "either", countRange: [2, 4], sizeRange: [10, 200], - premiumRange: [500, 5000], + targetNotionalRange: [500, 5000], priceTrend: "flat", conditions: ["FILL"] } @@ -261,14 +263,17 @@ const SYNTHETIC_PROFILES: Record = weight: 20, countRange: [5, 8], sizeRange: [20, 300], - premiumRange: [800, 12_000] + targetNotionalRange: [800, 12_000] } : { ...scenario, weight: scenario.weight + 10, countRange: [scenario.countRange[0] + 2, scenario.countRange[1] + 3], sizeRange: [scenario.sizeRange[0], scenario.sizeRange[1] * 2], - premiumRange: [scenario.premiumRange[0], scenario.premiumRange[1] * 1.5] + targetNotionalRange: [ + scenario.targetNotionalRange[0], + scenario.targetNotionalRange[1] * 1.5 + ] } ), pricePlacements: FIREHOSE_PRICE_PLACEMENTS @@ -367,12 +372,20 @@ const buildBurst = (burstIndex: number, now: number, profile: SyntheticOptionsPr const exchange = pick(EXCHANGES, burstIndex + symbolHash); const printCount = pickInt(scenario.countRange[0], scenario.countRange[1], symbolHash + burstIndex * 13); const baseSize = pickInt(scenario.sizeRange[0], scenario.sizeRange[1], symbolHash + burstIndex * 17); - const premiumTarget = pickFloat( - scenario.premiumRange[0], - scenario.premiumRange[1], + const targetNotional = pickFloat( + scenario.targetNotionalRange[0], + scenario.targetNotionalRange[1], symbolHash + burstIndex * 19 ); - const basePricePer = Math.max(0.05, Number((premiumTarget / (baseSize * printCount)).toFixed(2))); + const basePricePer = Math.max( + 0.05, + Number( + ( + targetNotional / + (baseSize * printCount * OPTION_CONTRACT_MULTIPLIER) + ).toFixed(2) + ) + ); const conditions = scenario.conditions?.length ? scenario.conditions : [pick(CONDITIONS, burstIndex)]; const priceStep = scenario.priceTrend === "up" ? 0.01 : scenario.priceTrend === "down" ? -0.01 : 0; @@ -390,6 +403,12 @@ const buildBurst = (burstIndex: number, now: number, profile: SyntheticOptionsPr }; }; +export const buildSyntheticBurstForTest = ( + burstIndex: number, + now: number, + mode: SyntheticMarketMode +): Burst => buildBurst(burstIndex, now, SYNTHETIC_PROFILES[mode]); + export const createSyntheticOptionsAdapter = ( config: SyntheticOptionsAdapterConfig ): OptionIngestAdapter => { diff --git a/services/ingest-options/tests/synthetic.test.ts b/services/ingest-options/tests/synthetic.test.ts new file mode 100644 index 0000000..95f11e3 --- /dev/null +++ b/services/ingest-options/tests/synthetic.test.ts @@ -0,0 +1,26 @@ +import { describe, expect, it } from "bun:test"; +import { buildSyntheticBurstForTest } from "../src/adapters/synthetic"; + +const totalBurstNotional = (burst: { + basePrice: number; + baseSize: number; + printCount: number; +}): number => burst.basePrice * burst.baseSize * burst.printCount * 100; + +describe("synthetic options burst sizing", () => { + it("keeps realistic-mode ask lifts inside the configured notional band", () => { + const burst = buildSyntheticBurstForTest(2, Date.UTC(2026, 0, 2), "realistic"); + + expect(burst.scenarioId).toBe("ask_lift"); + expect(totalBurstNotional(burst)).toBeGreaterThanOrEqual(9_000); + expect(totalBurstNotional(burst)).toBeLessThanOrEqual(35_000); + }); + + it("keeps active-mode sweeps inside the configured notional band", () => { + const burst = buildSyntheticBurstForTest(1, Date.UTC(2026, 0, 2), "active"); + + expect(burst.scenarioId).toBe("bearish_sweep"); + expect(totalBurstNotional(burst)).toBeGreaterThanOrEqual(120_000); + expect(totalBurstNotional(burst)).toBeLessThanOrEqual(240_000); + }); +});