diff --git a/README.md b/README.md index 5598a5f..ef5507a 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ Not started: ## Current Capabilities - Synthetic options/equity prints with deterministic sequencing +- Ingest adapter seam (env-selected, default `synthetic`) for options and equities - Raw event persistence in ClickHouse + streaming via NATS JetStream - Deterministic option FlowPacket clustering (time-window) - API gateway with REST, WS, and replay endpoints diff --git a/services/ingest-equities/src/adapters/synthetic.ts b/services/ingest-equities/src/adapters/synthetic.ts new file mode 100644 index 0000000..2b79d36 --- /dev/null +++ b/services/ingest-equities/src/adapters/synthetic.ts @@ -0,0 +1,54 @@ +import type { EquityPrint } from "@islandflow/types"; +import type { EquityIngestAdapter, EquityIngestHandlers } from "./types"; + +type SyntheticEquitiesAdapterConfig = { + emitIntervalMs: number; +}; + +const buildSyntheticPrint = (seq: number, now: number): EquityPrint => { + return { + source_ts: now, + ingest_ts: now, + seq, + trace_id: `ingest-equities-${seq}`, + ts: now, + underlying_id: "SPY", + price: 450.1, + size: 100, + exchange: "TEST", + offExchangeFlag: false + }; +}; + +export const createSyntheticEquitiesAdapter = ( + config: SyntheticEquitiesAdapterConfig +): EquityIngestAdapter => { + return { + name: "synthetic", + start: (handlers: EquityIngestHandlers) => { + let seq = 0; + let timer: ReturnType | null = null; + let stopped = false; + + const emit = () => { + if (stopped) { + return; + } + + seq += 1; + const now = Date.now(); + const print = buildSyntheticPrint(seq, now); + void handlers.onTrade(print); + }; + + timer = setInterval(emit, config.emitIntervalMs); + + return () => { + stopped = true; + if (timer) { + clearInterval(timer); + } + }; + } + }; +}; diff --git a/services/ingest-equities/src/adapters/types.ts b/services/ingest-equities/src/adapters/types.ts new file mode 100644 index 0000000..ea86de7 --- /dev/null +++ b/services/ingest-equities/src/adapters/types.ts @@ -0,0 +1,13 @@ +import type { EquityPrint, EquityQuote } from "@islandflow/types"; + +export type StopHandler = () => void | Promise; + +export type EquityIngestHandlers = { + onTrade: (print: EquityPrint) => void | Promise; + onQuote?: (quote: EquityQuote) => void | Promise; +}; + +export type EquityIngestAdapter = { + name: string; + start: (handlers: EquityIngestHandlers) => StopHandler | Promise; +}; diff --git a/services/ingest-equities/src/index.ts b/services/ingest-equities/src/index.ts index 298906b..0fa3218 100644 --- a/services/ingest-equities/src/index.ts +++ b/services/ingest-equities/src/index.ts @@ -13,6 +13,8 @@ import { insertEquityPrint } from "@islandflow/storage"; import { EquityPrintSchema, type EquityPrint } from "@islandflow/types"; +import { createSyntheticEquitiesAdapter } from "./adapters/synthetic"; +import type { EquityIngestAdapter, StopHandler } from "./adapters/types"; import { z } from "zod"; const service = "ingest-equities"; @@ -22,15 +24,14 @@ const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), + INGEST_ADAPTER: z.string().min(1).default("synthetic"), EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) }); const env = readEnv(envSchema); const state = { - shuttingDown: false, - seq: 0, - timer: null as ReturnType | null + shuttingDown: false }; const retry = async ( @@ -60,22 +61,12 @@ const retry = async ( throw lastError ?? new Error(`${label} failed after retries`); }; -const buildSyntheticPrint = (): EquityPrint => { - const now = Date.now(); - state.seq += 1; +const selectAdapter = (name: string): EquityIngestAdapter => { + if (name === "synthetic") { + return createSyntheticEquitiesAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS }); + } - return { - source_ts: now, - ingest_ts: now, - seq: state.seq, - trace_id: `ingest-equities-${state.seq}`, - ts: now, - underlying_id: "SPY", - price: 450.1, - size: 100, - exchange: "TEST", - offExchangeFlag: false - }; + throw new Error(`Unknown ingest adapter: ${name}`); }; const run = async () => { @@ -111,33 +102,33 @@ const run = async () => { await ensureEquityPrintsTable(clickhouse); }); - const emit = async () => { - if (state.shuttingDown) { - return; + const adapter = selectAdapter(env.INGEST_ADAPTER); + logger.info("ingest adapter selected", { adapter: adapter.name }); + + const stopAdapter: StopHandler = await adapter.start({ + onTrade: async (candidate: EquityPrint) => { + if (state.shuttingDown) { + return; + } + + const print = EquityPrintSchema.parse(candidate); + + try { + await insertEquityPrint(clickhouse, print); + await publishJson(js, SUBJECT_EQUITY_PRINTS, print); + logger.info("published equity print", { + trace_id: print.trace_id, + seq: print.seq, + underlying_id: print.underlying_id + }); + } catch (error) { + logger.error("failed to publish equity print", { + error: error instanceof Error ? error.message : String(error), + trace_id: print.trace_id + }); + } } - - const candidate = buildSyntheticPrint(); - const print = EquityPrintSchema.parse(candidate); - - try { - await insertEquityPrint(clickhouse, print); - await publishJson(js, SUBJECT_EQUITY_PRINTS, print); - logger.info("published equity print", { - trace_id: print.trace_id, - seq: print.seq, - underlying_id: print.underlying_id - }); - } catch (error) { - logger.error("failed to publish equity print", { - error: error instanceof Error ? error.message : String(error), - trace_id: print.trace_id - }); - } - }; - - state.timer = setInterval(() => { - void emit(); - }, env.EMIT_INTERVAL_MS); + }); const shutdown = async (signal: string) => { if (state.shuttingDown) { @@ -145,9 +136,7 @@ const run = async () => { } state.shuttingDown = true; - if (state.timer) { - clearInterval(state.timer); - } + await stopAdapter(); logger.info("service stopping", { signal }); diff --git a/services/ingest-options/src/adapters/ibkr.ts b/services/ingest-options/src/adapters/ibkr.ts new file mode 100644 index 0000000..30a75c0 --- /dev/null +++ b/services/ingest-options/src/adapters/ibkr.ts @@ -0,0 +1,20 @@ +import type { OptionIngestAdapter, OptionIngestHandlers } from "./types"; + +type IbkrOptionsAdapterConfig = { + host: string; + port: number; + clientId: number; +}; + +export const createIbkrOptionsAdapter = ( + config: IbkrOptionsAdapterConfig +): OptionIngestAdapter => { + return { + name: "ibkr", + start: (_handlers: OptionIngestHandlers) => { + throw new Error( + `IBKR adapter not implemented. Requested ${config.host}:${config.port} clientId=${config.clientId}.` + ); + } + }; +}; diff --git a/services/ingest-options/src/adapters/synthetic.ts b/services/ingest-options/src/adapters/synthetic.ts new file mode 100644 index 0000000..280a4c4 --- /dev/null +++ b/services/ingest-options/src/adapters/synthetic.ts @@ -0,0 +1,54 @@ +import type { OptionPrint } from "@islandflow/types"; +import type { OptionIngestAdapter, OptionIngestHandlers } from "./types"; + +type SyntheticOptionsAdapterConfig = { + emitIntervalMs: number; +}; + +const buildSyntheticPrint = (seq: number, now: number): OptionPrint => { + return { + source_ts: now, + ingest_ts: now, + seq, + trace_id: `ingest-options-${seq}`, + ts: now, + option_contract_id: "SPY-2025-01-17-450-C", + price: 1.25, + size: 10, + exchange: "TEST", + conditions: ["TEST"] + }; +}; + +export const createSyntheticOptionsAdapter = ( + config: SyntheticOptionsAdapterConfig +): OptionIngestAdapter => { + return { + name: "synthetic", + start: (handlers: OptionIngestHandlers) => { + let seq = 0; + let timer: ReturnType | null = null; + let stopped = false; + + const emit = () => { + if (stopped) { + return; + } + + seq += 1; + const now = Date.now(); + const print = buildSyntheticPrint(seq, now); + void handlers.onTrade(print); + }; + + timer = setInterval(emit, config.emitIntervalMs); + + return () => { + stopped = true; + if (timer) { + clearInterval(timer); + } + }; + } + }; +}; diff --git a/services/ingest-options/src/adapters/types.ts b/services/ingest-options/src/adapters/types.ts new file mode 100644 index 0000000..6c64745 --- /dev/null +++ b/services/ingest-options/src/adapters/types.ts @@ -0,0 +1,13 @@ +import type { OptionNBBO, OptionPrint } from "@islandflow/types"; + +export type StopHandler = () => void | Promise; + +export type OptionIngestHandlers = { + onTrade: (print: OptionPrint) => void | Promise; + onNBBO?: (nbbo: OptionNBBO) => void | Promise; +}; + +export type OptionIngestAdapter = { + name: string; + start: (handlers: OptionIngestHandlers) => StopHandler | Promise; +}; diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index 31b5e8a..676b645 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -13,6 +13,9 @@ import { insertOptionPrint } from "@islandflow/storage"; import { OptionPrintSchema, type OptionPrint } from "@islandflow/types"; +import { createIbkrOptionsAdapter } from "./adapters/ibkr"; +import { createSyntheticOptionsAdapter } from "./adapters/synthetic"; +import type { OptionIngestAdapter, StopHandler } from "./adapters/types"; import { z } from "zod"; const service = "ingest-options"; @@ -22,15 +25,17 @@ const envSchema = z.object({ NATS_URL: z.string().default("nats://localhost:4222"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), + INGEST_ADAPTER: z.string().min(1).default("synthetic"), + IBKR_HOST: z.string().default("127.0.0.1"), + IBKR_PORT: z.coerce.number().int().positive().default(7497), + IBKR_CLIENT_ID: z.coerce.number().int().nonnegative().default(0), EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) }); const env = readEnv(envSchema); const state = { - shuttingDown: false, - seq: 0, - timer: null as ReturnType | null + shuttingDown: false }; const retry = async ( @@ -60,22 +65,20 @@ const retry = async ( throw lastError ?? new Error(`${label} failed after retries`); }; -const buildSyntheticPrint = (): OptionPrint => { - const now = Date.now(); - state.seq += 1; +const selectAdapter = (name: string): OptionIngestAdapter => { + if (name === "synthetic") { + return createSyntheticOptionsAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS }); + } - return { - source_ts: now, - ingest_ts: now, - seq: state.seq, - trace_id: `ingest-options-${state.seq}`, - ts: now, - option_contract_id: "SPY-2025-01-17-450-C", - price: 1.25, - size: 10, - exchange: "TEST", - conditions: ["TEST"] - }; + if (name === "ibkr") { + return createIbkrOptionsAdapter({ + host: env.IBKR_HOST, + port: env.IBKR_PORT, + clientId: env.IBKR_CLIENT_ID + }); + } + + throw new Error(`Unknown ingest adapter: ${name}`); }; const run = async () => { @@ -111,33 +114,33 @@ const run = async () => { await ensureOptionPrintsTable(clickhouse); }); - const emit = async () => { - if (state.shuttingDown) { - return; + const adapter = selectAdapter(env.INGEST_ADAPTER); + logger.info("ingest adapter selected", { adapter: adapter.name }); + + const stopAdapter: StopHandler = await adapter.start({ + onTrade: async (candidate: OptionPrint) => { + if (state.shuttingDown) { + return; + } + + const print = OptionPrintSchema.parse(candidate); + + try { + await insertOptionPrint(clickhouse, print); + await publishJson(js, SUBJECT_OPTION_PRINTS, print); + logger.info("published option print", { + trace_id: print.trace_id, + seq: print.seq, + option_contract_id: print.option_contract_id + }); + } catch (error) { + logger.error("failed to publish option print", { + error: error instanceof Error ? error.message : String(error), + trace_id: print.trace_id + }); + } } - - const candidate = buildSyntheticPrint(); - const print = OptionPrintSchema.parse(candidate); - - try { - await insertOptionPrint(clickhouse, print); - await publishJson(js, SUBJECT_OPTION_PRINTS, print); - logger.info("published option print", { - trace_id: print.trace_id, - seq: print.seq, - option_contract_id: print.option_contract_id - }); - } catch (error) { - logger.error("failed to publish option print", { - error: error instanceof Error ? error.message : String(error), - trace_id: print.trace_id - }); - } - }; - - state.timer = setInterval(() => { - void emit(); - }, env.EMIT_INTERVAL_MS); + }); const shutdown = async (signal: string) => { if (state.shuttingDown) { @@ -145,9 +148,7 @@ const run = async () => { } state.shuttingDown = true; - if (state.timer) { - clearInterval(state.timer); - } + await stopAdapter(); logger.info("service stopping", { signal });