Document current state and flow UI

This commit is contained in:
dirtydishes 2025-12-27 20:34:10 -05:00
parent 6c376b26dc
commit a94baa745c
4 changed files with 313 additions and 28 deletions

View file

@ -4,6 +4,28 @@ This repository contains a real-time market-flow analysis platform focused on **
The system ingests real-time options trades/quotes and equity prints, clusters raw activity into higher-level flow events (sweeps, spreads, rolls, ladders), applies rule-first classifiers, and visualizes the results through a high-performance, TradingView-smooth interface with full replay and backtesting support. The system ingests real-time options trades/quotes and equity prints, clusters raw activity into higher-level flow events (sweeps, spreads, rolls, ladders), applies rule-first classifiers, and visualizes the results through a high-performance, TradingView-smooth interface with full replay and backtesting support.
## CURRENT STATE (Plan Progress)
Plan progress (rough): [####------]
Done now (in repo):
- Bun monorepo + infra docker compose (ClickHouse, Redis, NATS JetStream)
- Shared event schemas + logging + config helpers
- Synthetic options/equity prints published to NATS and persisted to ClickHouse
- Deterministic option FlowPacket clustering (time window) + persistence
- API: REST for prints/flow packets, WS for live options/equities/flow, replay endpoints
- UI: live tapes for options/equities/flow + replay toggle + pause controls
In progress / blocked:
- Real data adapters (requires licensed data source)
- Rolling stats and advanced clustering
Not started:
- Classifiers + alert scoring
- Dark pool inference
- Candle service and chart overlays
- Auth / secure deployment
## Core Principles ## Core Principles
- **Explainability first** — every alert and signal is backed by observable data and explicit logic. - **Explainability first** — every alert and signal is backed by observable data and explicit logic.
@ -11,27 +33,21 @@ The system ingests real-time options trades/quotes and equity prints, clusters r
- **Market microstructure correctness** — conservative handling of aggressor inference, OI, and off-exchange prints. - **Market microstructure correctness** — conservative handling of aggressor inference, OI, and off-exchange prints.
- **Low-latency, tangible UX** — smooth real-time interaction that feels like an instrument panel, not a spreadsheet. - **Low-latency, tangible UX** — smooth real-time interaction that feels like an instrument panel, not a spreadsheet.
## What This Does ## Current Capabilities
- Ingests real-time options market data (OPRA-derived via licensed sources) - Synthetic options/equity prints with deterministic sequencing
- Ingests real-time equity trades and quotes, including off-exchange prints - Raw event persistence in ClickHouse + streaming via NATS JetStream
- Clusters raw prints into parent flow events: - Deterministic option FlowPacket clustering (time-window)
- sweeps - API gateway with REST, WS, and replay endpoints
- ladders - UI tapes for options/equities/flow packets with live/replay toggle and pause controls
- spreads
- rolls ## Planned Capabilities (from PLAN.md)
- Applies rule-first classifiers:
- large bullish/bearish sweeps - Real-time licensed market data ingestors (options + equities)
- put selling / overwrites - Rule-first classifiers and alert scoring
- volatility trades (straddles/strangles) - Dark pool inference and evidence linking
- 0DTE gamma activity - Candle aggregation + chart overlays
- far-dated conviction - Replay/backtesting metrics and calibration
- Infers dark-pool-like behavior (absorption, accumulation, distribution)
- Visualizes everything in real time with:
- live flow terminals
- off-exchange print overlays
- inferred event markers
- replayable charts
## Tech Stack ## Tech Stack
@ -57,6 +73,26 @@ types/
ui/ ui/
chart/ chart/
## Build and Run
Install dependencies:
- `bun install`
Start infra:
- `docker compose up -d`
Start everything (infra + services + web):
- `bun run dev`
Run just the web app (auto-picks a free port in 3001-3005):
- `bun --cwd apps/web run dev`
Run just the API:
- `bun --cwd services/api run dev`
Run tests:
- `bun test`
## Status ## Status
Active build for personal, non-delayed analytical use. Multi-user access and redistribution are intentionally out of scope. Active build for personal, non-delayed analytical use. Multi-user access and redistribution are intentionally out of scope.

View file

@ -243,6 +243,12 @@ h1 {
color: #5b4c34; color: #5b4c34;
} }
.flow-meta span {
display: inline-flex;
align-items: center;
gap: 6px;
}
.flag { .flag {
padding: 2px 8px; padding: 2px 8px;
border-radius: 999px; border-radius: 999px;

View file

@ -1,7 +1,7 @@
"use client"; "use client";
import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import type { EquityPrint, OptionPrint } from "@islandflow/types"; import type { EquityPrint, FlowPacket, OptionPrint } from "@islandflow/types";
const MAX_ITEMS = 60; const MAX_ITEMS = 60;
const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]);
@ -10,7 +10,7 @@ type WsStatus = "connecting" | "connected" | "disconnected";
type TapeMode = "live" | "replay"; type TapeMode = "live" | "replay";
type MessageType = "option-print" | "equity-print"; type MessageType = "option-print" | "equity-print" | "flow-packet";
type StreamMessage<T> = { type StreamMessage<T> = {
type: MessageType; type: MessageType;
@ -89,6 +89,21 @@ const formatTime = (ts: number): string => {
return new Date(ts).toLocaleTimeString(); return new Date(ts).toLocaleTimeString();
}; };
const parseNumber = (value: unknown, fallback: number): number => {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value === "string") {
const parsed = Number(value);
if (Number.isFinite(parsed)) {
return parsed;
}
}
return fallback;
};
const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string => { const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string => {
if (paused) { if (paused) {
return "Paused"; return "Paused";
@ -298,6 +313,119 @@ const useTape = <T extends { ts: number; seq: number }>(
return { status, items, lastUpdate, paused, dropped, togglePause }; return { status, items, lastUpdate, paused, dropped, togglePause };
}; };
const useFlowStream = (enabled: boolean): TapeState<FlowPacket> => {
const [status, setStatus] = useState<WsStatus>(enabled ? "connecting" : "disconnected");
const [items, setItems] = useState<FlowPacket[]>([]);
const [lastUpdate, setLastUpdate] = useState<number | null>(null);
const [paused, setPaused] = useState<boolean>(false);
const [dropped, setDropped] = useState<number>(0);
const reconnectRef = useRef<number | null>(null);
const socketRef = useRef<WebSocket | null>(null);
const pausedRef = useRef(paused);
useEffect(() => {
pausedRef.current = paused;
}, [paused]);
const togglePause = useCallback(() => {
setPaused((prev) => {
const next = !prev;
if (!next) {
setDropped(0);
}
return next;
});
}, []);
useEffect(() => {
if (!enabled) {
setStatus("disconnected");
return;
}
let active = true;
const connect = () => {
if (!active) {
return;
}
setStatus("connecting");
const socket = new WebSocket(buildWsUrl("/ws/flow"));
socketRef.current = socket;
socket.onopen = () => {
if (!active) {
return;
}
setStatus("connected");
};
socket.onmessage = (event) => {
if (!active) {
return;
}
try {
const message = JSON.parse(event.data) as StreamMessage<FlowPacket>;
if (!message || message.type !== "flow-packet") {
return;
}
if (pausedRef.current) {
setDropped((prev) => prev + 1);
setLastUpdate(Date.now());
return;
}
setItems((prev) => {
const next = [message.payload, ...prev];
return next.slice(0, MAX_ITEMS);
});
setLastUpdate(Date.now());
} catch (error) {
console.warn("Failed to parse flow packet", error);
}
};
socket.onclose = () => {
if (!active) {
return;
}
setStatus("disconnected");
reconnectRef.current = window.setTimeout(() => {
connect();
}, 1000);
};
socket.onerror = () => {
if (!active) {
return;
}
setStatus("disconnected");
socket.close();
};
};
connect();
return () => {
active = false;
if (reconnectRef.current !== null) {
window.clearTimeout(reconnectRef.current);
}
if (socketRef.current) {
socketRef.current.close();
}
};
}, [enabled]);
return { status, items, lastUpdate, paused, dropped, togglePause };
};
type TapeStatusProps = { type TapeStatusProps = {
status: WsStatus; status: WsStatus;
lastUpdate: number | null; lastUpdate: number | null;
@ -330,6 +458,14 @@ const TapeStatus = ({ status, lastUpdate, paused, dropped, mode, onTogglePause }
); );
}; };
const formatFlowMetric = (value: number, suffix?: string): string => {
if (suffix) {
return `${value}${suffix}`;
}
return value.toLocaleString();
};
export default function HomePage() { export default function HomePage() {
const [mode, setMode] = useState<TapeMode>("live"); const [mode, setMode] = useState<TapeMode>("live");
@ -347,11 +483,13 @@ export default function HomePage() {
expectedType: "equity-print" expectedType: "equity-print"
}); });
const flow = useFlowStream(mode === "live");
const lastSeen = useMemo(() => { const lastSeen = useMemo(() => {
return [options.lastUpdate, equities.lastUpdate] return [options.lastUpdate, equities.lastUpdate, flow.lastUpdate]
.filter((value): value is number => value !== null) .filter((value): value is number => value !== null)
.sort((a, b) => b - a)[0] ?? null; .sort((a, b) => b - a)[0] ?? null;
}, [options.lastUpdate, equities.lastUpdate]); }, [options.lastUpdate, equities.lastUpdate, flow.lastUpdate]);
const toggleMode = () => { const toggleMode = () => {
setMode((prev) => (prev === "live" ? "replay" : "live")); setMode((prev) => (prev === "live" ? "replay" : "live"));
@ -468,6 +606,61 @@ export default function HomePage() {
)} )}
</div> </div>
</section> </section>
<section className="card">
<div className="card-header">
<div>
<h2>Flow Packets</h2>
<p className="card-subtitle">Deterministic clusters (live only).</p>
</div>
<TapeStatus
status={flow.status}
lastUpdate={flow.lastUpdate}
paused={flow.paused}
dropped={flow.dropped}
mode={mode}
onTogglePause={flow.togglePause}
/>
</div>
<div className="list">
{mode !== "live" ? (
<div className="empty">Flow packets are live-only in this build.</div>
) : flow.items.length === 0 ? (
<div className="empty">No flow packets yet. Start compute.</div>
) : (
flow.items.map((packet) => {
const features = packet.features ?? {};
const contract = String(features.option_contract_id ?? packet.id ?? "unknown");
const count = parseNumber(features.count, packet.members.length);
const totalSize = parseNumber(features.total_size, 0);
const totalPremium = parseNumber(features.total_premium, 0);
const startTs = parseNumber(features.start_ts, packet.source_ts);
const endTs = parseNumber(features.end_ts, startTs);
const windowMs = parseNumber(features.window_ms, 0);
return (
<div className="row" key={packet.id}>
<div>
<div className="contract">{contract}</div>
<div className="meta flow-meta">
<span>{formatFlowMetric(count)} prints</span>
<span>{formatFlowMetric(totalSize)} size</span>
<span>${formatPrice(totalPremium)}</span>
{windowMs > 0 ? (
<span>{formatFlowMetric(windowMs, "ms")}</span>
) : null}
</div>
</div>
<div className="time">
{formatTime(startTs)} {formatTime(endTs)}
</div>
</div>
);
})
)}
</div>
</section>
</div> </div>
</main> </main>
); );

View file

@ -2,8 +2,10 @@ import { readEnv } from "@islandflow/config";
import { createLogger } from "@islandflow/observability"; import { createLogger } from "@islandflow/observability";
import { import {
SUBJECT_EQUITY_PRINTS, SUBJECT_EQUITY_PRINTS,
SUBJECT_FLOW_PACKETS,
SUBJECT_OPTION_PRINTS, SUBJECT_OPTION_PRINTS,
STREAM_EQUITY_PRINTS, STREAM_EQUITY_PRINTS,
STREAM_FLOW_PACKETS,
STREAM_OPTION_PRINTS, STREAM_OPTION_PRINTS,
buildDurableConsumer, buildDurableConsumer,
connectJetStreamWithRetry, connectJetStreamWithRetry,
@ -21,7 +23,7 @@ import {
fetchOptionPrintsAfter, fetchOptionPrintsAfter,
fetchRecentOptionPrints fetchRecentOptionPrints
} from "@islandflow/storage"; } from "@islandflow/storage";
import { EquityPrintSchema, OptionPrintSchema } from "@islandflow/types"; import { EquityPrintSchema, FlowPacketSchema, OptionPrintSchema } from "@islandflow/types";
import { z } from "zod"; import { z } from "zod";
const service = "api"; const service = "api";
@ -44,7 +46,7 @@ const replayParamsSchema = z.object({
limit: z.coerce.number().int().positive().max(1000).default(200) limit: z.coerce.number().int().positive().max(1000).default(200)
}); });
type Channel = "options" | "equities"; type Channel = "options" | "equities" | "flow";
type WsData = { type WsData = {
channel: Channel; channel: Channel;
@ -52,6 +54,7 @@ type WsData = {
const optionSockets = new Set<WebSocket<WsData>>(); const optionSockets = new Set<WebSocket<WsData>>();
const equitySockets = new Set<WebSocket<WsData>>(); const equitySockets = new Set<WebSocket<WsData>>();
const flowSockets = new Set<WebSocket<WsData>>();
const jsonResponse = (body: unknown, status = 200): Response => { const jsonResponse = (body: unknown, status = 200): Response => {
return new Response(JSON.stringify(body), { return new Response(JSON.stringify(body), {
@ -136,6 +139,19 @@ const run = async () => {
num_replicas: 1 num_replicas: 1
}); });
await ensureStream(jsm, {
name: STREAM_FLOW_PACKETS,
subjects: [SUBJECT_FLOW_PACKETS],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
const clickhouse = createClickHouseClient({ const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL, url: env.CLICKHOUSE_URL,
database: env.CLICKHOUSE_DATABASE database: env.CLICKHOUSE_DATABASE
@ -157,6 +173,12 @@ const run = async () => {
buildDurableConsumer("api-equity-prints") buildDurableConsumer("api-equity-prints")
); );
const flowSubscription = await subscribeJson(
js,
SUBJECT_FLOW_PACKETS,
buildDurableConsumer("api-flow-packets")
);
const pumpOptions = async () => { const pumpOptions = async () => {
for await (const msg of optionSubscription.messages) { for await (const msg of optionSubscription.messages) {
try { try {
@ -187,8 +209,24 @@ const run = async () => {
} }
}; };
const pumpFlow = async () => {
for await (const msg of flowSubscription.messages) {
try {
const payload = FlowPacketSchema.parse(flowSubscription.decode(msg));
broadcast(flowSockets, { type: "flow-packet", payload });
msg.ack();
} catch (error) {
logger.error("failed to process flow packet", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
void pumpOptions(); void pumpOptions();
void pumpEquities(); void pumpEquities();
void pumpFlow();
const server = Bun.serve<WsData>({ const server = Bun.serve<WsData>({
port: env.API_PORT, port: env.API_PORT,
@ -249,14 +287,24 @@ const run = async () => {
return jsonResponse({ error: "websocket upgrade failed" }, 400); return jsonResponse({ error: "websocket upgrade failed" }, 400);
} }
if (req.method === "GET" && url.pathname === "/ws/flow") {
if (serverRef.upgrade(req, { data: { channel: "flow" } })) {
return new Response(null, { status: 101 });
}
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
return jsonResponse({ error: "not found" }, 404); return jsonResponse({ error: "not found" }, 404);
}, },
websocket: { websocket: {
open: (socket) => { open: (socket) => {
if (socket.data.channel === "options") { if (socket.data.channel === "options") {
optionSockets.add(socket); optionSockets.add(socket);
} else { } else if (socket.data.channel === "equities") {
equitySockets.add(socket); equitySockets.add(socket);
} else {
flowSockets.add(socket);
} }
logger.info("websocket connected", { channel: socket.data.channel }); logger.info("websocket connected", { channel: socket.data.channel });
@ -264,8 +312,10 @@ const run = async () => {
close: (socket) => { close: (socket) => {
if (socket.data.channel === "options") { if (socket.data.channel === "options") {
optionSockets.delete(socket); optionSockets.delete(socket);
} else { } else if (socket.data.channel === "equities") {
equitySockets.delete(socket); equitySockets.delete(socket);
} else {
flowSockets.delete(socket);
} }
logger.info("websocket disconnected", { channel: socket.data.channel }); logger.info("websocket disconnected", { channel: socket.data.channel });