Add ingest adapter seam and IBKR stub

This commit is contained in:
dirtydishes 2025-12-27 23:02:11 -05:00
parent f2f12f2ebe
commit a35ab0b778
8 changed files with 239 additions and 94 deletions

View file

@ -36,6 +36,7 @@ Not started:
## Current Capabilities ## Current Capabilities
- Synthetic options/equity prints with deterministic sequencing - Synthetic options/equity prints with deterministic sequencing
- Ingest adapter seam (env-selected, default `synthetic`) for options and equities
- Raw event persistence in ClickHouse + streaming via NATS JetStream - Raw event persistence in ClickHouse + streaming via NATS JetStream
- Deterministic option FlowPacket clustering (time-window) - Deterministic option FlowPacket clustering (time-window)
- API gateway with REST, WS, and replay endpoints - API gateway with REST, WS, and replay endpoints

View file

@ -0,0 +1,54 @@
import type { EquityPrint } from "@islandflow/types";
import type { EquityIngestAdapter, EquityIngestHandlers } from "./types";
type SyntheticEquitiesAdapterConfig = {
emitIntervalMs: number;
};
const buildSyntheticPrint = (seq: number, now: number): EquityPrint => {
return {
source_ts: now,
ingest_ts: now,
seq,
trace_id: `ingest-equities-${seq}`,
ts: now,
underlying_id: "SPY",
price: 450.1,
size: 100,
exchange: "TEST",
offExchangeFlag: false
};
};
export const createSyntheticEquitiesAdapter = (
config: SyntheticEquitiesAdapterConfig
): EquityIngestAdapter => {
return {
name: "synthetic",
start: (handlers: EquityIngestHandlers) => {
let seq = 0;
let timer: ReturnType<typeof setInterval> | null = null;
let stopped = false;
const emit = () => {
if (stopped) {
return;
}
seq += 1;
const now = Date.now();
const print = buildSyntheticPrint(seq, now);
void handlers.onTrade(print);
};
timer = setInterval(emit, config.emitIntervalMs);
return () => {
stopped = true;
if (timer) {
clearInterval(timer);
}
};
}
};
};

View file

@ -0,0 +1,13 @@
import type { EquityPrint, EquityQuote } from "@islandflow/types";
export type StopHandler = () => void | Promise<void>;
export type EquityIngestHandlers = {
onTrade: (print: EquityPrint) => void | Promise<void>;
onQuote?: (quote: EquityQuote) => void | Promise<void>;
};
export type EquityIngestAdapter = {
name: string;
start: (handlers: EquityIngestHandlers) => StopHandler | Promise<StopHandler>;
};

View file

@ -13,6 +13,8 @@ import {
insertEquityPrint insertEquityPrint
} from "@islandflow/storage"; } from "@islandflow/storage";
import { EquityPrintSchema, type EquityPrint } from "@islandflow/types"; import { EquityPrintSchema, type EquityPrint } from "@islandflow/types";
import { createSyntheticEquitiesAdapter } from "./adapters/synthetic";
import type { EquityIngestAdapter, StopHandler } from "./adapters/types";
import { z } from "zod"; import { z } from "zod";
const service = "ingest-equities"; const service = "ingest-equities";
@ -22,15 +24,14 @@ const envSchema = z.object({
NATS_URL: z.string().default("nats://localhost:4222"), NATS_URL: z.string().default("nats://localhost:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"), CLICKHOUSE_DATABASE: z.string().default("default"),
INGEST_ADAPTER: z.string().min(1).default("synthetic"),
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000)
}); });
const env = readEnv(envSchema); const env = readEnv(envSchema);
const state = { const state = {
shuttingDown: false, shuttingDown: false
seq: 0,
timer: null as ReturnType<typeof setInterval> | null
}; };
const retry = async <T>( const retry = async <T>(
@ -60,22 +61,12 @@ const retry = async <T>(
throw lastError ?? new Error(`${label} failed after retries`); throw lastError ?? new Error(`${label} failed after retries`);
}; };
const buildSyntheticPrint = (): EquityPrint => { const selectAdapter = (name: string): EquityIngestAdapter => {
const now = Date.now(); if (name === "synthetic") {
state.seq += 1; return createSyntheticEquitiesAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS });
}
return { throw new Error(`Unknown ingest adapter: ${name}`);
source_ts: now,
ingest_ts: now,
seq: state.seq,
trace_id: `ingest-equities-${state.seq}`,
ts: now,
underlying_id: "SPY",
price: 450.1,
size: 100,
exchange: "TEST",
offExchangeFlag: false
};
}; };
const run = async () => { const run = async () => {
@ -111,12 +102,15 @@ const run = async () => {
await ensureEquityPrintsTable(clickhouse); await ensureEquityPrintsTable(clickhouse);
}); });
const emit = async () => { const adapter = selectAdapter(env.INGEST_ADAPTER);
logger.info("ingest adapter selected", { adapter: adapter.name });
const stopAdapter: StopHandler = await adapter.start({
onTrade: async (candidate: EquityPrint) => {
if (state.shuttingDown) { if (state.shuttingDown) {
return; return;
} }
const candidate = buildSyntheticPrint();
const print = EquityPrintSchema.parse(candidate); const print = EquityPrintSchema.parse(candidate);
try { try {
@ -133,11 +127,8 @@ const run = async () => {
trace_id: print.trace_id trace_id: print.trace_id
}); });
} }
}; }
});
state.timer = setInterval(() => {
void emit();
}, env.EMIT_INTERVAL_MS);
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
if (state.shuttingDown) { if (state.shuttingDown) {
@ -145,9 +136,7 @@ const run = async () => {
} }
state.shuttingDown = true; state.shuttingDown = true;
if (state.timer) { await stopAdapter();
clearInterval(state.timer);
}
logger.info("service stopping", { signal }); logger.info("service stopping", { signal });

View file

@ -0,0 +1,20 @@
import type { OptionIngestAdapter, OptionIngestHandlers } from "./types";
type IbkrOptionsAdapterConfig = {
host: string;
port: number;
clientId: number;
};
export const createIbkrOptionsAdapter = (
config: IbkrOptionsAdapterConfig
): OptionIngestAdapter => {
return {
name: "ibkr",
start: (_handlers: OptionIngestHandlers) => {
throw new Error(
`IBKR adapter not implemented. Requested ${config.host}:${config.port} clientId=${config.clientId}.`
);
}
};
};

View file

@ -0,0 +1,54 @@
import type { OptionPrint } from "@islandflow/types";
import type { OptionIngestAdapter, OptionIngestHandlers } from "./types";
type SyntheticOptionsAdapterConfig = {
emitIntervalMs: number;
};
const buildSyntheticPrint = (seq: number, now: number): OptionPrint => {
return {
source_ts: now,
ingest_ts: now,
seq,
trace_id: `ingest-options-${seq}`,
ts: now,
option_contract_id: "SPY-2025-01-17-450-C",
price: 1.25,
size: 10,
exchange: "TEST",
conditions: ["TEST"]
};
};
export const createSyntheticOptionsAdapter = (
config: SyntheticOptionsAdapterConfig
): OptionIngestAdapter => {
return {
name: "synthetic",
start: (handlers: OptionIngestHandlers) => {
let seq = 0;
let timer: ReturnType<typeof setInterval> | null = null;
let stopped = false;
const emit = () => {
if (stopped) {
return;
}
seq += 1;
const now = Date.now();
const print = buildSyntheticPrint(seq, now);
void handlers.onTrade(print);
};
timer = setInterval(emit, config.emitIntervalMs);
return () => {
stopped = true;
if (timer) {
clearInterval(timer);
}
};
}
};
};

View file

@ -0,0 +1,13 @@
import type { OptionNBBO, OptionPrint } from "@islandflow/types";
export type StopHandler = () => void | Promise<void>;
export type OptionIngestHandlers = {
onTrade: (print: OptionPrint) => void | Promise<void>;
onNBBO?: (nbbo: OptionNBBO) => void | Promise<void>;
};
export type OptionIngestAdapter = {
name: string;
start: (handlers: OptionIngestHandlers) => StopHandler | Promise<StopHandler>;
};

View file

@ -13,6 +13,9 @@ import {
insertOptionPrint insertOptionPrint
} from "@islandflow/storage"; } from "@islandflow/storage";
import { OptionPrintSchema, type OptionPrint } from "@islandflow/types"; import { OptionPrintSchema, type OptionPrint } from "@islandflow/types";
import { createIbkrOptionsAdapter } from "./adapters/ibkr";
import { createSyntheticOptionsAdapter } from "./adapters/synthetic";
import type { OptionIngestAdapter, StopHandler } from "./adapters/types";
import { z } from "zod"; import { z } from "zod";
const service = "ingest-options"; const service = "ingest-options";
@ -22,15 +25,17 @@ const envSchema = z.object({
NATS_URL: z.string().default("nats://localhost:4222"), NATS_URL: z.string().default("nats://localhost:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"), CLICKHOUSE_URL: z.string().default("http://localhost:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"), CLICKHOUSE_DATABASE: z.string().default("default"),
INGEST_ADAPTER: z.string().min(1).default("synthetic"),
IBKR_HOST: z.string().default("127.0.0.1"),
IBKR_PORT: z.coerce.number().int().positive().default(7497),
IBKR_CLIENT_ID: z.coerce.number().int().nonnegative().default(0),
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000) EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000)
}); });
const env = readEnv(envSchema); const env = readEnv(envSchema);
const state = { const state = {
shuttingDown: false, shuttingDown: false
seq: 0,
timer: null as ReturnType<typeof setInterval> | null
}; };
const retry = async <T>( const retry = async <T>(
@ -60,22 +65,20 @@ const retry = async <T>(
throw lastError ?? new Error(`${label} failed after retries`); throw lastError ?? new Error(`${label} failed after retries`);
}; };
const buildSyntheticPrint = (): OptionPrint => { const selectAdapter = (name: string): OptionIngestAdapter => {
const now = Date.now(); if (name === "synthetic") {
state.seq += 1; return createSyntheticOptionsAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS });
}
return { if (name === "ibkr") {
source_ts: now, return createIbkrOptionsAdapter({
ingest_ts: now, host: env.IBKR_HOST,
seq: state.seq, port: env.IBKR_PORT,
trace_id: `ingest-options-${state.seq}`, clientId: env.IBKR_CLIENT_ID
ts: now, });
option_contract_id: "SPY-2025-01-17-450-C", }
price: 1.25,
size: 10, throw new Error(`Unknown ingest adapter: ${name}`);
exchange: "TEST",
conditions: ["TEST"]
};
}; };
const run = async () => { const run = async () => {
@ -111,12 +114,15 @@ const run = async () => {
await ensureOptionPrintsTable(clickhouse); await ensureOptionPrintsTable(clickhouse);
}); });
const emit = async () => { const adapter = selectAdapter(env.INGEST_ADAPTER);
logger.info("ingest adapter selected", { adapter: adapter.name });
const stopAdapter: StopHandler = await adapter.start({
onTrade: async (candidate: OptionPrint) => {
if (state.shuttingDown) { if (state.shuttingDown) {
return; return;
} }
const candidate = buildSyntheticPrint();
const print = OptionPrintSchema.parse(candidate); const print = OptionPrintSchema.parse(candidate);
try { try {
@ -133,11 +139,8 @@ const run = async () => {
trace_id: print.trace_id trace_id: print.trace_id
}); });
} }
}; }
});
state.timer = setInterval(() => {
void emit();
}, env.EMIT_INTERVAL_MS);
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
if (state.shuttingDown) { if (state.shuttingDown) {
@ -145,9 +148,7 @@ const run = async () => {
} }
state.shuttingDown = true; state.shuttingDown = true;
if (state.timer) { await stopAdapter();
clearInterval(state.timer);
}
logger.info("service stopping", { signal }); logger.info("service stopping", { signal });