Improve local defaults and replay candle fetch

This commit is contained in:
dirtydishes 2026-01-09 15:29:41 -05:00
parent b9ad182473
commit 1583a50412
11 changed files with 193 additions and 42 deletions

View file

@ -1,7 +1,7 @@
NATS_URL=nats://localhost:4222 NATS_URL=nats://127.0.0.1:4222
CLICKHOUSE_URL=http://localhost:8123 CLICKHOUSE_URL=http://127.0.0.1:8123
CLICKHOUSE_DATABASE=default CLICKHOUSE_DATABASE=default
REDIS_URL=redis://localhost:6379 REDIS_URL=redis://127.0.0.1:6379
# Options ingest # Options ingest
OPTIONS_INGEST_ADAPTER=synthetic OPTIONS_INGEST_ADAPTER=synthetic

View file

@ -20,9 +20,8 @@ const NBBO_MAX_AGE_MS_SAFE =
Number.isFinite(NBBO_MAX_AGE_MS) && NBBO_MAX_AGE_MS > 0 ? NBBO_MAX_AGE_MS : 1000; Number.isFinite(NBBO_MAX_AGE_MS) && NBBO_MAX_AGE_MS > 0 ? NBBO_MAX_AGE_MS : 1000;
const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]); const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1"]);
const CANDLE_INTERVALS = [ const CANDLE_INTERVALS = [
{ label: "1s", ms: 1000 }, { label: "1m", ms: 60000 },
{ label: "5s", ms: 5000 }, { label: "5m", ms: 300000 }
{ label: "1m", ms: 60000 }
]; ];
type CandlestickSeries = ReturnType<IChartApi["addCandlestickSeries"]>; type CandlestickSeries = ReturnType<IChartApi["addCandlestickSeries"]>;
@ -63,6 +62,23 @@ const toChartCandle = (candle: EquityCandle): ChartCandle => {
}; };
}; };
const readErrorDetail = async (response: Response): Promise<string> => {
const text = await response.text();
if (!text) {
return "";
}
try {
const payload = JSON.parse(text) as {
detail?: string;
error?: string;
message?: string;
};
return payload.detail ?? payload.error ?? payload.message ?? text;
} catch {
return text;
}
};
type WsStatus = "connecting" | "connected" | "disconnected"; type WsStatus = "connecting" | "connected" | "disconnected";
type TapeMode = "live" | "replay"; type TapeMode = "live" | "replay";
@ -1218,15 +1234,28 @@ type CandleChartProps = {
ticker: string; ticker: string;
intervalMs: number; intervalMs: number;
mode: TapeMode; mode: TapeMode;
replayTime?: number | null;
}; };
const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => { const CandleChart = ({ ticker, intervalMs, mode, replayTime = null }: CandleChartProps) => {
const containerRef = useRef<HTMLDivElement | null>(null); const containerRef = useRef<HTMLDivElement | null>(null);
const chartRef = useRef<IChartApi | null>(null); const chartRef = useRef<IChartApi | null>(null);
const seriesRef = useRef<CandlestickSeries | null>(null); const seriesRef = useRef<CandlestickSeries | null>(null);
const socketRef = useRef<WebSocket | null>(null); const socketRef = useRef<WebSocket | null>(null);
const reconnectRef = useRef<number | null>(null); const reconnectRef = useRef<number | null>(null);
const lastCandleRef = useRef<{ time: UTCTimestamp; seq: number } | null>(null); const lastCandleRef = useRef<{ time: UTCTimestamp; seq: number } | null>(null);
const replayBucket = useMemo(() => {
if (mode !== "replay" || replayTime === null) {
return null;
}
return Math.floor(replayTime / intervalMs);
}, [mode, replayTime, intervalMs]);
const replayEndTs = useMemo(() => {
if (replayBucket === null) {
return null;
}
return (replayBucket + 1) * intervalMs - 1;
}, [replayBucket, intervalMs]);
const [ready, setReady] = useState(false); const [ready, setReady] = useState(false);
const [status, setStatus] = useState<WsStatus>(mode === "live" ? "connecting" : "connected"); const [status, setStatus] = useState<WsStatus>(mode === "live" ? "connecting" : "connected");
const [lastUpdate, setLastUpdate] = useState<number | null>(null); const [lastUpdate, setLastUpdate] = useState<number | null>(null);
@ -1307,6 +1336,16 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => {
return; return;
} }
if (mode === "replay" && replayBucket === null) {
setError(null);
setHasData(false);
setLastUpdate(null);
lastCandleRef.current = null;
seriesRef.current.setData([]);
setStatus("connected");
return;
}
let active = true; let active = true;
setError(null); setError(null);
setHasData(false); setHasData(false);
@ -1322,9 +1361,15 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => {
url.searchParams.set("interval_ms", intervalMs.toString()); url.searchParams.set("interval_ms", intervalMs.toString());
url.searchParams.set("limit", "300"); url.searchParams.set("limit", "300");
url.searchParams.set("cache", "1"); url.searchParams.set("cache", "1");
if (mode === "replay" && replayEndTs !== null) {
url.searchParams.set("end_ts", replayEndTs.toString());
}
const response = await fetch(url.toString()); const response = await fetch(url.toString());
if (!response.ok) { if (!response.ok) {
throw new Error(`Candle fetch failed (${response.status})`); const detail = await readErrorDetail(response);
throw new Error(
`Candle fetch failed (${response.status})${detail ? `: ${detail}` : ""}`
);
} }
const payload = (await response.json()) as { data?: EquityCandle[] }; const payload = (await response.json()) as { data?: EquityCandle[] };
if (!active || !seriesRef.current) { if (!active || !seriesRef.current) {
@ -1361,7 +1406,7 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => {
return () => { return () => {
active = false; active = false;
}; };
}, [ready, ticker, intervalMs, mode]); }, [ready, ticker, intervalMs, mode, replayBucket, replayEndTs]);
useEffect(() => { useEffect(() => {
if (!ready || mode !== "live" || !seriesRef.current) { if (!ready || mode !== "live" || !seriesRef.current) {
@ -1471,6 +1516,13 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => {
}, [intervalMs]); }, [intervalMs]);
const statusText = statusLabel(status, false, mode); const statusText = statusLabel(status, false, mode);
const intervalLabel = formatIntervalLabel(intervalMs);
const emptyLabel =
mode === "live"
? status === "connected"
? `No candles yet. First ${intervalLabel} candle appears after the window closes.`
: "Chart offline. Start candles service."
: "No candles for this replay window.";
return ( return (
<div className="chart-panel"> <div className="chart-panel">
@ -1487,11 +1539,7 @@ const CandleChart = ({ ticker, intervalMs, mode }: CandleChartProps) => {
{error ? ( {error ? (
<div className="empty chart-empty">Chart error: {error}</div> <div className="empty chart-empty">Chart error: {error}</div>
) : !hasData ? ( ) : !hasData ? (
<div className="empty chart-empty"> <div className="empty chart-empty">{emptyLabel}</div>
{mode === "live"
? "No candles yet. Start candles service."
: "No candles for this replay window."}
</div>
) : null} ) : null}
</div> </div>
); );
@ -2280,7 +2328,12 @@ export default function HomePage() {
<span className="chart-hint">Charting {chartTicker}</span> <span className="chart-hint">Charting {chartTicker}</span>
)} )}
</div> </div>
<CandleChart ticker={chartTicker} intervalMs={chartIntervalMs} mode={mode} /> <CandleChart
ticker={chartTicker}
intervalMs={chartIntervalMs}
mode={mode}
replayTime={equities.replayTime}
/>
</section> </section>
<section className="card card-options"> <section className="card card-options">

View file

@ -327,7 +327,8 @@ const coerceNumber = (value: unknown): unknown => {
}; };
const quoteString = (value: string): string => { const quoteString = (value: string): string => {
return JSON.stringify(value); const escaped = value.replace(/'/g, "''");
return `'${escaped}'`;
}; };
const normalizeNumericFields = ( const normalizeNumericFields = (

View file

@ -1,3 +1,5 @@
import net from "node:net";
type ChildSpec = { type ChildSpec = {
name: string; name: string;
cmd: string[]; cmd: string[];
@ -12,6 +14,54 @@ type Child = {
const children: Child[] = []; const children: Child[] = [];
let shuttingDown = false; let shuttingDown = false;
const sleep = (delayMs: number): Promise<void> => {
return new Promise((resolve) => setTimeout(resolve, delayMs));
};
const parseUrlHostPort = (
value: string,
fallbackHost: string,
fallbackPort: number
): { host: string; port: number } => {
const candidate = value.split(",")[0]?.trim() ?? "";
if (!candidate) {
return { host: fallbackHost, port: fallbackPort };
}
try {
const url = new URL(candidate.includes("://") ? candidate : `tcp://${candidate}`);
const port = url.port ? Number(url.port) : fallbackPort;
return { host: url.hostname || fallbackHost, port };
} catch {
return { host: fallbackHost, port: fallbackPort };
}
};
const checkTcp = (host: string, port: number, timeoutMs = 1000): Promise<boolean> => {
return new Promise((resolve) => {
const socket = net.connect({ host, port });
const finalize = (ok: boolean) => {
socket.removeAllListeners();
socket.destroy();
resolve(ok);
};
socket.setTimeout(timeoutMs);
socket.once("connect", () => finalize(true));
socket.once("error", () => finalize(false));
socket.once("timeout", () => finalize(false));
});
};
const checkHttp = async (url: string): Promise<boolean> => {
try {
const response = await fetch(url);
return response.ok;
} catch {
return false;
}
};
const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => { const spawnChild = ({ name, cmd, cwd }: ChildSpec): void => {
const proc = Bun.spawn(cmd, { const proc = Bun.spawn(cmd, {
cwd, cwd,
@ -56,8 +106,44 @@ const shutdown = (code: number): void => {
process.on("SIGINT", () => shutdown(0)); process.on("SIGINT", () => shutdown(0));
process.on("SIGTERM", () => shutdown(0)); process.on("SIGTERM", () => shutdown(0));
const tasks: ChildSpec[] = [ const waitForInfra = async (): Promise<void> => {
{ name: "infra", cmd: ["docker", "compose", "up"] }, const natsTarget = parseUrlHostPort(process.env.NATS_URL ?? "", "127.0.0.1", 4222);
const redisTarget = parseUrlHostPort(process.env.REDIS_URL ?? "", "127.0.0.1", 6379);
const clickhouseUrl = process.env.CLICKHOUSE_URL ?? "http://127.0.0.1:8123";
const deadline = Date.now() + 90_000;
let lastLog = 0;
while (Date.now() < deadline) {
const [natsOk, redisOk, clickhouseOk] = await Promise.all([
checkTcp(natsTarget.host, natsTarget.port),
checkTcp(redisTarget.host, redisTarget.port),
checkHttp(`${clickhouseUrl.replace(/\/$/, "")}/ping`)
]);
if (natsOk && redisOk && clickhouseOk) {
console.log("[dev] Infra ready");
return;
}
const now = Date.now();
if (now - lastLog > 5000) {
console.log(
`[dev] Waiting for infra... nats=${natsOk ? "up" : "down"} redis=${
redisOk ? "up" : "down"
} clickhouse=${clickhouseOk ? "up" : "down"}`
);
lastLog = now;
}
await sleep(1000);
}
console.error("[dev] Infra not ready after 90s. Check Docker/ports and retry.");
shutdown(1);
};
const infraTask: ChildSpec = { name: "infra", cmd: ["docker", "compose", "up"] };
const serviceTasks: ChildSpec[] = [
{ name: "web", cmd: ["bun", "run", "dev"], cwd: "apps/web" }, { name: "web", cmd: ["bun", "run", "dev"], cwd: "apps/web" },
{ name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" }, { name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" },
{ name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" }, { name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" },
@ -68,7 +154,10 @@ const tasks: ChildSpec[] = [
{ name: "api", cmd: ["bun", "run", "dev"], cwd: "services/api" } { name: "api", cmd: ["bun", "run", "dev"], cwd: "services/api" }
]; ];
for (const task of tasks) { spawnChild(infraTask);
await waitForInfra();
for (const task of serviceTasks) {
spawnChild(task); spawnChild(task);
} }

View file

@ -76,10 +76,10 @@ const logger = createLogger({ service });
const envSchema = z.object({ const envSchema = z.object({
API_PORT: z.coerce.number().int().positive().default(4000), API_PORT: z.coerce.number().int().positive().default(4000),
NATS_URL: z.string().default("nats://localhost:4222"), NATS_URL: z.string().default("nats://127.0.0.1:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"), CLICKHOUSE_DATABASE: z.string().default("default"),
REDIS_URL: z.string().default("redis://localhost:6379"), REDIS_URL: z.string().default("redis://127.0.0.1:6379"),
REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200) REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200)
}); });
@ -311,7 +311,7 @@ const run = async () => {
servers: env.NATS_URL, servers: env.NATS_URL,
name: service name: service
}, },
{ attempts: 20, delayMs: 500 } { attempts: 120, delayMs: 500 }
); );
await ensureStream(jsm, { await ensureStream(jsm, {

View file

@ -26,11 +26,11 @@ const logger = createLogger({ service });
const metrics = createMetrics({ service }); const metrics = createMetrics({ service });
const envSchema = z.object({ const envSchema = z.object({
NATS_URL: z.string().default("nats://localhost:4222"), NATS_URL: z.string().default("nats://127.0.0.1:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"), CLICKHOUSE_DATABASE: z.string().default("default"),
REDIS_URL: z.string().default("redis://localhost:6379"), REDIS_URL: z.string().default("redis://127.0.0.1:6379"),
CANDLE_INTERVALS_MS: z.string().default("1000,5000,60000"), CANDLE_INTERVALS_MS: z.string().default("60000,300000"),
CANDLE_MAX_LATE_MS: z.coerce.number().int().nonnegative().default(0), CANDLE_MAX_LATE_MS: z.coerce.number().int().nonnegative().default(0),
CANDLE_CACHE_LIMIT: z.coerce.number().int().nonnegative().default(2000), CANDLE_CACHE_LIMIT: z.coerce.number().int().nonnegative().default(2000),
CANDLE_DELIVER_POLICY: z CANDLE_DELIVER_POLICY: z
@ -185,7 +185,7 @@ const emitCandle = async (
const run = async () => { const run = async () => {
logger.info("service starting"); logger.info("service starting");
const intervalsMs = parseIntervals(env.CANDLE_INTERVALS_MS, [1000, 5000, 60000]); const intervalsMs = parseIntervals(env.CANDLE_INTERVALS_MS, [60000, 300000]);
if (intervalsMs.length === 0) { if (intervalsMs.length === 0) {
throw new Error("CANDLE_INTERVALS_MS produced no valid intervals"); throw new Error("CANDLE_INTERVALS_MS produced no valid intervals");
} }
@ -200,7 +200,7 @@ const run = async () => {
servers: env.NATS_URL, servers: env.NATS_URL,
name: service name: service
}, },
{ attempts: 20, delayMs: 500 } { attempts: 120, delayMs: 500 }
); );
await ensureStream(jsm, { await ensureStream(jsm, {

View file

@ -74,10 +74,10 @@ const service = "compute";
const logger = createLogger({ service }); const logger = createLogger({ service });
const envSchema = z.object({ const envSchema = z.object({
NATS_URL: z.string().default("nats://localhost:4222"), NATS_URL: z.string().default("nats://127.0.0.1:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"), CLICKHOUSE_DATABASE: z.string().default("default"),
REDIS_URL: z.string().default("redis://localhost:6379"), REDIS_URL: z.string().default("redis://127.0.0.1:6379"),
CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500), CLUSTER_WINDOW_MS: z.coerce.number().int().positive().default(500),
ROLLING_WINDOW_SIZE: z.coerce.number().int().positive().default(50), ROLLING_WINDOW_SIZE: z.coerce.number().int().positive().default(50),
ROLLING_TTL_SEC: z.coerce.number().int().nonnegative().default(86400), ROLLING_TTL_SEC: z.coerce.number().int().nonnegative().default(86400),
@ -758,7 +758,7 @@ const run = async () => {
servers: env.NATS_URL, servers: env.NATS_URL,
name: service name: service
}, },
{ attempts: 20, delayMs: 500 } { attempts: 120, delayMs: 500 }
); );
await ensureStream(jsm, { await ensureStream(jsm, {

View file

@ -22,6 +22,10 @@ const DARK_SEQUENCE: DarkScenario[] = [
"sell", "sell",
"sell" "sell"
]; ];
const SYNTHETIC_SYMBOLS = [
"SPY",
...SP500_SYMBOLS.filter((symbol) => symbol !== "SPY")
];
const hashSymbol = (value: string): number => { const hashSymbol = (value: string): number => {
let hash = 0; let hash = 0;
@ -138,7 +142,7 @@ export const createSyntheticEquitiesAdapter = (
const now = Date.now(); const now = Date.now();
const batchSize = 3; const batchSize = 3;
const darkSymbol = SP500_SYMBOLS[darkSymbolIndex % SP500_SYMBOLS.length]; const darkSymbol = SYNTHETIC_SYMBOLS[darkSymbolIndex % SYNTHETIC_SYMBOLS.length];
const darkHash = hashSymbol(darkSymbol); const darkHash = hashSymbol(darkSymbol);
const darkBase = 25 + (darkHash % 475); const darkBase = 25 + (darkHash % 475);
const darkDrift = ((darkStep % 24) - 12) * 0.08; const darkDrift = ((darkStep % 24) - 12) * 0.08;
@ -189,7 +193,7 @@ export const createSyntheticEquitiesAdapter = (
for (let i = 0; i < batchSize; i += 1) { for (let i = 0; i < batchSize; i += 1) {
seq += 1; seq += 1;
const symbol = SP500_SYMBOLS[(seq + i) % SP500_SYMBOLS.length]; const symbol = SYNTHETIC_SYMBOLS[(seq + i) % SYNTHETIC_SYMBOLS.length];
const symbolHash = hashSymbol(symbol); const symbolHash = hashSymbol(symbol);
const basePrice = 25 + (symbolHash % 475); const basePrice = 25 + (symbolHash % 475);
const mid = formatPrice(basePrice + ((seq % 40) - 20) * 0.05); const mid = formatPrice(basePrice + ((seq % 40) - 20) * 0.05);

View file

@ -30,8 +30,8 @@ const service = "ingest-equities";
const logger = createLogger({ service }); const logger = createLogger({ service });
const envSchema = z.object({ const envSchema = z.object({
NATS_URL: z.string().default("nats://localhost:4222"), NATS_URL: z.string().default("nats://127.0.0.1:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"), CLICKHOUSE_DATABASE: z.string().default("default"),
EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"), EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"),
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000), EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000),
@ -129,7 +129,7 @@ const run = async () => {
servers: env.NATS_URL, servers: env.NATS_URL,
name: service name: service
}, },
{ attempts: 20, delayMs: 500 } { attempts: 120, delayMs: 500 }
); );
await ensureStream(jsm, { await ensureStream(jsm, {

View file

@ -17,6 +17,10 @@ type Burst = {
seed: number; seed: number;
}; };
const SYNTHETIC_SYMBOLS = [
"SPY",
...SP500_SYMBOLS.filter((symbol) => symbol !== "SPY")
];
const MS_PER_DAY = 24 * 60 * 60 * 1000; const MS_PER_DAY = 24 * 60 * 60 * 1000;
const EXPIRY_OFFSETS = [0, 1, 7, 14, 28, 45, 60, 90]; const EXPIRY_OFFSETS = [0, 1, 7, 14, 28, 45, 60, 90];
const EXCHANGES = ["CBOE", "PHLX", "ISE", "ARCA", "BOX", "MIAX"]; const EXCHANGES = ["CBOE", "PHLX", "ISE", "ARCA", "BOX", "MIAX"];
@ -177,7 +181,7 @@ const formatExpiry = (now: number, offsetDays: number): string => {
}; };
const buildBurst = (burstIndex: number, now: number): Burst => { const buildBurst = (burstIndex: number, now: number): Burst => {
const symbol = SP500_SYMBOLS[burstIndex % SP500_SYMBOLS.length]; const symbol = SYNTHETIC_SYMBOLS[burstIndex % SYNTHETIC_SYMBOLS.length];
const symbolHash = hashSymbol(symbol); const symbolHash = hashSymbol(symbol);
const seed = symbolHash + burstIndex * 7; const seed = symbolHash + burstIndex * 7;
const scenario = pickWeighted(SCENARIOS, seed); const scenario = pickWeighted(SCENARIOS, seed);

View file

@ -28,8 +28,8 @@ const service = "ingest-options";
const logger = createLogger({ service }); const logger = createLogger({ service });
const envSchema = z.object({ const envSchema = z.object({
NATS_URL: z.string().default("nats://localhost:4222"), NATS_URL: z.string().default("nats://127.0.0.1:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"), CLICKHOUSE_DATABASE: z.string().default("default"),
OPTIONS_INGEST_ADAPTER: z.string().min(1).default("synthetic"), OPTIONS_INGEST_ADAPTER: z.string().min(1).default("synthetic"),
ALPACA_KEY_ID: z.string().default(""), ALPACA_KEY_ID: z.string().default(""),
@ -225,7 +225,7 @@ const run = async () => {
servers: env.NATS_URL, servers: env.NATS_URL,
name: service name: service
}, },
{ attempts: 20, delayMs: 500 } { attempts: 120, delayMs: 500 }
); );
await ensureStream(jsm, { await ensureStream(jsm, {