"use client"; import { useEffect, useMemo, useRef, useState } from "react"; import type { EquityPrint, OptionPrint } from "@islandflow/types"; const MAX_ITEMS = 60; const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); type WsStatus = "connecting" | "connected" | "disconnected"; type MessageType = "option-print" | "equity-print"; type StreamMessage = { type: MessageType; payload: T; }; type TapeState = { status: WsStatus; items: T[]; lastUpdate: number | null; }; const buildWsUrl = (path: string): string => { const envBase = process.env.NEXT_PUBLIC_API_URL; if (envBase) { const url = new URL(envBase); url.protocol = url.protocol === "https:" ? "wss:" : "ws:"; url.pathname = path; url.search = ""; url.hash = ""; return url.toString(); } const { protocol, hostname } = window.location; const wsProtocol = protocol === "https:" ? "wss" : "ws"; const isLocal = LOCAL_HOSTS.has(hostname); const host = isLocal ? `${hostname}:4000` : window.location.host; return `${wsProtocol}://${host}${path}`; }; const formatPrice = (price: number): string => { return price.toFixed(2); }; const formatSize = (size: number): string => { return size.toLocaleString(); }; const formatTime = (ts: number): string => { return new Date(ts).toLocaleTimeString(); }; const statusLabel = (status: WsStatus): string => { switch (status) { case "connected": return "Live"; case "connecting": return "Connecting"; case "disconnected": default: return "Disconnected"; } }; const useTape = (path: string, expectedType: MessageType): TapeState => { const [status, setStatus] = useState("connecting"); const [items, setItems] = useState([]); const [lastUpdate, setLastUpdate] = useState(null); const reconnectRef = useRef(null); const socketRef = useRef(null); useEffect(() => { let active = true; const connect = () => { if (!active) { return; } setStatus("connecting"); const socket = new WebSocket(buildWsUrl(path)); socketRef.current = socket; socket.onopen = () => { if (!active) { return; } setStatus("connected"); }; socket.onmessage = (event) => { if (!active) { return; } try { const message = JSON.parse(event.data) as StreamMessage; if (!message || message.type !== expectedType) { return; } setItems((prev) => { const next = [message.payload, ...prev]; return next.slice(0, MAX_ITEMS); }); setLastUpdate(Date.now()); } catch (error) { console.warn("Failed to parse websocket payload", error); } }; socket.onclose = () => { if (!active) { return; } setStatus("disconnected"); reconnectRef.current = window.setTimeout(() => { connect(); }, 1000); }; socket.onerror = () => { if (!active) { return; } setStatus("disconnected"); socket.close(); }; }; connect(); return () => { active = false; if (reconnectRef.current !== null) { window.clearTimeout(reconnectRef.current); } if (socketRef.current) { socketRef.current.close(); } }; }, [path, expectedType]); return { status, items, lastUpdate }; }; type TapeStatusProps = { status: WsStatus; lastUpdate: number | null; }; const TapeStatus = ({ status, lastUpdate }: TapeStatusProps) => { return (
{statusLabel(status)} {lastUpdate ? ( Updated {formatTime(lastUpdate)} ) : ( Waiting for data )}
); }; export default function HomePage() { const options = useTape("/ws/options", "option-print"); const equities = useTape("/ws/equities", "equity-print"); const lastSeen = useMemo(() => { return [options.lastUpdate, equities.lastUpdate] .filter((value): value is number => value !== null) .sort((a, b) => b - a)[0] ?? null; }, [options.lastUpdate, equities.lastUpdate]); return (

Realtime flow workspace

Islandflow

Options + equities streaming over WebSocket from the local API gateway.

Last update {lastSeen ? formatTime(lastSeen) : "Waiting for data"}

Options Tape

Newest prints first (max {MAX_ITEMS}).

{options.items.length === 0 ? (
No option prints yet. Start ingest-options.
) : ( options.items.map((print) => (
{print.option_contract_id}
${formatPrice(print.price)} {formatSize(print.size)}x {print.exchange} {print.conditions?.length ? ( {print.conditions.join(", ")} ) : null}
{formatTime(print.ts)}
)) )}

Equities Tape

Off-exchange flag highlighted.

{equities.items.length === 0 ? (
No equity prints yet. Start ingest-equities.
) : ( equities.items.map((print) => (
{print.underlying_id}
${formatPrice(print.price)} {formatSize(print.size)}x {print.exchange} {print.offExchangeFlag ? ( Off-Ex ) : ( Lit )}
{formatTime(print.ts)}
)) )}
); }