diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index 452ffe5..aa60d70 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -9,6 +9,7 @@ --accent: #2f6d4f; --accent-soft: rgba(47, 109, 79, 0.18); --warning: #c46f2a; + --replay: #1f4a7b; --grid: rgba(82, 64, 36, 0.12); } @@ -69,7 +70,7 @@ h1 { .summary { display: grid; - gap: 8px; + gap: 12px; padding: 16px 20px; border-radius: 16px; border: 1px solid var(--panel-border); @@ -88,6 +89,23 @@ h1 { font-size: 1rem; } +.mode-button { + border: 1px solid rgba(31, 74, 123, 0.35); + border-radius: 999px; + padding: 8px 14px; + background: rgba(31, 74, 123, 0.12); + color: #1f4a7b; + font-size: 0.75rem; + letter-spacing: 0.12em; + text-transform: uppercase; + cursor: pointer; +} + +.mode-button:focus-visible { + outline: 2px solid rgba(31, 74, 123, 0.4); + outline-offset: 2px; +} + .cards { display: grid; gap: 28px; @@ -126,6 +144,11 @@ h1 { box-shadow: 0 0 0 4px var(--accent-soft); } +.status-replay .status-dot { + background: var(--replay); + box-shadow: 0 0 0 4px rgba(31, 74, 123, 0.18); +} + .status-connecting .status-dot { animation: pulse 1.2s ease-in-out infinite; } diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 782f2d7..2d8b8fa 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -8,6 +8,8 @@ const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); type WsStatus = "connecting" | "connected" | "disconnected"; +type TapeMode = "live" | "replay"; + type MessageType = "option-print" | "equity-print"; type StreamMessage = { @@ -15,6 +17,16 @@ type StreamMessage = { payload: T; }; +type ReplayCursor = { + ts: number; + seq: number; +}; + +type ReplayResponse = { + data: T[]; + next: ReplayCursor | null; +}; + type TapeState = { status: WsStatus; items: T[]; @@ -44,6 +56,27 @@ const buildWsUrl = (path: string): string => { return `${wsProtocol}://${host}${path}`; }; +const buildApiUrl = (path: string): string => { + const envBase = process.env.NEXT_PUBLIC_API_URL; + + if (envBase) { + const url = new URL(envBase); + const secure = url.protocol === "https:" || url.protocol === "wss:"; + url.protocol = secure ? "https:" : "http:"; + url.pathname = path; + url.search = ""; + url.hash = ""; + return url.toString(); + } + + const { protocol, hostname } = window.location; + const httpProtocol = protocol === "https:" ? "https" : "http"; + const isLocal = LOCAL_HOSTS.has(hostname); + const host = isLocal ? `${hostname}:4000` : window.location.host; + + return `${httpProtocol}://${host}${path}`; +}; + const formatPrice = (price: number): string => { return price.toFixed(2); }; @@ -56,11 +89,15 @@ const formatTime = (ts: number): string => { return new Date(ts).toLocaleTimeString(); }; -const statusLabel = (status: WsStatus, paused: boolean): string => { +const statusLabel = (status: WsStatus, paused: boolean, mode: TapeMode): string => { if (paused) { return "Paused"; } + if (mode === "replay") { + return status === "disconnected" ? "Replay Down" : "Replay"; + } + switch (status) { case "connected": return "Live"; @@ -72,7 +109,21 @@ const statusLabel = (status: WsStatus, paused: boolean): string => { } }; -const useTape = (path: string, expectedType: MessageType): TapeState => { +type TapeConfig = { + mode: TapeMode; + wsPath: string; + replayPath: string; + expectedType: MessageType; + batchSize?: number; + pollMs?: number; +}; + +const useTape = ( + config: TapeConfig +): TapeState => { + const { mode, wsPath, replayPath, expectedType } = config; + const batchSize = config.batchSize ?? 40; + const pollMs = config.pollMs ?? 1000; const [status, setStatus] = useState("connecting"); const [items, setItems] = useState([]); const [lastUpdate, setLastUpdate] = useState(null); @@ -80,6 +131,12 @@ const useTape = (path: string, expectedType: MessageType): TapeState => { const [dropped, setDropped] = useState(0); const reconnectRef = useRef(null); const socketRef = useRef(null); + const cursorRef = useRef({ ts: 0, seq: 0 }); + const pausedRef = useRef(paused); + + useEffect(() => { + pausedRef.current = paused; + }, [paused]); const togglePause = useCallback(() => { setPaused((prev) => { @@ -92,6 +149,18 @@ const useTape = (path: string, expectedType: MessageType): TapeState => { }, []); useEffect(() => { + setItems([]); + setLastUpdate(null); + setDropped(0); + setStatus("connecting"); + cursorRef.current = { ts: 0, seq: 0 }; + }, [mode]); + + useEffect(() => { + if (mode !== "live") { + return; + } + let active = true; const connect = () => { @@ -101,7 +170,7 @@ const useTape = (path: string, expectedType: MessageType): TapeState => { setStatus("connecting"); - const socket = new WebSocket(buildWsUrl(path)); + const socket = new WebSocket(buildWsUrl(wsPath)); socketRef.current = socket; socket.onopen = () => { @@ -122,7 +191,7 @@ const useTape = (path: string, expectedType: MessageType): TapeState => { return; } - if (paused) { + if (pausedRef.current) { setDropped((prev) => prev + 1); setLastUpdate(Date.now()); return; @@ -170,7 +239,61 @@ const useTape = (path: string, expectedType: MessageType): TapeState => { socketRef.current.close(); } }; - }, [path, expectedType, paused]); + }, [mode, wsPath, expectedType]); + + useEffect(() => { + if (mode !== "replay") { + return; + } + + let active = true; + + const poll = async () => { + if (!active || pausedRef.current) { + return; + } + + try { + const cursor = cursorRef.current; + const url = new URL(buildApiUrl(replayPath)); + url.searchParams.set("after_ts", cursor.ts.toString()); + url.searchParams.set("after_seq", cursor.seq.toString()); + url.searchParams.set("limit", batchSize.toString()); + + const response = await fetch(url.toString()); + if (!response.ok) { + throw new Error(`Replay request failed with ${response.status}`); + } + + const payload = (await response.json()) as ReplayResponse; + if (payload.data.length > 0) { + const nextItems = [...payload.data].reverse(); + setItems((prev) => { + const next = [...nextItems, ...prev]; + return next.slice(0, MAX_ITEMS); + }); + setLastUpdate(Date.now()); + } + + if (payload.next) { + cursorRef.current = payload.next; + } + + setStatus("connected"); + } catch (error) { + console.warn("Replay poll failed", error); + setStatus("disconnected"); + } + }; + + void poll(); + const interval = window.setInterval(poll, pollMs); + + return () => { + active = false; + window.clearInterval(interval); + }; + }, [mode, replayPath, batchSize, pollMs]); return { status, items, lastUpdate, paused, dropped, togglePause }; }; @@ -180,14 +303,18 @@ type TapeStatusProps = { lastUpdate: number | null; paused: boolean; dropped: number; + mode: TapeMode; onTogglePause: () => void; }; -const TapeStatus = ({ status, lastUpdate, paused, dropped, onTogglePause }: TapeStatusProps) => { +const TapeStatus = ({ status, lastUpdate, paused, dropped, mode, onTogglePause }: TapeStatusProps) => { + const replayClass = mode === "replay" ? "status-replay" : ""; + const pausedClass = paused ? "status-paused" : ""; + return ( -
+
- {statusLabel(status, paused)} + {statusLabel(status, paused, mode)} {lastUpdate ? ( Updated {formatTime(lastUpdate)} ) : ( @@ -204,8 +331,21 @@ const TapeStatus = ({ status, lastUpdate, paused, dropped, onTogglePause }: Tape }; export default function HomePage() { - const options = useTape("/ws/options", "option-print"); - const equities = useTape("/ws/equities", "equity-print"); + const [mode, setMode] = useState("live"); + + const options = useTape({ + mode, + wsPath: "/ws/options", + replayPath: "/replay/options", + expectedType: "option-print" + }); + + const equities = useTape({ + mode, + wsPath: "/ws/equities", + replayPath: "/replay/equities", + expectedType: "equity-print" + }); const lastSeen = useMemo(() => { return [options.lastUpdate, equities.lastUpdate] @@ -213,6 +353,10 @@ export default function HomePage() { .sort((a, b) => b - a)[0] ?? null; }, [options.lastUpdate, equities.lastUpdate]); + const toggleMode = () => { + setMode((prev) => (prev === "live" ? "replay" : "live")); + }; + return (
@@ -220,7 +364,7 @@ export default function HomePage() {

Realtime flow workspace

Islandflow

- Options + equities streaming over WebSocket from the local API gateway. + Options + equities streaming over WebSocket or replayed from ClickHouse.

@@ -228,6 +372,9 @@ export default function HomePage() { {lastSeen ? formatTime(lastSeen) : "Waiting for data"} +
@@ -243,13 +390,18 @@ export default function HomePage() { lastUpdate={options.lastUpdate} paused={options.paused} dropped={options.dropped} + mode={mode} onTogglePause={options.togglePause} />
{options.items.length === 0 ? ( -
No option prints yet. Start ingest-options.
+
+ {mode === "live" + ? "No option prints yet. Start ingest-options." + : "Replay queue empty. Ensure ClickHouse has data."} +
) : ( options.items.map((print) => (
@@ -282,13 +434,18 @@ export default function HomePage() { lastUpdate={equities.lastUpdate} paused={equities.paused} dropped={equities.dropped} + mode={mode} onTogglePause={equities.togglePause} />
{equities.items.length === 0 ? ( -
No equity prints yet. Start ingest-equities.
+
+ {mode === "live" + ? "No equity prints yet. Start ingest-equities." + : "Replay queue empty. Ensure ClickHouse has data."} +
) : ( equities.items.map((print) => (
diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 2298fbf..16d7cd8 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -76,6 +76,14 @@ const clampLimit = (limit: number): number => { return Math.max(1, Math.min(1000, Math.floor(limit))); }; +const clampCursor = (value: number): number => { + if (!Number.isFinite(value)) { + return 0; + } + + return Math.max(0, Math.floor(value)); +}; + const coerceNumber = (value: unknown): unknown => { if (typeof value === "string") { const parsed = Number(value); @@ -102,6 +110,45 @@ const normalizeNumericFields = ( return record; }; +const normalizeOptionRow = (row: unknown): unknown => { + if (row && typeof row === "object") { + return normalizeNumericFields(row as Record, [ + "source_ts", + "ingest_ts", + "seq", + "ts", + "price", + "size" + ]); + } + + return row; +}; + +const normalizeEquityRow = (row: unknown): unknown => { + if (row && typeof row === "object") { + const record = normalizeNumericFields(row as Record, [ + "source_ts", + "ingest_ts", + "seq", + "ts", + "price", + "size" + ]); + + if ("offExchangeFlag" in record) { + return { + ...record, + offExchangeFlag: Boolean(record.offExchangeFlag) + }; + } + + return record; + } + + return row; +}; + export const fetchRecentOptionPrints = async ( client: ClickHouseClient, limit: number @@ -113,22 +160,7 @@ export const fetchRecentOptionPrints = async ( }); const rows = await result.json(); - const normalized = rows.map((row) => { - if (row && typeof row === "object") { - return normalizeNumericFields(row as Record, [ - "source_ts", - "ingest_ts", - "seq", - "ts", - "price", - "size" - ]); - } - - return row; - }); - - return OptionPrintSchema.array().parse(normalized); + return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow)); }; export const fetchRecentEquityPrints = async ( @@ -142,29 +174,43 @@ export const fetchRecentEquityPrints = async ( }); const rows = await result.json(); - const normalized = rows.map((row) => { - if (row && typeof row === "object") { - const record = normalizeNumericFields(row as Record, [ - "source_ts", - "ingest_ts", - "seq", - "ts", - "price", - "size" - ]); + return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); +}; - if ("offExchangeFlag" in record) { - return { - ...record, - offExchangeFlag: Boolean(record.offExchangeFlag) - }; - } +export const fetchOptionPrintsAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); - return record; - } - - return row; + const result = await client.query({ + query: `SELECT * FROM ${OPTION_PRINTS_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" }); - return EquityPrintSchema.array().parse(normalized); + const rows = await result.json(); + return OptionPrintSchema.array().parse(rows.map(normalizeOptionRow)); +}; + +export const fetchEquityPrintsAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + + const result = await client.query({ + query: `SELECT * FROM ${EQUITY_PRINTS_TABLE} WHERE (ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + return EquityPrintSchema.array().parse(rows.map(normalizeEquityRow)); }; diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 0ff3ed1..d8b0477 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -14,7 +14,9 @@ import { createClickHouseClient, ensureEquityPrintsTable, ensureOptionPrintsTable, + fetchEquityPrintsAfter, fetchRecentEquityPrints, + fetchOptionPrintsAfter, fetchRecentOptionPrints } from "@islandflow/storage"; import { EquityPrintSchema, OptionPrintSchema } from "@islandflow/types"; @@ -34,6 +36,11 @@ const envSchema = z.object({ const env = readEnv(envSchema); const limitSchema = z.coerce.number().int().positive().max(1000); +const replayParamsSchema = z.object({ + after_ts: z.coerce.number().int().nonnegative().default(0), + after_seq: z.coerce.number().int().nonnegative().default(0), + limit: z.coerce.number().int().positive().max(1000).default(200) +}); type Channel = "options" | "equities"; @@ -61,6 +68,20 @@ const parseLimit = (value: string | null): number => { return limitSchema.parse(value); }; +const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit: number } => { + const params = replayParamsSchema.parse({ + after_ts: url.searchParams.get("after_ts") ?? undefined, + after_seq: url.searchParams.get("after_seq") ?? undefined, + limit: url.searchParams.get("limit") ?? undefined + }); + + return { + afterTs: params.after_ts, + afterSeq: params.after_seq, + limit: params.limit + }; +}; + const broadcast = (sockets: Set>, payload: unknown): void => { const message = JSON.stringify(payload); @@ -187,6 +208,22 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/replay/options") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + + if (req.method === "GET" && url.pathname === "/replay/equities") { + const { afterTs, afterSeq, limit } = parseReplayParams(url); + const data = await fetchEquityPrintsAfter(clickhouse, afterTs, afterSeq, limit); + const last = data.at(-1); + const next = last ? { ts: last.ts, seq: last.seq } : null; + return jsonResponse({ data, next }); + } + if (req.method === "GET" && url.pathname === "/ws/options") { if (serverRef.upgrade(req, { data: { channel: "options" } })) { return new Response(null, { status: 101 });