Add equity prints ingestion

This commit is contained in:
dirtydishes 2025-12-27 19:21:01 -05:00
parent 488ae82ed6
commit 6a1f457028
9 changed files with 252 additions and 12 deletions

View file

@ -75,8 +75,12 @@
"services/ingest-equities": { "services/ingest-equities": {
"name": "@islandflow/ingest-equities", "name": "@islandflow/ingest-equities",
"dependencies": { "dependencies": {
"@islandflow/bus": "workspace:*",
"@islandflow/config": "workspace:*", "@islandflow/config": "workspace:*",
"@islandflow/observability": "workspace:*", "@islandflow/observability": "workspace:*",
"@islandflow/storage": "workspace:*",
"@islandflow/types": "workspace:*",
"zod": "^3.23.8",
}, },
}, },
"services/ingest-options": { "services/ingest-options": {

View file

@ -3,6 +3,7 @@ services:
image: clickhouse/clickhouse-server:23.8 image: clickhouse/clickhouse-server:23.8
ports: ports:
- "8123:8123" - "8123:8123"
- "9000:9000"
volumes: volumes:
- clickhouse-data:/var/lib/clickhouse - clickhouse-data:/var/lib/clickhouse
ulimits: ulimits:
@ -12,7 +13,7 @@ services:
redis: redis:
image: redis:7.2 image: redis:7.2
ports: ports:
- "6380:6379" - "6379:6379"
volumes: volumes:
- redis-data:/data - redis-data:/data
nats: nats:

View file

@ -1,2 +1,4 @@
export const STREAM_OPTION_PRINTS = "OPTIONS_PRINTS"; export const STREAM_OPTION_PRINTS = "OPTIONS_PRINTS";
export const SUBJECT_OPTION_PRINTS = "options.prints"; export const SUBJECT_OPTION_PRINTS = "options.prints";
export const STREAM_EQUITY_PRINTS = "EQUITY_PRINTS";
export const SUBJECT_EQUITY_PRINTS = "equities.prints";

View file

@ -1,6 +1,15 @@
import { createClient, type ClickHouseClient } from "@clickhouse/client"; import { createClient, type ClickHouseClient } from "@clickhouse/client";
import type { OptionPrint } from "@islandflow/types"; import type { EquityPrint, OptionPrint } from "@islandflow/types";
import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "./option-prints"; import {
normalizeOptionPrint,
optionPrintsTableDDL,
OPTION_PRINTS_TABLE
} from "./option-prints";
import {
equityPrintsTableDDL,
EQUITY_PRINTS_TABLE,
normalizeEquityPrint
} from "./equity-prints";
export type ClickHouseOptions = { export type ClickHouseOptions = {
url: string; url: string;
@ -26,6 +35,14 @@ export const ensureOptionPrintsTable = async (
}); });
}; };
export const ensureEquityPrintsTable = async (
client: ClickHouseClient
): Promise<void> => {
await client.exec({
query: equityPrintsTableDDL()
});
};
export const insertOptionPrint = async ( export const insertOptionPrint = async (
client: ClickHouseClient, client: ClickHouseClient,
print: OptionPrint print: OptionPrint
@ -37,3 +54,15 @@ export const insertOptionPrint = async (
format: "JSONEachRow" format: "JSONEachRow"
}); });
}; };
export const insertEquityPrint = async (
client: ClickHouseClient,
print: EquityPrint
): Promise<void> => {
const record = normalizeEquityPrint(print);
await client.insert({
table: EQUITY_PRINTS_TABLE,
values: [record],
format: "JSONEachRow"
});
};

View file

@ -0,0 +1,26 @@
import type { EquityPrint } from "@islandflow/types";
export const EQUITY_PRINTS_TABLE = "equity_prints";
export const equityPrintsTableDDL = (): string => {
return `
CREATE TABLE IF NOT EXISTS ${EQUITY_PRINTS_TABLE} (
source_ts UInt64,
ingest_ts UInt64,
seq UInt64,
trace_id String,
ts UInt64,
underlying_id String,
price Float64,
size UInt32,
exchange String,
offExchangeFlag Bool
)
ENGINE = MergeTree
ORDER BY (ts, underlying_id)
`;
};
export const normalizeEquityPrint = (print: EquityPrint): EquityPrint => {
return print;
};

View file

@ -1,2 +1,3 @@
export * from "./clickhouse"; export * from "./clickhouse";
export * from "./equity-prints";
export * from "./option-prints"; export * from "./option-prints";

View file

@ -0,0 +1,27 @@
import { describe, expect, it } from "bun:test";
import { equityPrintsTableDDL, EQUITY_PRINTS_TABLE } from "../src/equity-prints";
const basePrint = {
source_ts: 100,
ingest_ts: 200,
seq: 1,
trace_id: "trace-1",
ts: 100,
underlying_id: "SPY",
price: 450.1,
size: 100,
exchange: "TEST",
offExchangeFlag: false
};
describe("equity-prints storage helpers", () => {
it("keeps required fields intact", () => {
expect(basePrint.offExchangeFlag).toBe(false);
});
it("includes the correct table name in the DDL", () => {
const ddl = equityPrintsTableDDL();
expect(ddl).toContain(EQUITY_PRINTS_TABLE);
expect(ddl).toContain("CREATE TABLE IF NOT EXISTS");
});
});

View file

@ -6,7 +6,11 @@
"dev": "bun run src/index.ts" "dev": "bun run src/index.ts"
}, },
"dependencies": { "dependencies": {
"@islandflow/bus": "workspace:*",
"@islandflow/config": "workspace:*", "@islandflow/config": "workspace:*",
"@islandflow/observability": "workspace:*" "@islandflow/observability": "workspace:*",
"@islandflow/storage": "workspace:*",
"@islandflow/types": "workspace:*",
"zod": "^3.23.8"
} }
} }

View file

@ -1,17 +1,163 @@
import { readEnv } from "@islandflow/config";
import { createLogger } from "@islandflow/observability"; import { createLogger } from "@islandflow/observability";
import {
SUBJECT_EQUITY_PRINTS,
STREAM_EQUITY_PRINTS,
connectJetStreamWithRetry,
ensureStream,
publishJson
} from "@islandflow/bus";
import {
createClickHouseClient,
ensureEquityPrintsTable,
insertEquityPrint
} from "@islandflow/storage";
import { EquityPrintSchema, type EquityPrint } from "@islandflow/types";
import { z } from "zod";
const service = "ingest-equities"; const service = "ingest-equities";
const logger = createLogger({ service }); const logger = createLogger({ service });
const envSchema = z.object({
NATS_URL: z.string().default("nats://localhost:4222"),
CLICKHOUSE_URL: z.string().default("http://localhost:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"),
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000)
});
const env = readEnv(envSchema);
const state = {
shuttingDown: false,
seq: 0,
timer: null as ReturnType<typeof setInterval> | null
};
const retry = async <T>(
label: string,
attempts: number,
delayMs: number,
task: () => Promise<T>
): Promise<T> => {
let lastError: unknown;
for (let attempt = 1; attempt <= attempts; attempt += 1) {
try {
return await task();
} catch (error) {
lastError = error;
logger.warn(`${label} attempt failed`, {
attempt,
error: error instanceof Error ? error.message : String(error)
});
if (attempt < attempts) {
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
}
throw lastError ?? new Error(`${label} failed after retries`);
};
const buildSyntheticPrint = (): EquityPrint => {
const now = Date.now();
state.seq += 1;
return {
source_ts: now,
ingest_ts: now,
seq: state.seq,
trace_id: `ingest-equities-${state.seq}`,
ts: now,
underlying_id: "SPY",
price: 450.1,
size: 100,
exchange: "TEST",
offExchangeFlag: false
};
};
const run = async () => {
logger.info("service starting"); logger.info("service starting");
const shutdown = (signal: string) => { const { nc, js, jsm } = await connectJetStreamWithRetry(
{
servers: env.NATS_URL,
name: service
},
{ attempts: 20, delayMs: 500 }
);
await ensureStream(jsm, {
name: STREAM_EQUITY_PRINTS,
subjects: [SUBJECT_EQUITY_PRINTS],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,
database: env.CLICKHOUSE_DATABASE
});
await retry("clickhouse table init", 20, 500, async () => {
await ensureEquityPrintsTable(clickhouse);
});
const emit = async () => {
if (state.shuttingDown) {
return;
}
const candidate = buildSyntheticPrint();
const print = EquityPrintSchema.parse(candidate);
try {
await insertEquityPrint(clickhouse, print);
await publishJson(js, SUBJECT_EQUITY_PRINTS, print);
logger.info("published equity print", {
trace_id: print.trace_id,
seq: print.seq,
underlying_id: print.underlying_id
});
} catch (error) {
logger.error("failed to publish equity print", {
error: error instanceof Error ? error.message : String(error),
trace_id: print.trace_id
});
}
};
state.timer = setInterval(() => {
void emit();
}, env.EMIT_INTERVAL_MS);
const shutdown = async (signal: string) => {
if (state.shuttingDown) {
return;
}
state.shuttingDown = true;
if (state.timer) {
clearInterval(state.timer);
}
logger.info("service stopping", { signal }); logger.info("service stopping", { signal });
await nc.drain();
await clickhouse.close();
process.exit(0); process.exit(0);
}; };
process.on("SIGINT", () => shutdown("SIGINT")); process.on("SIGINT", () => void shutdown("SIGINT"));
process.on("SIGTERM", () => shutdown("SIGTERM")); process.on("SIGTERM", () => void shutdown("SIGTERM"));
};
// Keep the process alive until real listeners are wired. await run();
setInterval(() => {}, 60_000);