From 1583a5041210970ac9c10ee9d269da1a4864fcc9 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Fri, 9 Jan 2026 15:29:41 -0500 Subject: [PATCH] Improve local defaults and replay candle fetch --- .env.example | 6 +- apps/web/app/page.tsx | 77 ++++++++++++--- packages/storage/src/clickhouse.ts | 3 +- scripts/dev.ts | 95 ++++++++++++++++++- services/api/src/index.ts | 8 +- services/candles/src/index.ts | 12 +-- services/compute/src/index.ts | 8 +- .../ingest-equities/src/adapters/synthetic.ts | 8 +- services/ingest-equities/src/index.ts | 6 +- .../ingest-options/src/adapters/synthetic.ts | 6 +- services/ingest-options/src/index.ts | 6 +- 11 files changed, 193 insertions(+), 42 deletions(-) diff --git a/.env.example b/.env.example index 508bd81..36bc452 100644 --- a/.env.example +++ b/.env.example @@ -1,7 +1,7 @@ -NATS_URL=nats://localhost:4222 -CLICKHOUSE_URL=http://localhost:8123 +NATS_URL=nats://127.0.0.1:4222 +CLICKHOUSE_URL=http://127.0.0.1:8123 CLICKHOUSE_DATABASE=default -REDIS_URL=redis://localhost:6379 +REDIS_URL=redis://127.0.0.1:6379 # Options ingest OPTIONS_INGEST_ADAPTER=synthetic diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 3828c78..01dd36c 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -20,9 +20,8 @@ const NBBO_MAX_AGE_MS_SAFE = Number.isFinite(NBBO_MAX_AGE_MS) && NBBO_MAX_AGE_MS > 0 ? NBBO_MAX_AGE_MS : 1000; const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); const CANDLE_INTERVALS = [ - { label: "1s", ms: 1000 }, - { label: "5s", ms: 5000 }, - { label: "1m", ms: 60000 } + { label: "1m", ms: 60000 }, + { label: "5m", ms: 300000 } ]; type CandlestickSeries = ReturnType; @@ -63,6 +62,23 @@ const toChartCandle = (candle: EquityCandle): ChartCandle => { }; }; +const readErrorDetail = async (response: Response): Promise => { + const text = await response.text(); + if (!text) { + return ""; + } + try { + const payload = JSON.parse(text) as { + detail?: string; + error?: string; + message?: string; + }; + return payload.detail ?? payload.error ?? payload.message ?? text; + } catch { + return text; + } +}; + type WsStatus = "connecting" | "connected" | "disconnected"; type TapeMode = "live" | "replay"; @@ -1218,15 +1234,28 @@ type CandleChartProps = { ticker: string; intervalMs: number; mode: TapeMode; + replayTime?: number | null; }; -const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => { +const CandleChart = ({ ticker, intervalMs, mode, replayTime = null }: CandleChartProps) => { const containerRef = useRef(null); const chartRef = useRef(null); const seriesRef = useRef(null); const socketRef = useRef(null); const reconnectRef = useRef(null); const lastCandleRef = useRef<{ time: UTCTimestamp; seq: number } | null>(null); + const replayBucket = useMemo(() => { + if (mode !== "replay" || replayTime === null) { + return null; + } + return Math.floor(replayTime / intervalMs); + }, [mode, replayTime, intervalMs]); + const replayEndTs = useMemo(() => { + if (replayBucket === null) { + return null; + } + return (replayBucket + 1) * intervalMs - 1; + }, [replayBucket, intervalMs]); const [ready, setReady] = useState(false); const [status, setStatus] = useState(mode === "live" ? "connecting" : "connected"); const [lastUpdate, setLastUpdate] = useState(null); @@ -1307,6 +1336,16 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => { return; } + if (mode === "replay" && replayBucket === null) { + setError(null); + setHasData(false); + setLastUpdate(null); + lastCandleRef.current = null; + seriesRef.current.setData([]); + setStatus("connected"); + return; + } + let active = true; setError(null); setHasData(false); @@ -1322,9 +1361,15 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => { url.searchParams.set("interval_ms", intervalMs.toString()); url.searchParams.set("limit", "300"); url.searchParams.set("cache", "1"); + if (mode === "replay" && replayEndTs !== null) { + url.searchParams.set("end_ts", replayEndTs.toString()); + } const response = await fetch(url.toString()); if (!response.ok) { - throw new Error(`Candle fetch failed (${response.status})`); + const detail = await readErrorDetail(response); + throw new Error( + `Candle fetch failed (${response.status})${detail ? `: ${detail}` : ""}` + ); } const payload = (await response.json()) as { data?: EquityCandle[] }; if (!active || !seriesRef.current) { @@ -1361,7 +1406,7 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => { return () => { active = false; }; - }, [ready, ticker, intervalMs, mode]); + }, [ready, ticker, intervalMs, mode, replayBucket, replayEndTs]); useEffect(() => { if (!ready || mode !== "live" || !seriesRef.current) { @@ -1471,6 +1516,13 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => { }, [intervalMs]); const statusText = statusLabel(status, false, mode); + const intervalLabel = formatIntervalLabel(intervalMs); + const emptyLabel = + mode === "live" + ? status === "connected" + ? `No candles yet. First ${intervalLabel} candle appears after the window closes.` + : "Chart offline. Start candles service." + : "No candles for this replay window."; return (
@@ -1487,11 +1539,7 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => { {error ? (
Chart error: {error}
) : !hasData ? ( -
- {mode === "live" - ? "No candles yet. Start candles service." - : "No candles for this replay window."} -
+
{emptyLabel}
) : null}
); @@ -2280,7 +2328,12 @@ export default function HomePage() { Charting {chartTicker} )} - +
diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 0aea7c5..594dec9 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -327,7 +327,8 @@ const coerceNumber = (value: unknown): unknown => { }; const quoteString = (value: string): string => { - return JSON.stringify(value); + const escaped = value.replace(/'/g, "''"); + return `'${escaped}'`; }; const normalizeNumericFields = ( diff --git a/scripts/dev.ts b/scripts/dev.ts index 84fbc51..97651cf 100644 --- a/scripts/dev.ts +++ b/scripts/dev.ts @@ -1,3 +1,5 @@ +import net from "node:net"; + type ChildSpec = { name: string; cmd: string[]; @@ -12,6 +14,54 @@ type Child = { const children: Child[] = []; let shuttingDown = false; +const sleep = (delayMs: number): Promise => { + return new Promise((resolve) => setTimeout(resolve, delayMs)); +}; + +const parseUrlHostPort = ( + value: string, + fallbackHost: string, + fallbackPort: number +): { host: string; port: number } => { + const candidate = value.split(",")[0]?.trim() ?? ""; + if (!candidate) { + return { host: fallbackHost, port: fallbackPort }; + } + + try { + const url = new URL(candidate.includes("://") ? candidate : `tcp://${candidate}`); + const port = url.port ? Number(url.port) : fallbackPort; + return { host: url.hostname || fallbackHost, port }; + } catch { + return { host: fallbackHost, port: fallbackPort }; + } +}; + +const checkTcp = (host: string, port: number, timeoutMs = 1000): Promise => { + return new Promise((resolve) => { + const socket = net.connect({ host, port }); + const finalize = (ok: boolean) => { + socket.removeAllListeners(); + socket.destroy(); + resolve(ok); + }; + + socket.setTimeout(timeoutMs); + socket.once("connect", () => finalize(true)); + socket.once("error", () => finalize(false)); + socket.once("timeout", () => finalize(false)); + }); +}; + +const checkHttp = async (url: string): Promise => { + try { + const response = await fetch(url); + return response.ok; + } catch { + return false; + } +}; + const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => { const proc = Bun.spawn(cmd, { cwd, @@ -56,8 +106,44 @@ const shutdown = (code: number): void => { process.on("SIGINT", () => shutdown(0)); process.on("SIGTERM", () => shutdown(0)); -const tasks: ChildSpec[] = [ - { name: "infra", cmd: ["docker", "compose", "up"] }, +const waitForInfra = async (): Promise => { + const natsTarget = parseUrlHostPort(process.env.NATS_URL ?? "", "127.0.0.1", 4222); + const redisTarget = parseUrlHostPort(process.env.REDIS_URL ?? "", "127.0.0.1", 6379); + const clickhouseUrl = process.env.CLICKHOUSE_URL ?? "http://127.0.0.1:8123"; + const deadline = Date.now() + 90_000; + let lastLog = 0; + + while (Date.now() < deadline) { + const [natsOk, redisOk, clickhouseOk] = await Promise.all([ + checkTcp(natsTarget.host, natsTarget.port), + checkTcp(redisTarget.host, redisTarget.port), + checkHttp(`${clickhouseUrl.replace(/\/$/, "")}/ping`) + ]); + + if (natsOk && redisOk && clickhouseOk) { + console.log("[dev] Infra ready"); + return; + } + + const now = Date.now(); + if (now - lastLog > 5000) { + console.log( + `[dev] Waiting for infra... nats=${natsOk ? "up" : "down"} redis=${ + redisOk ? "up" : "down" + } clickhouse=${clickhouseOk ? "up" : "down"}` + ); + lastLog = now; + } + + await sleep(1000); + } + + console.error("[dev] Infra not ready after 90s. Check Docker/ports and retry."); + shutdown(1); +}; + +const infraTask: ChildSpec = { name: "infra", cmd: ["docker", "compose", "up"] }; +const serviceTasks: ChildSpec[] = [ { name: "web", cmd: ["bun", "run", "dev"], cwd: "apps/web" }, { name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" }, { name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" }, @@ -68,7 +154,10 @@ const tasks: ChildSpec[] = [ { name: "api", cmd: ["bun", "run", "dev"], cwd: "services/api" } ]; -for (const task of tasks) { +spawnChild(infraTask); +await waitForInfra(); + +for (const task of serviceTasks) { spawnChild(task); } diff --git a/services/api/src/index.ts b/services/api/src/index.ts index d789a42..df57d59 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -76,10 +76,10 @@ const logger = createLogger({ service }); const envSchema = z.object({ API_PORT: z.coerce.number().int().positive().default(4000), - NATS_URL: z.string().default("nats://localhost:4222"), - CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + NATS_URL: z.string().default("nats://127.0.0.1:4222"), + CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), - REDIS_URL: z.string().default("redis://localhost:6379"), + REDIS_URL: z.string().default("redis://127.0.0.1:6379"), REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200) }); @@ -311,7 +311,7 @@ const run = async () => { servers: env.NATS_URL, name: service }, - { attempts: 20, delayMs: 500 } + { attempts: 120, delayMs: 500 } ); await ensureStream(jsm, { diff --git a/services/candles/src/index.ts b/services/candles/src/index.ts index dfb773d..a625509 100644 --- a/services/candles/src/index.ts +++ b/services/candles/src/index.ts @@ -26,11 +26,11 @@ const logger = createLogger({ service }); const metrics = createMetrics({ service }); const envSchema = z.object({ - NATS_URL: z.string().default("nats://localhost:4222"), - CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + NATS_URL: z.string().default("nats://127.0.0.1:4222"), + CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), - REDIS_URL: z.string().default("redis://localhost:6379"), - CANDLE_INTERVALS_MS: z.string().default("1000,5000,60000"), + REDIS_URL: z.string().default("redis://127.0.0.1:6379"), + CANDLE_INTERVALS_MS: z.string().default("60000,300000"), CANDLE_MAX_LATE_MS: z.coerce.number().int().nonnegative().default(0), CANDLE_CACHE_LIMIT: z.coerce.number().int().nonnegative().default(2000), CANDLE_DELIVER_POLICY: z @@ -185,7 +185,7 @@ const emitCandle = async ( const run = async () => { logger.info("service starting"); - const intervalsMs = parseIntervals(env.CANDLE_INTERVALS_MS, [1000, 5000, 60000]); + const intervalsMs = parseIntervals(env.CANDLE_INTERVALS_MS, [60000, 300000]); if (intervalsMs.length === 0) { throw new Error("CANDLE_INTERVALS_MS produced no valid intervals"); } @@ -200,7 +200,7 @@ const run = async () => { servers: env.NATS_URL, name: service }, - { attempts: 20, delayMs: 500 } + { attempts: 120, delayMs: 500 } ); await ensureStream(jsm, { diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 0aa2d67..042e6df 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -74,10 +74,10 @@ const service = "compute"; const logger = createLogger({ service }); const envSchema = z.object({ - NATS_URL: z.string().default("nats://localhost:4222"), - CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + NATS_URL: z.string().default("nats://127.0.0.1:4222"), + CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), - REDIS_URL: z.string().default("redis://localhost:6379"), + REDIS_URL: z.string().default("redis://127.0.0.1:6379"), CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500), ROLLING_WINDOW_SIZE: z.coerce.number().int().positive().default(50), ROLLING_TTL_SEC: z.coerce.number().int().nonnegative().default(86400), @@ -758,7 +758,7 @@ const run = async () => { servers: env.NATS_URL, name: service }, - { attempts: 20, delayMs: 500 } + { attempts: 120, delayMs: 500 } ); await ensureStream(jsm, { diff --git a/services/ingest-equities/src/adapters/synthetic.ts b/services/ingest-equities/src/adapters/synthetic.ts index 861ffa7..6aa9f16 100644 --- a/services/ingest-equities/src/adapters/synthetic.ts +++ b/services/ingest-equities/src/adapters/synthetic.ts @@ -22,6 +22,10 @@ const DARK_SEQUENCE: DarkScenario[] = [ "sell", "sell" ]; +const SYNTHETIC_SYMBOLS = [ + "SPY", + ...SP500_SYMBOLS.filter((symbol) => symbol !== "SPY") +]; const hashSymbol = (value: string): number => { let hash = 0; @@ -138,7 +142,7 @@ export const createSyntheticEquitiesAdapter = ( const now = Date.now(); const batchSize = 3; - const darkSymbol = SP500_SYMBOLS[darkSymbolIndex % SP500_SYMBOLS.length]; + const darkSymbol = SYNTHETIC_SYMBOLS[darkSymbolIndex % SYNTHETIC_SYMBOLS.length]; const darkHash = hashSymbol(darkSymbol); const darkBase = 25 + (darkHash % 475); const darkDrift = ((darkStep % 24) - 12) * 0.08; @@ -189,7 +193,7 @@ export const createSyntheticEquitiesAdapter = ( for (let i = 0; i < batchSize; i += 1) { seq += 1; - const symbol = SP500_SYMBOLS[(seq + i) % SP500_SYMBOLS.length]; + const symbol = SYNTHETIC_SYMBOLS[(seq + i) % SYNTHETIC_SYMBOLS.length]; const symbolHash = hashSymbol(symbol); const basePrice = 25 + (symbolHash % 475); const mid = formatPrice(basePrice + ((seq % 40) - 20) * 0.05); diff --git a/services/ingest-equities/src/index.ts b/services/ingest-equities/src/index.ts index de0c324..1572aa2 100644 --- a/services/ingest-equities/src/index.ts +++ b/services/ingest-equities/src/index.ts @@ -30,8 +30,8 @@ const service = "ingest-equities"; const logger = createLogger({ service }); const envSchema = z.object({ - NATS_URL: z.string().default("nats://localhost:4222"), - CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + NATS_URL: z.string().default("nats://127.0.0.1:4222"), + CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"), EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000), @@ -129,7 +129,7 @@ const run = async () => { servers: env.NATS_URL, name: service }, - { attempts: 20, delayMs: 500 } + { attempts: 120, delayMs: 500 } ); await ensureStream(jsm, { diff --git a/services/ingest-options/src/adapters/synthetic.ts b/services/ingest-options/src/adapters/synthetic.ts index 23bf302..a5cdf41 100644 --- a/services/ingest-options/src/adapters/synthetic.ts +++ b/services/ingest-options/src/adapters/synthetic.ts @@ -17,6 +17,10 @@ type Burst = { seed: number; }; +const SYNTHETIC_SYMBOLS = [ + "SPY", + ...SP500_SYMBOLS.filter((symbol) => symbol !== "SPY") +]; const MS_PER_DAY = 24 * 60 * 60 * 1000; const EXPIRY_OFFSETS = [0, 1, 7, 14, 28, 45, 60, 90]; const EXCHANGES = ["CBOE", "PHLX", "ISE", "ARCA", "BOX", "MIAX"]; @@ -177,7 +181,7 @@ const formatExpiry = (now: number, offsetDays: number): string => { }; const buildBurst = (burstIndex: number, now: number): Burst => { - const symbol = SP500_SYMBOLS[burstIndex % SP500_SYMBOLS.length]; + const symbol = SYNTHETIC_SYMBOLS[burstIndex % SYNTHETIC_SYMBOLS.length]; const symbolHash = hashSymbol(symbol); const seed = symbolHash + burstIndex * 7; const scenario = pickWeighted(SCENARIOS, seed); diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index 01de7b9..3fc61ba 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -28,8 +28,8 @@ const service = "ingest-options"; const logger = createLogger({ service }); const envSchema = z.object({ - NATS_URL: z.string().default("nats://localhost:4222"), - CLICKHOUSE_URL: z.string().default("http://localhost:8123"), + NATS_URL: z.string().default("nats://127.0.0.1:4222"), + CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), OPTIONS_INGEST_ADAPTER: z.string().min(1).default("synthetic"), ALPACA_KEY_ID: z.string().default(""), @@ -225,7 +225,7 @@ const run = async () => { servers: env.NATS_URL, name: service }, - { attempts: 20, delayMs: 500 } + { attempts: 120, delayMs: 500 } ); await ensureStream(jsm, {