diff --git a/.env.example b/.env.example index 0e77d57..02c301e 100644 --- a/.env.example +++ b/.env.example @@ -3,13 +3,13 @@ CLICKHOUSE_URL=http://localhost:8123 CLICKHOUSE_DATABASE=default # Options ingest -INGEST_ADAPTER=alpaca +OPTIONS_INGEST_ADAPTER=alpaca ALPACA_KEY_ID= ALPACA_SECRET_KEY= ALPACA_REST_URL=https://data.alpaca.markets ALPACA_WS_BASE_URL=wss://stream.data.alpaca.markets/v1beta1 ALPACA_FEED=indicative -ALPACA_UNDERLYINGS=SPY +ALPACA_UNDERLYINGS=SPY,NVDA,AAPL ALPACA_STRIKES_PER_SIDE=8 ALPACA_MAX_DTE_DAYS=30 ALPACA_MONEYNESS_PCT=0.06 @@ -42,4 +42,5 @@ IBKR_CURRENCY=USD IBKR_PYTHON_BIN=python3 # Equities ingest +EQUITIES_INGEST_ADAPTER=synthetic EMIT_INTERVAL_MS=1000 diff --git a/README.md b/README.md index a558da6..5bac783 100644 --- a/README.md +++ b/README.md @@ -98,17 +98,21 @@ Run just the web app (auto-picks a free port in 3001-3005): Run just the API: - `bun --cwd services/api run dev` +Adapter selection (env): +- Options: `OPTIONS_INGEST_ADAPTER` (defaults to `alpaca`) +- Equities: `EQUITIES_INGEST_ADAPTER` (defaults to `synthetic`) + IBKR adapter (options, via Python `ib_insync`): - Install Python deps: `python3 -m pip install -r services/ingest-options/py/requirements.txt` -- Set `INGEST_ADAPTER=ibkr` and configure: +- Set `OPTIONS_INGEST_ADAPTER=ibkr` and configure: - `IBKR_HOST`, `IBKR_PORT`, `IBKR_CLIENT_ID` - `IBKR_SYMBOL`, `IBKR_EXPIRY` (YYYYMMDD), `IBKR_STRIKE`, `IBKR_RIGHT` - Optional: `IBKR_EXCHANGE` (default `SMART`), `IBKR_CURRENCY` (default `USD`), `IBKR_PYTHON_BIN` Alpaca adapter (options, dev-only bridge): -- Set `INGEST_ADAPTER=alpaca` and configure: +- Set `OPTIONS_INGEST_ADAPTER=alpaca` and configure: - `ALPACA_KEY_ID`, `ALPACA_SECRET_KEY` - - `ALPACA_UNDERLYINGS` (comma-separated, default `SPY`) + - `ALPACA_UNDERLYINGS` (comma-separated, default `SPY,NVDA,AAPL`) - Optional: `ALPACA_FEED` (`indicative` default, `opra` with subscription) - Optional: `ALPACA_MAX_QUOTES` (default `200`), `ALPACA_REST_URL`, `ALPACA_WS_BASE_URL` - Optional selection tuning: `ALPACA_STRIKES_PER_SIDE` (default `8`), `ALPACA_MAX_DTE_DAYS` (default `30`), @@ -121,7 +125,7 @@ Alpaca selection policy (dev-only, deterministic): Databento historical replay adapter (options, via Python `databento`): - Install Python deps: `python3 -m pip install -r services/ingest-options/py/requirements.txt` -- Set `INGEST_ADAPTER=databento` and configure: +- Set `OPTIONS_INGEST_ADAPTER=databento` and configure: - `DATABENTO_API_KEY`, `DATABENTO_START` (ISO date/time) - Optional: `DATABENTO_END`, `DATABENTO_DATASET` (default `OPRA.PILLAR`), `DATABENTO_SCHEMA` (default `trades`) - Optional: `DATABENTO_SYMBOLS` (`ALL` or comma list), `DATABENTO_STYPE_IN`/`DATABENTO_STYPE_OUT` (default `raw_symbol`) diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index 31fe445..6717c5d 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -205,11 +205,38 @@ const run = async () => { await ensureFlowPacketsTable(clickhouse); }); - const subscription = await subscribeJson( - js, - SUBJECT_OPTION_PRINTS, - buildDurableConsumer("compute-option-prints") - ); + const durableName = "compute-option-prints"; + const subscription = await (async () => { + try { + return await subscribeJson(js, SUBJECT_OPTION_PRINTS, buildDurableConsumer(durableName)); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const shouldReset = + message.includes("duplicate subscription") || + message.includes("durable requires") || + message.includes("subject does not match consumer"); + + if (!shouldReset) { + throw error; + } + + logger.warn("resetting jetstream consumer", { durable: durableName, error: message }); + + try { + await jsm.consumers.delete(STREAM_OPTION_PRINTS, durableName); + } catch (deleteError) { + const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError); + if (!deleteMessage.includes("not found")) { + logger.warn("failed to delete jetstream consumer", { + durable: durableName, + error: deleteMessage + }); + } + } + + return await subscribeJson(js, SUBJECT_OPTION_PRINTS, buildDurableConsumer(durableName)); + } + })(); const shutdown = async (signal: string) => { logger.info("service stopping", { signal }); diff --git a/services/ingest-equities/src/index.ts b/services/ingest-equities/src/index.ts index 0fa3218..c02bebc 100644 --- a/services/ingest-equities/src/index.ts +++ b/services/ingest-equities/src/index.ts @@ -24,7 +24,7 @@ const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), - INGEST_ADAPTER: z.string().min(1).default("synthetic"), + EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"), EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) }); @@ -102,7 +102,7 @@ const run = async () => { await ensureEquityPrintsTable(clickhouse); }); - const adapter = selectAdapter(env.INGEST_ADAPTER); + const adapter = selectAdapter(env.EQUITIES_INGEST_ADAPTER); logger.info("ingest adapter selected", { adapter: adapter.name }); const stopAdapter: StopHandler = await adapter.start({ diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index c4b4be5..483f4b3 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -27,13 +27,13 @@ const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), - INGEST_ADAPTER: z.string().min(1).default("alpaca"), + OPTIONS_INGEST_ADAPTER: z.string().min(1).default("alpaca"), ALPACA_KEY_ID: z.string().default(""), ALPACA_SECRET_KEY: z.string().default(""), ALPACA_REST_URL: z.string().default("https://data.alpaca.markets"), ALPACA_WS_BASE_URL: z.string().default("wss://stream.data.alpaca.markets/v1beta1"), ALPACA_FEED: z.enum(["indicative", "opra"]).default("indicative"), - ALPACA_UNDERLYINGS: z.string().default("SPY"), + ALPACA_UNDERLYINGS: z.string().default("SPY,NVDA,AAPL"), ALPACA_STRIKES_PER_SIDE: z.coerce.number().int().positive().default(8), ALPACA_MAX_DTE_DAYS: z.coerce.number().int().positive().default(30), ALPACA_MONEYNESS_PCT: z.coerce.number().positive().default(0.06), @@ -203,7 +203,7 @@ const run = async () => { await ensureOptionPrintsTable(clickhouse); }); - const adapter = selectAdapter(env.INGEST_ADAPTER); + const adapter = selectAdapter(env.OPTIONS_INGEST_ADAPTER); logger.info("ingest adapter selected", { adapter: adapter.name }); const stopAdapter: StopHandler = await adapter.start({