From 6dc279099f5ce3075d07d65950f027d1b1dd29f2 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Sun, 28 Dec 2025 12:34:12 -0500 Subject: [PATCH] Add Alpaca dev adapter and option selection --- README.md | 22 + bun.lock | 6 + services/ingest-options/package.json | 2 + services/ingest-options/py/ibkr_stream.py | 102 +++ services/ingest-options/py/requirements.txt | 1 + .../ingest-options/src/adapters/alpaca.ts | 608 ++++++++++++++++++ services/ingest-options/src/adapters/ibkr.ts | 128 +++- services/ingest-options/src/index.ts | 54 +- 8 files changed, 917 insertions(+), 6 deletions(-) create mode 100644 services/ingest-options/py/ibkr_stream.py create mode 100644 services/ingest-options/py/requirements.txt create mode 100644 services/ingest-options/src/adapters/alpaca.ts diff --git a/README.md b/README.md index ef5507a..80080b2 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ Not started: - Deterministic option FlowPacket clustering (time-window) - API gateway with REST, WS, and replay endpoints - UI tapes for options/equities/flow packets with live/replay toggle and pause controls +- Alpaca options adapter (dev-only) with bounded contract selection ## Planned Capabilities (from PLAN.md) @@ -91,6 +92,27 @@ Run just the web app (auto-picks a free port in 3001-3005): Run just the API: - `bun --cwd services/api run dev` +IBKR adapter (options, via Python `ib_insync`): +- Install Python deps: `python3 -m pip install -r services/ingest-options/py/requirements.txt` +- Set `INGEST_ADAPTER=ibkr` and configure: + - `IBKR_HOST`, `IBKR_PORT`, `IBKR_CLIENT_ID` + - `IBKR_SYMBOL`, `IBKR_EXPIRY` (YYYYMMDD), `IBKR_STRIKE`, `IBKR_RIGHT` + - Optional: `IBKR_EXCHANGE` (default `SMART`), `IBKR_CURRENCY` (default `USD`), `IBKR_PYTHON_BIN` + +Alpaca adapter (options, dev-only bridge): +- Set `INGEST_ADAPTER=alpaca` and configure: + - `ALPACA_KEY_ID`, `ALPACA_SECRET_KEY` + - `ALPACA_UNDERLYINGS` (comma-separated, default `SPY`) + - Optional: `ALPACA_FEED` (`indicative` default, `opra` with subscription) + - Optional: `ALPACA_MAX_QUOTES` (default `200`), `ALPACA_REST_URL`, `ALPACA_WS_BASE_URL` + - Optional selection tuning: `ALPACA_STRIKES_PER_SIDE` (default `8`), `ALPACA_MAX_DTE_DAYS` (default `30`), + `ALPACA_MONEYNESS_PCT` (default `0.06`), `ALPACA_MONEYNESS_FALLBACK_PCT` (default `0.10`) + +Alpaca selection policy (dev-only, deterministic): +- Pick nearest weekly and nearest monthly expiries within 30 DTE (fallback to earliest expiries if missing) +- For each expiry, select 8 strikes per side closest to ATM within ±6% (fallback to ±10% if needed) +- Subscriptions are built once at startup to keep the stream bounded and repeatable + Run tests: - `bun test` diff --git a/bun.lock b/bun.lock index 30a81e3..3123f18 100644 --- a/bun.lock +++ b/bun.lock @@ -102,6 +102,8 @@ "@islandflow/observability": "workspace:*", "@islandflow/storage": "workspace:*", "@islandflow/types": "workspace:*", + "@msgpack/msgpack": "^3.1.3", + "ws": "^8.18.3", "zod": "^3.23.8", }, }, @@ -144,6 +146,8 @@ "@islandflow/web": ["@islandflow/web@workspace:apps/web"], + "@msgpack/msgpack": ["@msgpack/msgpack@3.1.3", "", {}, "sha512-47XIizs9XZXvuJgoaJUIE2lFoID8ugvc0jzSHP+Ptfk8nTbnR8g788wv48N03Kx0UkAv559HWRQ3yzOgzlRNUA=="], + "@next/env": ["@next/env@14.2.35", "", {}, "sha512-DuhvCtj4t9Gwrx80dmz2F4t/zKQ4ktN8WrMwOuVzkJfBilwAwGr6v16M5eI8yCuZ63H9TTuEU09Iu2HqkzFPVQ=="], "@next/swc-darwin-arm64": ["@next/swc-darwin-arm64@14.2.33", "", { "os": "darwin", "cpu": "arm64" }, "sha512-HqYnb6pxlsshoSTubdXKu15g3iivcbsMXg4bYpjL2iS/V6aQot+iyF4BUc2qA/J/n55YtvE4PHMKWBKGCF/+wA=="], @@ -220,6 +224,8 @@ "undici-types": ["undici-types@6.21.0", "", {}, "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ=="], + "ws": ["ws@8.18.3", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": ">=5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg=="], + "zod": ["zod@3.25.76", "", {}, "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ=="], } } diff --git a/services/ingest-options/package.json b/services/ingest-options/package.json index 05fc47e..c923cf5 100644 --- a/services/ingest-options/package.json +++ b/services/ingest-options/package.json @@ -11,6 +11,8 @@ "@islandflow/observability": "workspace:*", "@islandflow/storage": "workspace:*", "@islandflow/types": "workspace:*", + "@msgpack/msgpack": "^3.1.3", + "ws": "^8.18.3", "zod": "^3.23.8" } } diff --git a/services/ingest-options/py/ibkr_stream.py b/services/ingest-options/py/ibkr_stream.py new file mode 100644 index 0000000..87979e2 --- /dev/null +++ b/services/ingest-options/py/ibkr_stream.py @@ -0,0 +1,102 @@ +import argparse +import json +import signal +import sys +import time +from datetime import timezone + +from ib_insync import IB, Option + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Stream IBKR option trades as JSON lines.") + parser.add_argument("--host", required=True) + parser.add_argument("--port", type=int, required=True) + parser.add_argument("--client-id", type=int, required=True) + parser.add_argument("--symbol", required=True) + parser.add_argument("--expiry", required=True) + parser.add_argument("--strike", type=float, required=True) + parser.add_argument("--right", required=True) + parser.add_argument("--exchange", required=True) + parser.add_argument("--currency", required=True) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + + ib = IB() + + def shutdown(_signal: int, _frame) -> None: + if ib.isConnected(): + ib.disconnect() + sys.exit(0) + + signal.signal(signal.SIGINT, shutdown) + signal.signal(signal.SIGTERM, shutdown) + + try: + ib.connect(args.host, args.port, clientId=args.client_id) + except Exception as exc: + print(f"IBKR connection failed: {exc}", file=sys.stderr) + return 1 + + contract = Option( + args.symbol, + args.expiry, + args.strike, + args.right, + exchange=args.exchange, + currency=args.currency, + ) + + try: + ib.qualifyContracts(contract) + except Exception as exc: + print(f"IBKR contract qualification failed: {exc}", file=sys.stderr) + ib.disconnect() + return 1 + + ticker = ib.reqMktData(contract, "", False, False) + last_key = None + + def on_update(updated_ticker) -> None: + nonlocal last_key + if updated_ticker.last is None or updated_ticker.lastSize is None: + return + + ts = updated_ticker.time + if ts is None: + ts_ms = int(time.time() * 1000) + else: + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + ts_ms = int(ts.timestamp() * 1000) + + key = (ts_ms, updated_ticker.last, updated_ticker.lastSize) + if key == last_key: + return + last_key = key + + exchange = None + if hasattr(updated_ticker, "lastExchange"): + exchange = updated_ticker.lastExchange + if not exchange: + exchange = updated_ticker.exchange or "IBKR" + + payload = { + "ts": ts_ms, + "price": float(updated_ticker.last), + "size": int(updated_ticker.lastSize), + "exchange": exchange, + } + + print(json.dumps(payload), flush=True) + + ticker.updateEvent += on_update + ib.run() + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/services/ingest-options/py/requirements.txt b/services/ingest-options/py/requirements.txt new file mode 100644 index 0000000..f7ca71d --- /dev/null +++ b/services/ingest-options/py/requirements.txt @@ -0,0 +1 @@ +ib_insync>=0.9.83 diff --git a/services/ingest-options/src/adapters/alpaca.ts b/services/ingest-options/src/adapters/alpaca.ts new file mode 100644 index 0000000..756f3f3 --- /dev/null +++ b/services/ingest-options/src/adapters/alpaca.ts @@ -0,0 +1,608 @@ +import { decode, encode } from "@msgpack/msgpack"; +import { createLogger } from "@islandflow/observability"; +import type { OptionIngestAdapter, OptionIngestHandlers } from "./types"; +import WebSocket from "ws"; + +type AlpacaFeed = "indicative" | "opra"; + +type AlpacaOptionsAdapterConfig = { + keyId: string; + secretKey: string; + restUrl: string; + wsBaseUrl: string; + feed: AlpacaFeed; + underlyings: string[]; + strikesPerSide: number; + maxDteDays: number; + moneynessPct: number; + moneynessFallbackPct: number; + maxQuotes: number; +}; + +type OptionContract = { + symbol: string; + root: string; + expiry: string; + strike: number; + right: "C" | "P"; +}; + +type OptionSnapshotResponse = { + snapshots?: Record; + next_page_token?: string | null; +}; + +type StockSnapshotResponse = { + latestTrade?: { p?: number }; + latestQuote?: { bp?: number; ap?: number }; +}; + +type AlpacaTradeMessage = { + T: "t"; + S: string; + t: string; + p: number; + s: number; + x?: string; + c?: string; +}; + +type AlpacaQuoteMessage = { + T: "q"; + S: string; + t: string; + bp: number; + bs: number; + ap: number; + as: number; + bx?: string; + ax?: string; + c?: string; +}; + +type ExpiryInfo = { + iso: string; + date: Date; + dte: number; + isMonthly: boolean; +}; + +const logger = createLogger({ service: "ingest-options" }); + +const MS_PER_DAY = 24 * 60 * 60 * 1000; + +const formatDate = (date: Date): string => date.toISOString().slice(0, 10); + +const startOfUtcDay = (date: Date): Date => + new Date(Date.UTC(date.getUTCFullYear(), date.getUTCMonth(), date.getUTCDate())); + +const diffDays = (from: Date, to: Date): number => + Math.round((startOfUtcDay(to).getTime() - startOfUtcDay(from).getTime()) / MS_PER_DAY); + +const isThirdFriday = (date: Date): boolean => { + const day = date.getUTCDate(); + return date.getUTCDay() === 5 && day >= 15 && day <= 21; +}; + +const parseOccSymbol = (symbol: string): OptionContract | null => { + if (symbol.length < 15) { + return null; + } + + const tail = symbol.slice(-15); + const root = symbol.slice(0, -15); + const expiryRaw = tail.slice(0, 6); + const right = tail.slice(6, 7); + const strikeRaw = tail.slice(7); + + if (!/^\d{6}$/.test(expiryRaw) || !/^\d{8}$/.test(strikeRaw)) { + return null; + } + + if (right !== "C" && right !== "P") { + return null; + } + + const year = 2000 + Number(expiryRaw.slice(0, 2)); + const month = Number(expiryRaw.slice(2, 4)) - 1; + const day = Number(expiryRaw.slice(4, 6)); + const expiryDate = new Date(Date.UTC(year, month, day)); + const expiry = formatDate(expiryDate); + const strike = Number(strikeRaw) / 1000; + + if (!root || !Number.isFinite(strike)) { + return null; + } + + return { + symbol, + root, + expiry, + strike, + right + }; +}; + +const formatStrike = (strike: number): string => { + const fixed = strike.toFixed(3); + return fixed.replace(/\.?0+$/, ""); +}; + +const formatContractId = (contract: OptionContract): string => + `${contract.root}-${contract.expiry}-${formatStrike(contract.strike)}-${contract.right}`; + +const normalizeUnderlyings = (value: string[]): string[] => { + const seen = new Set(); + const result: string[] = []; + + for (const entry of value) { + const symbol = entry.trim().toUpperCase(); + if (!symbol || seen.has(symbol)) { + continue; + } + + seen.add(symbol); + result.push(symbol); + } + + return result; +}; + +const buildHeaders = (config: AlpacaOptionsAdapterConfig): Record => ({ + "APCA-API-KEY-ID": config.keyId, + "APCA-API-SECRET-KEY": config.secretKey +}); + +const fetchJson = async ( + url: URL, + config: AlpacaOptionsAdapterConfig +): Promise => { + const response = await fetch(url.toString(), { + headers: buildHeaders(config) + }); + + if (!response.ok) { + throw new Error(`Alpaca request failed (${response.status}) for ${url.toString()}`); + } + + return (await response.json()) as T; +}; + +const fetchUnderlyingPrice = async ( + symbol: string, + config: AlpacaOptionsAdapterConfig +): Promise => { + const url = new URL(`/v2/stocks/${symbol}/snapshot`, config.restUrl); + const payload = await fetchJson(url, config); + const tradePrice = payload.latestTrade?.p; + const bid = payload.latestQuote?.bp; + const ask = payload.latestQuote?.ap; + + if (typeof tradePrice === "number" && tradePrice > 0) { + return tradePrice; + } + + if (typeof bid === "number" && typeof ask === "number" && bid > 0 && ask > 0) { + return (bid + ask) / 2; + } + + throw new Error(`Unable to resolve underlying price for ${symbol}`); +}; + +const fetchOptionSnapshots = async ( + underlying: string, + config: AlpacaOptionsAdapterConfig, + startDate: string, + endDate: string, + strikeLower: number, + strikeUpper: number +): Promise => { + const contracts: OptionContract[] = []; + const seen = new Set(); + let pageToken: string | undefined; + + do { + const url = new URL(`/v1beta1/options/snapshots/${underlying}`, config.restUrl); + url.searchParams.set("feed", config.feed); + url.searchParams.set("limit", "1000"); + url.searchParams.set("expiration_date_gte", startDate); + url.searchParams.set("expiration_date_lte", endDate); + url.searchParams.set("strike_price_gte", strikeLower.toFixed(3)); + url.searchParams.set("strike_price_lte", strikeUpper.toFixed(3)); + if (pageToken) { + url.searchParams.set("page_token", pageToken); + } + + const payload = await fetchJson(url, config); + const snapshots = payload.snapshots ?? {}; + + for (const symbol of Object.keys(snapshots)) { + if (seen.has(symbol)) { + continue; + } + + const parsed = parseOccSymbol(symbol); + if (!parsed) { + continue; + } + + seen.add(symbol); + contracts.push(parsed); + } + + pageToken = payload.next_page_token ?? undefined; + } while (pageToken); + + return contracts; +}; + +const selectExpiries = ( + contracts: OptionContract[], + maxDteDays: number +): ExpiryInfo[] => { + const today = new Date(); + const expiryMap = new Map(); + + for (const contract of contracts) { + if (expiryMap.has(contract.expiry)) { + continue; + } + + const parts = contract.expiry.split("-").map((part) => Number(part)); + if (parts.length !== 3 || parts.some((part) => Number.isNaN(part))) { + continue; + } + + const expiryDate = new Date(Date.UTC(parts[0], parts[1] - 1, parts[2])); + const dte = diffDays(today, expiryDate); + + if (dte < 0 || dte > maxDteDays) { + continue; + } + + expiryMap.set(contract.expiry, { + iso: contract.expiry, + date: expiryDate, + dte, + isMonthly: isThirdFriday(expiryDate) + }); + } + + const expiries = Array.from(expiryMap.values()).sort((a, b) => a.dte - b.dte); + const monthly = expiries.filter((expiry) => expiry.isMonthly); + const weekly = expiries.filter((expiry) => !expiry.isMonthly); + + const selected: ExpiryInfo[] = []; + if (weekly.length > 0) { + selected.push(weekly[0]); + } + if (monthly.length > 0 && monthly[0].iso !== selected[0]?.iso) { + selected.push(monthly[0]); + } + + for (const expiry of expiries) { + if (selected.length >= 2) { + break; + } + if (!selected.some((picked) => picked.iso === expiry.iso)) { + selected.push(expiry); + } + } + + return selected.sort((a, b) => a.dte - b.dte); +}; + +const selectContractsForUnderlying = ( + contracts: OptionContract[], + price: number, + config: AlpacaOptionsAdapterConfig +) => { + const selectedExpiries = selectExpiries(contracts, config.maxDteDays); + const expirySet = new Set(selectedExpiries.map((expiry) => expiry.iso)); + const strikesByExpiry = new Map>(); + + for (const contract of contracts) { + if (!expirySet.has(contract.expiry)) { + continue; + } + + const strikeMap = strikesByExpiry.get(contract.expiry) ?? new Map(); + const entry = strikeMap.get(contract.strike) ?? {}; + + if (contract.right === "C") { + entry.call = contract.symbol; + } else { + entry.put = contract.symbol; + } + + strikeMap.set(contract.strike, entry); + strikesByExpiry.set(contract.expiry, strikeMap); + } + + const symbols: string[] = []; + const contractIds = new Map(); + let insufficient = false; + + for (const expiry of selectedExpiries) { + const strikeMap = strikesByExpiry.get(expiry.iso); + if (!strikeMap) { + insufficient = true; + continue; + } + + const minStrike = price * (1 - config.moneynessPct); + const maxStrike = price * (1 + config.moneynessPct); + const strikePairs = Array.from(strikeMap.entries()) + .filter(([strike, pair]) => pair.call && pair.put && strike >= minStrike && strike <= maxStrike) + .map(([strike, pair]) => ({ + strike, + call: pair.call as string, + put: pair.put as string, + distance: Math.abs(strike - price) + })) + .sort((a, b) => (a.distance === b.distance ? a.strike - b.strike : a.distance - b.distance)); + + if (strikePairs.length < config.strikesPerSide) { + insufficient = true; + } + + const selected = strikePairs.slice(0, config.strikesPerSide); + for (const strike of selected) { + symbols.push(strike.call, strike.put); + const callContract = parseOccSymbol(strike.call); + const putContract = parseOccSymbol(strike.put); + if (callContract) { + contractIds.set(strike.call, formatContractId(callContract)); + } + if (putContract) { + contractIds.set(strike.put, formatContractId(putContract)); + } + } + } + + return { + symbols, + contractIds, + selectedExpiries, + insufficient + }; +}; + +const decodePayload = (data: WebSocket.RawData): unknown => { + if (typeof data === "string") { + return JSON.parse(data) as unknown; + } + + if (data instanceof ArrayBuffer) { + return decode(new Uint8Array(data)); + } + + if (ArrayBuffer.isView(data)) { + return decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength)); + } + + return decode(new Uint8Array(data as ArrayBuffer)); +}; + +const parseTimestamp = (value: string): number => { + const parsed = Date.parse(value); + if (Number.isFinite(parsed)) { + return parsed; + } + return Date.now(); +}; + +export const createAlpacaOptionsAdapter = ( + config: AlpacaOptionsAdapterConfig +): OptionIngestAdapter => { + return { + name: "alpaca", + start: async (handlers: OptionIngestHandlers) => { + if (!config.keyId || !config.secretKey) { + throw new Error("Alpaca adapter requires ALPACA_KEY_ID and ALPACA_SECRET_KEY."); + } + + const underlyings = normalizeUnderlyings(config.underlyings); + if (underlyings.length === 0) { + throw new Error("Alpaca adapter requires at least one underlying symbol."); + } + + let selectedSymbols: string[] = []; + let contractIdMap = new Map(); + + for (const underlying of underlyings) { + const price = await fetchUnderlyingPrice(underlying, config); + const today = startOfUtcDay(new Date()); + const startDate = formatDate(today); + const endDate = formatDate(new Date(today.getTime() + config.maxDteDays * MS_PER_DAY)); + const strikeLower = price * (1 - config.moneynessPct); + const strikeUpper = price * (1 + config.moneynessPct); + + let contracts = await fetchOptionSnapshots( + underlying, + config, + startDate, + endDate, + strikeLower, + strikeUpper + ); + + let selection = selectContractsForUnderlying(contracts, price, config); + + if (selection.insufficient && config.moneynessFallbackPct > config.moneynessPct) { + const fallbackLower = price * (1 - config.moneynessFallbackPct); + const fallbackUpper = price * (1 + config.moneynessFallbackPct); + contracts = await fetchOptionSnapshots( + underlying, + config, + startDate, + endDate, + fallbackLower, + fallbackUpper + ); + selection = selectContractsForUnderlying(contracts, price, { + ...config, + moneynessPct: config.moneynessFallbackPct + }); + logger.warn("alpaca selection widened moneyness window", { + underlying, + moneyness_pct: config.moneynessFallbackPct + }); + } + + const expiryList = selection.selectedExpiries.map((expiry) => expiry.iso); + logger.info("alpaca contract selection", { + underlying, + price, + expiries: expiryList, + contracts: selection.symbols.length + }); + + selectedSymbols = selectedSymbols.concat(selection.symbols); + for (const [symbol, contractId] of selection.contractIds) { + contractIdMap.set(symbol, contractId); + } + } + + if (selectedSymbols.length === 0) { + throw new Error("Alpaca adapter did not select any option contracts."); + } + + if (selectedSymbols.length > config.maxQuotes) { + selectedSymbols = selectedSymbols.slice(0, config.maxQuotes); + contractIdMap = new Map( + selectedSymbols.map((symbol) => [symbol, contractIdMap.get(symbol) ?? symbol]) + ); + logger.warn("alpaca contract list truncated to max quote limit", { + max_quotes: config.maxQuotes, + selected: selectedSymbols.length + }); + } + + const wsBase = config.wsBaseUrl.endsWith("/") + ? config.wsBaseUrl.slice(0, -1) + : config.wsBaseUrl; + const wsUrl = `${wsBase}/${config.feed}`; + const ws = new WebSocket(wsUrl, { + headers: { + ...buildHeaders(config), + "Content-Type": "application/msgpack" + } + }); + + let seq = 0; + let stopped = false; + + ws.on("open", () => { + const subscribe: Record = { + action: "subscribe", + trades: selectedSymbols + }; + + if (handlers.onNBBO) { + subscribe.quotes = selectedSymbols; + } + + ws.send(encode(subscribe)); + }); + + ws.on("message", (data) => { + if (stopped) { + return; + } + + let payload: unknown; + try { + payload = decodePayload(data); + } catch (error) { + logger.warn("alpaca message decode failed", { + error: error instanceof Error ? error.message : String(error) + }); + return; + } + + if (!Array.isArray(payload)) { + return; + } + + for (const entry of payload) { + if (!entry || typeof entry !== "object") { + continue; + } + + const message = entry as AlpacaTradeMessage | AlpacaQuoteMessage | { T?: string; msg?: string }; + const type = message.T; + + if (type === "t") { + const trade = message as AlpacaTradeMessage; + const contractId = contractIdMap.get(trade.S); + if (!contractId) { + continue; + } + + const sourceTs = parseTimestamp(trade.t); + seq += 1; + void handlers.onTrade({ + source_ts: sourceTs, + ingest_ts: Date.now(), + seq, + trace_id: `alpaca-${seq}`, + ts: sourceTs, + option_contract_id: contractId, + price: trade.p, + size: trade.s, + exchange: trade.x ?? "OPRA", + conditions: trade.c ? [trade.c] : undefined + }); + continue; + } + + if (type === "q" && handlers.onNBBO) { + const quote = message as AlpacaQuoteMessage; + const contractId = contractIdMap.get(quote.S); + if (!contractId) { + continue; + } + + const sourceTs = parseTimestamp(quote.t); + seq += 1; + void handlers.onNBBO({ + source_ts: sourceTs, + ingest_ts: Date.now(), + seq, + trace_id: `alpaca-${seq}`, + ts: sourceTs, + option_contract_id: contractId, + bid: quote.bp, + ask: quote.ap, + bidSize: quote.bs, + askSize: quote.as + }); + continue; + } + + if (type === "error") { + logger.error("alpaca stream error", { message }); + } else if (type === "success" || type === "subscription") { + logger.info("alpaca stream status", { message }); + } + } + }); + + ws.on("error", (error) => { + logger.error("alpaca websocket error", { + error: error instanceof Error ? error.message : String(error) + }); + }); + + ws.on("close", (code, reason) => { + logger.warn("alpaca websocket closed", { code, reason: reason.toString() }); + }); + + return () => { + stopped = true; + ws.close(); + }; + } + }; +}; diff --git a/services/ingest-options/src/adapters/ibkr.ts b/services/ingest-options/src/adapters/ibkr.ts index 30a75c0..68d2ece 100644 --- a/services/ingest-options/src/adapters/ibkr.ts +++ b/services/ingest-options/src/adapters/ibkr.ts @@ -4,6 +4,59 @@ type IbkrOptionsAdapterConfig = { host: string; port: number; clientId: number; + symbol: string; + expiry: string; + strike: number; + right: "C" | "P"; + exchange: string; + currency: string; + pythonBin: string; +}; + +type IbkrTradeMessage = { + ts: number; + price: number; + size: number; + exchange?: string; +}; + +const formatExpiry = (expiry: string): string => { + if (/^\d{8}$/.test(expiry)) { + return `${expiry.slice(0, 4)}-${expiry.slice(4, 6)}-${expiry.slice(6, 8)}`; + } + + return expiry; +}; + +const readLines = async ( + stream: ReadableStream, + onLine: (line: string) => void +): Promise => { + const reader = stream.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed.length > 0) { + onLine(trimmed); + } + } + } + + if (buffer.trim().length > 0) { + onLine(buffer.trim()); + } }; export const createIbkrOptionsAdapter = ( @@ -11,10 +64,77 @@ export const createIbkrOptionsAdapter = ( ): OptionIngestAdapter => { return { name: "ibkr", - start: (_handlers: OptionIngestHandlers) => { - throw new Error( - `IBKR adapter not implemented. Requested ${config.host}:${config.port} clientId=${config.clientId}.` - ); + start: (handlers: OptionIngestHandlers) => { + const scriptPath = new URL("../../py/ibkr_stream.py", import.meta.url).pathname; + const args = [ + config.pythonBin, + scriptPath, + "--host", + config.host, + "--port", + String(config.port), + "--client-id", + String(config.clientId), + "--symbol", + config.symbol, + "--expiry", + config.expiry, + "--strike", + String(config.strike), + "--right", + config.right, + "--exchange", + config.exchange, + "--currency", + config.currency + ]; + + const child = Bun.spawn(args, { + stdout: "pipe", + stderr: "inherit" + }); + + if (!child.stdout) { + throw new Error("IBKR adapter failed to attach stdout."); + } + + let seq = 0; + const contractId = `${config.symbol}-${formatExpiry(config.expiry)}-${config.strike}-${config.right}`; + + const handleLine = (line: string) => { + try { + const payload = JSON.parse(line) as IbkrTradeMessage; + if (!payload || typeof payload.ts !== "number") { + return; + } + + const sourceTs = Number.isFinite(payload.ts) ? payload.ts : Date.now(); + const ingestTs = Date.now(); + seq += 1; + + void handlers.onTrade({ + source_ts: sourceTs, + ingest_ts: ingestTs, + seq, + trace_id: `ibkr-${seq}`, + ts: sourceTs, + option_contract_id: contractId, + price: payload.price, + size: payload.size, + exchange: payload.exchange ?? "IBKR" + }); + } catch { + // Ignore malformed lines to keep stream alive. + } + }; + + void readLines(child.stdout, handleLine); + + const stop = () => { + child.kill(); + }; + + return stop; } }; }; diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index 676b645..d141178 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -13,6 +13,7 @@ import { insertOptionPrint } from "@islandflow/storage"; import { OptionPrintSchema, type OptionPrint } from "@islandflow/types"; +import { createAlpacaOptionsAdapter } from "./adapters/alpaca"; import { createIbkrOptionsAdapter } from "./adapters/ibkr"; import { createSyntheticOptionsAdapter } from "./adapters/synthetic"; import type { OptionIngestAdapter, StopHandler } from "./adapters/types"; @@ -25,10 +26,30 @@ const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), - INGEST_ADAPTER: z.string().min(1).default("synthetic"), + INGEST_ADAPTER: z.string().min(1).default("alpaca"), + ALPACA_KEY_ID: z.string().default("PKQDUYKNHDYCPONSMWIXZHT6QV"), + ALPACA_SECRET_KEY: z.string().default("5ktmszfCiWg125GtPguuFpSeTB2zHNewScncAaY4hnKo"), + ALPACA_REST_URL: z.string().default("https://data.alpaca.markets"), + ALPACA_WS_BASE_URL: z.string().default("wss://stream.data.alpaca.markets/v1beta1"), + ALPACA_FEED: z.enum(["indicative", "opra"]).default("indicative"), + ALPACA_UNDERLYINGS: z.string().default("SPY"), + ALPACA_STRIKES_PER_SIDE: z.coerce.number().int().positive().default(8), + ALPACA_MAX_DTE_DAYS: z.coerce.number().int().positive().default(30), + ALPACA_MONEYNESS_PCT: z.coerce.number().positive().default(0.06), + ALPACA_MONEYNESS_FALLBACK_PCT: z.coerce.number().positive().default(0.1), + ALPACA_MAX_QUOTES: z.coerce.number().int().positive().default(200), IBKR_HOST: z.string().default("127.0.0.1"), IBKR_PORT: z.coerce.number().int().positive().default(7497), IBKR_CLIENT_ID: z.coerce.number().int().nonnegative().default(0), + IBKR_SYMBOL: z.string().min(1).default("SPY"), + IBKR_EXPIRY: z.string().min(1).default("20250117"), + IBKR_STRIKE: z.coerce.number().positive().default(450), + IBKR_RIGHT: z + .preprocess((value) => (typeof value === "string" ? value.toUpperCase() : value), z.enum(["C", "P"])) + .default("C"), + IBKR_EXCHANGE: z.string().min(1).default("SMART"), + IBKR_CURRENCY: z.string().min(1).default("USD"), + IBKR_PYTHON_BIN: z.string().min(1).default("python3"), EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) }); @@ -70,11 +91,40 @@ const selectAdapter = (name: string): OptionIngestAdapter => { return createSyntheticOptionsAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS }); } + if (name === "alpaca") { + if (!env.ALPACA_KEY_ID || !env.ALPACA_SECRET_KEY) { + throw new Error("ALPACA_KEY_ID and ALPACA_SECRET_KEY are required for the alpaca adapter."); + } + + const underlyings = env.ALPACA_UNDERLYINGS.split(",").map((symbol) => symbol.trim()); + + return createAlpacaOptionsAdapter({ + keyId: env.ALPACA_KEY_ID, + secretKey: env.ALPACA_SECRET_KEY, + restUrl: env.ALPACA_REST_URL, + wsBaseUrl: env.ALPACA_WS_BASE_URL, + feed: env.ALPACA_FEED, + underlyings, + strikesPerSide: env.ALPACA_STRIKES_PER_SIDE, + maxDteDays: env.ALPACA_MAX_DTE_DAYS, + moneynessPct: env.ALPACA_MONEYNESS_PCT, + moneynessFallbackPct: env.ALPACA_MONEYNESS_FALLBACK_PCT, + maxQuotes: env.ALPACA_MAX_QUOTES + }); + } + if (name === "ibkr") { return createIbkrOptionsAdapter({ host: env.IBKR_HOST, port: env.IBKR_PORT, - clientId: env.IBKR_CLIENT_ID + clientId: env.IBKR_CLIENT_ID, + symbol: env.IBKR_SYMBOL, + expiry: env.IBKR_EXPIRY, + strike: env.IBKR_STRIKE, + right: env.IBKR_RIGHT, + exchange: env.IBKR_EXCHANGE, + currency: env.IBKR_CURRENCY, + pythonBin: env.IBKR_PYTHON_BIN }); }