From a94baa745c808e07ae685fba5737755fc7f0b13c Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sat, 27 Dec 2025 20:34:10 -0500 Subject: [PATCH] Document current state and flow UI --- README.md | 76 ++++++++++---- apps/web/app/globals.css | 6 ++ apps/web/app/page.tsx | 201 +++++++++++++++++++++++++++++++++++++- services/api/src/index.ts | 58 ++++++++++- 4 files changed, 313 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index e08459c..5598a5f 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,28 @@ This repository contains a real-time market-flow analysis platform focused on ** The system ingests real-time options trades/quotes and equity prints, clusters raw activity into higher-level flow events (sweeps, spreads, rolls, ladders), applies rule-first classifiers, and visualizes the results through a high-performance, TradingView-smooth interface with full replay and backtesting support. +## CURRENT STATE (Plan Progress) + +Plan progress (rough): [####------] + +Done now (in repo): +- Bun monorepo + infra docker compose (ClickHouse, Redis, NATS JetStream) +- Shared event schemas + logging + config helpers +- Synthetic options/equity prints published to NATS and persisted to ClickHouse +- Deterministic option FlowPacket clustering (time window) + persistence +- API: REST for prints/flow packets, WS for live options/equities/flow, replay endpoints +- UI: live tapes for options/equities/flow + replay toggle + pause controls + +In progress / blocked: +- Real data adapters (requires licensed data source) +- Rolling stats and advanced clustering + +Not started: +- Classifiers + alert scoring +- Dark pool inference +- Candle service and chart overlays +- Auth / secure deployment + ## Core Principles - **Explainability first** — every alert and signal is backed by observable data and explicit logic. @@ -11,27 +33,21 @@ The system ingests real-time options trades/quotes and equity prints, clusters r - **Market microstructure correctness** — conservative handling of aggressor inference, OI, and off-exchange prints. - **Low-latency, tangible UX** — smooth real-time interaction that feels like an instrument panel, not a spreadsheet. -## What This Does +## Current Capabilities -- Ingests real-time options market data (OPRA-derived via licensed sources) -- Ingests real-time equity trades and quotes, including off-exchange prints -- Clusters raw prints into parent flow events: - - sweeps - - ladders - - spreads - - rolls -- Applies rule-first classifiers: - - large bullish/bearish sweeps - - put selling / overwrites - - volatility trades (straddles/strangles) - - 0DTE gamma activity - - far-dated conviction -- Infers dark-pool-like behavior (absorption, accumulation, distribution) -- Visualizes everything in real time with: - - live flow terminals - - off-exchange print overlays - - inferred event markers - - replayable charts +- Synthetic options/equity prints with deterministic sequencing +- Raw event persistence in ClickHouse + streaming via NATS JetStream +- Deterministic option FlowPacket clustering (time-window) +- API gateway with REST, WS, and replay endpoints +- UI tapes for options/equities/flow packets with live/replay toggle and pause controls + +## Planned Capabilities (from PLAN.md) + +- Real-time licensed market data ingestors (options + equities) +- Rule-first classifiers and alert scoring +- Dark pool inference and evidence linking +- Candle aggregation + chart overlays +- Replay/backtesting metrics and calibration ## Tech Stack @@ -57,6 +73,26 @@ types/ ui/ chart/ +## Build and Run + +Install dependencies: +- `bun install` + +Start infra: +- `docker compose up -d` + +Start everything (infra + services + web): +- `bun run dev` + +Run just the web app (auto-picks a free port in 3001-3005): +- `bun --cwd apps/web run dev` + +Run just the API: +- `bun --cwd services/api run dev` + +Run tests: +- `bun test` + ## Status Active build for personal, non-delayed analytical use. Multi-user access and redistribution are intentionally out of scope. diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index aa60d70..aeeb5c0 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -243,6 +243,12 @@ h1 { color: #5b4c34; } +.flow-meta span { + display: inline-flex; + align-items: center; + gap: 6px; +} + .flag { padding: 2px 8px; border-radius: 999px; diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 2d8b8fa..cba6fec 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -1,7 +1,7 @@ "use client"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; -import type { EquityPrint, OptionPrint } from "@islandflow/types"; +import type { EquityPrint, FlowPacket, OptionPrint } from "@islandflow/types"; const MAX_ITEMS = 60; const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); @@ -10,7 +10,7 @@ type WsStatus = "connecting" | "connected" | "disconnected"; type TapeMode = "live" | "replay"; -type MessageType = "option-print" | "equity-print"; +type MessageType = "option-print" | "equity-print" | "flow-packet"; type StreamMessage = { type: MessageType; @@ -89,6 +89,21 @@ const formatTime = (ts: number): string => { return new Date(ts).toLocaleTimeString(); }; +const parseNumber = (value: unknown, fallback: number): number => { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + + if (typeof value === "string") { + const parsed = Number(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + + return fallback; +}; + const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string => { if (paused) { return "Paused"; @@ -298,6 +313,119 @@ const useTape = ( return { status, items, lastUpdate, paused, dropped, togglePause }; }; +const useFlowStream = (enabled: boolean): TapeState => { + const [status, setStatus] = useState(enabled ? "connecting" : "disconnected"); + const [items, setItems] = useState([]); + const [lastUpdate, setLastUpdate] = useState(null); + const [paused, setPaused] = useState(false); + const [dropped, setDropped] = useState(0); + const reconnectRef = useRef(null); + const socketRef = useRef(null); + const pausedRef = useRef(paused); + + useEffect(() => { + pausedRef.current = paused; + }, [paused]); + + const togglePause = useCallback(() => { + setPaused((prev) => { + const next = !prev; + if (!next) { + setDropped(0); + } + return next; + }); + }, []); + + useEffect(() => { + if (!enabled) { + setStatus("disconnected"); + return; + } + + let active = true; + + const connect = () => { + if (!active) { + return; + } + + setStatus("connecting"); + + const socket = new WebSocket(buildWsUrl("/ws/flow")); + socketRef.current = socket; + + socket.onopen = () => { + if (!active) { + return; + } + setStatus("connected"); + }; + + socket.onmessage = (event) => { + if (!active) { + return; + } + + try { + const message = JSON.parse(event.data) as StreamMessage; + if (!message || message.type !== "flow-packet") { + return; + } + + if (pausedRef.current) { + setDropped((prev) => prev + 1); + setLastUpdate(Date.now()); + return; + } + + setItems((prev) => { + const next = [message.payload, ...prev]; + return next.slice(0, MAX_ITEMS); + }); + setLastUpdate(Date.now()); + } catch (error) { + console.warn("Failed to parse flow packet", error); + } + }; + + socket.onclose = () => { + if (!active) { + return; + } + + setStatus("disconnected"); + reconnectRef.current = window.setTimeout(() => { + connect(); + }, 1000); + }; + + socket.onerror = () => { + if (!active) { + return; + } + + setStatus("disconnected"); + socket.close(); + }; + }; + + connect(); + + return () => { + active = false; + if (reconnectRef.current !== null) { + window.clearTimeout(reconnectRef.current); + } + if (socketRef.current) { + socketRef.current.close(); + } + }; + }, [enabled]); + + return { status, items, lastUpdate, paused, dropped, togglePause }; +}; + type TapeStatusProps = { status: WsStatus; lastUpdate: number | null; @@ -330,6 +458,14 @@ const TapeStatus = ({ status, lastUpdate, paused, dropped, mode, onTogglePause } ); }; +const formatFlowMetric = (value: number, suffix?: string): string => { + if (suffix) { + return `${value}${suffix}`; + } + + return value.toLocaleString(); +}; + export default function HomePage() { const [mode, setMode] = useState("live"); @@ -347,11 +483,13 @@ export default function HomePage() { expectedType: "equity-print" }); + const flow = useFlowStream(mode === "live"); + const lastSeen = useMemo(() => { - return [options.lastUpdate, equities.lastUpdate] + return [options.lastUpdate, equities.lastUpdate, flow.lastUpdate] .filter((value): value is number => value !== null) .sort((a, b) => b - a)[0] ?? null; - }, [options.lastUpdate, equities.lastUpdate]); + }, [options.lastUpdate, equities.lastUpdate, flow.lastUpdate]); const toggleMode = () => { setMode((prev) => (prev === "live" ? "replay" : "live")); @@ -468,6 +606,61 @@ export default function HomePage() { )} + +
+
+
+

Flow Packets

+

Deterministic clusters (live only).

+
+ +
+ +
+ {mode !== "live" ? ( +
Flow packets are live-only in this build.
+ ) : flow.items.length === 0 ? ( +
No flow packets yet. Start compute.
+ ) : ( + flow.items.map((packet) => { + const features = packet.features ?? {}; + const contract = String(features.option_contract_id ?? packet.id ?? "unknown"); + const count = parseNumber(features.count, packet.members.length); + const totalSize = parseNumber(features.total_size, 0); + const totalPremium = parseNumber(features.total_premium, 0); + const startTs = parseNumber(features.start_ts, packet.source_ts); + const endTs = parseNumber(features.end_ts, startTs); + const windowMs = parseNumber(features.window_ms, 0); + + return ( +
+
+
{contract}
+
+ {formatFlowMetric(count)} prints + {formatFlowMetric(totalSize)} size + ${formatPrice(totalPremium)} + {windowMs > 0 ? ( + {formatFlowMetric(windowMs, "ms")} + ) : null} +
+
+
+ {formatTime(startTs)} → {formatTime(endTs)} +
+
+ ); + }) + )} +
+
); diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 6e879cc..a2b7608 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -2,8 +2,10 @@ import { readEnv } from "@islandflow/config"; import { createLogger } from "@islandflow/observability"; import { SUBJECT_EQUITY_PRINTS, + SUBJECT_FLOW_PACKETS, SUBJECT_OPTION_PRINTS, STREAM_EQUITY_PRINTS, + STREAM_FLOW_PACKETS, STREAM_OPTION_PRINTS, buildDurableConsumer, connectJetStreamWithRetry, @@ -21,7 +23,7 @@ import { fetchOptionPrintsAfter, fetchRecentOptionPrints } from "@islandflow/storage"; -import { EquityPrintSchema, OptionPrintSchema } from "@islandflow/types"; +import { EquityPrintSchema, FlowPacketSchema, OptionPrintSchema } from "@islandflow/types"; import { z } from "zod"; const service = "api"; @@ -44,7 +46,7 @@ const replayParamsSchema = z.object({ limit: z.coerce.number().int().positive().max(1000).default(200) }); -type Channel = "options" | "equities"; +type Channel = "options" | "equities" | "flow"; type WsData = { channel: Channel; @@ -52,6 +54,7 @@ type WsData = { const optionSockets = new Set>(); const equitySockets = new Set>(); +const flowSockets = new Set>(); const jsonResponse = (body: unknown, status = 200): Response => { return new Response(JSON.stringify(body), { @@ -136,6 +139,19 @@ const run = async () => { num_replicas: 1 }); + await ensureStream(jsm, { + name: STREAM_FLOW_PACKETS, + subjects: [SUBJECT_FLOW_PACKETS], + retention: "limits", + storage: "file", + discard: "old", + max_msgs_per_subject: -1, + max_msgs: -1, + max_bytes: -1, + max_age: 0, + num_replicas: 1 + }); + const clickhouse = createClickHouseClient({ url: env.CLICKHOUSE_URL, database: env.CLICKHOUSE_DATABASE @@ -157,6 +173,12 @@ const run = async () => { buildDurableConsumer("api-equity-prints") ); + const flowSubscription = await subscribeJson( + js, + SUBJECT_FLOW_PACKETS, + buildDurableConsumer("api-flow-packets") + ); + const pumpOptions = async () => { for await (const msg of optionSubscription.messages) { try { @@ -187,8 +209,24 @@ const run = async () => { } }; + const pumpFlow = async () => { + for await (const msg of flowSubscription.messages) { + try { + const payload = FlowPacketSchema.parse(flowSubscription.decode(msg)); + broadcast(flowSockets, { type: "flow-packet", payload }); + msg.ack(); + } catch (error) { + logger.error("failed to process flow packet", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + void pumpOptions(); void pumpEquities(); + void pumpFlow(); const server = Bun.serve({ port: env.API_PORT, @@ -249,14 +287,24 @@ const run = async () => { return jsonResponse({ error: "websocket upgrade failed" }, 400); } + if (req.method === "GET" && url.pathname === "/ws/flow") { + if (serverRef.upgrade(req, { data: { channel: "flow" } })) { + return new Response(null, { status: 101 }); + } + + return jsonResponse({ error: "websocket upgrade failed" }, 400); + } + return jsonResponse({ error: "not found" }, 404); }, websocket: { open: (socket) => { if (socket.data.channel === "options") { optionSockets.add(socket); - } else { + } else if (socket.data.channel === "equities") { equitySockets.add(socket); + } else { + flowSockets.add(socket); } logger.info("websocket connected", { channel: socket.data.channel }); @@ -264,8 +312,10 @@ const run = async () => { close: (socket) => { if (socket.data.channel === "options") { optionSockets.delete(socket); - } else { + } else if (socket.data.channel === "equities") { equitySockets.delete(socket); + } else { + flowSockets.delete(socket); } logger.info("websocket disconnected", { channel: socket.data.channel });