Add event bus and storage layer

This commit is contained in:
dirtydishes 2025-12-27 19:14:27 -05:00
parent 9ba51d8e96
commit 488ae82ed6
19 changed files with 537 additions and 21 deletions

11
packages/bus/package.json Normal file
View file

@ -0,0 +1,11 @@
{
"name": "@islandflow/bus",
"private": true,
"type": "module",
"exports": {
".": "./src/index.ts"
},
"dependencies": {
"nats": "^2.24.0"
}
}

View file

@ -0,0 +1,2 @@
export * from "./jetstream";
export * from "./subjects";

View file

@ -0,0 +1,125 @@
import {
connect,
consumerOpts,
type ConsumerOptsBuilder,
type JetStreamClient,
type JetStreamManager,
type NatsConnection,
type StreamConfig,
JSONCodec,
type JsMsg,
createInbox
} from "nats";
export type NatsConnectionOptions = {
servers: string | string[];
name?: string;
timeoutMs?: number;
};
export type JetStreamConnection = {
nc: NatsConnection;
js: JetStreamClient;
jsm: JetStreamManager;
};
export type RetryOptions = {
attempts: number;
delayMs: number;
};
const sleep = (delayMs: number): Promise<void> => {
return new Promise((resolve) => setTimeout(resolve, delayMs));
};
export const connectJetStream = async (
options: NatsConnectionOptions
): Promise<JetStreamConnection> => {
const nc = await connect({
servers: options.servers,
name: options.name,
timeout: options.timeoutMs
});
const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
return { nc, js, jsm };
};
export const connectJetStreamWithRetry = async (
options: NatsConnectionOptions,
retry: RetryOptions
): Promise<JetStreamConnection> => {
let lastError: unknown;
for (let attempt = 1; attempt <= retry.attempts; attempt += 1) {
try {
return await connectJetStream(options);
} catch (error) {
lastError = error;
if (attempt < retry.attempts) {
await sleep(retry.delayMs);
}
}
}
throw lastError ?? new Error("Failed to connect to NATS");
};
export const ensureStream = async (
jsm: JetStreamManager,
config: StreamConfig
): Promise<void> => {
try {
await jsm.streams.info(config.name);
return;
} catch (error) {
if (error instanceof Error && error.message.includes("not found")) {
await jsm.streams.add(config);
return;
}
throw error;
}
};
export const buildDurableConsumer = (
durableName: string,
deliverSubject: string = createInbox()
): ConsumerOptsBuilder => {
const opts = consumerOpts();
opts.durable(durableName);
opts.manualAck();
opts.ackExplicit();
opts.deliverTo(deliverSubject);
return opts;
};
export const publishJson = async <T>(
js: JetStreamClient,
subject: string,
payload: T
): Promise<void> => {
const codec = JSONCodec<T>();
await js.publish(subject, codec.encode(payload));
};
export type JsonSubscription<T> = {
messages: AsyncIterable<JsMsg>;
decode: (msg: JsMsg) => T;
};
export const subscribeJson = async <T>(
js: JetStreamClient,
subject: string,
opts: ConsumerOptsBuilder
): Promise<JsonSubscription<T>> => {
const codec = JSONCodec<T>();
const sub = await js.subscribe(subject, opts);
return {
messages: sub,
decode: (msg) => codec.decode(msg.data)
};
};

View file

@ -0,0 +1,2 @@
export const STREAM_OPTION_PRINTS = "OPTIONS_PRINTS";
export const SUBJECT_OPTION_PRINTS = "options.prints";

View file

@ -0,0 +1,7 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"types": []
},
"include": ["src/**/*.ts"]
}

View file

@ -0,0 +1,12 @@
{
"name": "@islandflow/storage",
"private": true,
"type": "module",
"exports": {
".": "./src/index.ts"
},
"dependencies": {
"@clickhouse/client": "^0.2.6",
"@islandflow/types": "workspace:*"
}
}

View file

@ -0,0 +1,39 @@
import { createClient, type ClickHouseClient } from "@clickhouse/client";
import type { OptionPrint } from "@islandflow/types";
import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "./option-prints";
export type ClickHouseOptions = {
url: string;
database?: string;
username?: string;
password?: string;
};
export const createClickHouseClient = (options: ClickHouseOptions): ClickHouseClient => {
return createClient({
url: options.url,
database: options.database,
username: options.username,
password: options.password
});
};
export const ensureOptionPrintsTable = async (
client: ClickHouseClient
): Promise<void> => {
await client.exec({
query: optionPrintsTableDDL()
});
};
export const insertOptionPrint = async (
client: ClickHouseClient,
print: OptionPrint
): Promise<void> => {
const record = normalizeOptionPrint(print);
await client.insert({
table: OPTION_PRINTS_TABLE,
values: [record],
format: "JSONEachRow"
});
};

View file

@ -0,0 +1,2 @@
export * from "./clickhouse";
export * from "./option-prints";

View file

@ -0,0 +1,29 @@
import type { OptionPrint } from "@islandflow/types";
export const OPTION_PRINTS_TABLE = "option_prints";
export const optionPrintsTableDDL = (): string => {
return `
CREATE TABLE IF NOT EXISTS ${OPTION_PRINTS_TABLE} (
source_ts UInt64,
ingest_ts UInt64,
seq UInt64,
trace_id String,
ts UInt64,
option_contract_id String,
price Float64,
size UInt32,
exchange String,
conditions Array(String)
)
ENGINE = MergeTree
ORDER BY (ts, option_contract_id)
`;
};
export const normalizeOptionPrint = (print: OptionPrint): OptionPrint => {
return {
...print,
conditions: print.conditions ?? []
};
};

View file

@ -0,0 +1,27 @@
import { describe, expect, it } from "bun:test";
import { normalizeOptionPrint, optionPrintsTableDDL, OPTION_PRINTS_TABLE } from "../src/option-prints";
const basePrint = {
source_ts: 100,
ingest_ts: 200,
seq: 1,
trace_id: "trace-1",
ts: 100,
option_contract_id: "SPY-2025-01-17-450-C",
price: 1.25,
size: 10,
exchange: "TEST"
};
describe("option-prints storage helpers", () => {
it("normalizes missing conditions to empty array", () => {
const normalized = normalizeOptionPrint(basePrint);
expect(normalized.conditions).toEqual([]);
});
it("includes the correct table name in the DDL", () => {
const ddl = optionPrintsTableDDL();
expect(ddl).toContain(OPTION_PRINTS_TABLE);
expect(ddl).toContain("CREATE TABLE IF NOT EXISTS");
});
});

View file

@ -0,0 +1,7 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"types": []
},
"include": ["src/**/*.ts", "tests/**/*.ts"]
}