Add API gateway for prints
This commit is contained in:
parent
6a1f457028
commit
ba84554b0b
4 changed files with 346 additions and 9 deletions
4
bun.lock
4
bun.lock
|
|
@ -44,8 +44,12 @@
|
|||
"services/api": {
|
||||
"name": "@islandflow/api",
|
||||
"dependencies": {
|
||||
"@islandflow/bus": "workspace:*",
|
||||
"@islandflow/config": "workspace:*",
|
||||
"@islandflow/observability": "workspace:*",
|
||||
"@islandflow/storage": "workspace:*",
|
||||
"@islandflow/types": "workspace:*",
|
||||
"zod": "^3.23.8",
|
||||
},
|
||||
},
|
||||
"services/candles": {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import { createClient, type ClickHouseClient } from "@clickhouse/client";
|
||||
import { EquityPrintSchema, OptionPrintSchema } from "@islandflow/types";
|
||||
import type { EquityPrint, OptionPrint } from "@islandflow/types";
|
||||
import {
|
||||
normalizeOptionPrint,
|
||||
|
|
@ -66,3 +67,104 @@ export const insertEquityPrint = async (
|
|||
format: "JSONEachRow"
|
||||
});
|
||||
};
|
||||
|
||||
const clampLimit = (limit: number): number => {
|
||||
if (!Number.isFinite(limit)) {
|
||||
return 100;
|
||||
}
|
||||
|
||||
return Math.max(1, Math.min(1000, Math.floor(limit)));
|
||||
};
|
||||
|
||||
const coerceNumber = (value: unknown): unknown => {
|
||||
if (typeof value === "string") {
|
||||
const parsed = Number(value);
|
||||
if (Number.isFinite(parsed)) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
|
||||
return value;
|
||||
};
|
||||
|
||||
const normalizeNumericFields = (
|
||||
row: Record<string, unknown>,
|
||||
fields: string[]
|
||||
): Record<string, unknown> => {
|
||||
const record: Record<string, unknown> = { ...row };
|
||||
|
||||
for (const field of fields) {
|
||||
if (field in record) {
|
||||
record[field] = coerceNumber(record[field]);
|
||||
}
|
||||
}
|
||||
|
||||
return record;
|
||||
};
|
||||
|
||||
export const fetchRecentOptionPrints = async (
|
||||
client: ClickHouseClient,
|
||||
limit: number
|
||||
): Promise<OptionPrint[]> => {
|
||||
const safeLimit = clampLimit(limit);
|
||||
const result = await client.query({
|
||||
query: `SELECT * FROM ${OPTION_PRINTS_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
|
||||
format: "JSONEachRow"
|
||||
});
|
||||
|
||||
const rows = await result.json<unknown[]>();
|
||||
const normalized = rows.map((row) => {
|
||||
if (row && typeof row === "object") {
|
||||
return normalizeNumericFields(row as Record<string, unknown>, [
|
||||
"source_ts",
|
||||
"ingest_ts",
|
||||
"seq",
|
||||
"ts",
|
||||
"price",
|
||||
"size"
|
||||
]);
|
||||
}
|
||||
|
||||
return row;
|
||||
});
|
||||
|
||||
return OptionPrintSchema.array().parse(normalized);
|
||||
};
|
||||
|
||||
export const fetchRecentEquityPrints = async (
|
||||
client: ClickHouseClient,
|
||||
limit: number
|
||||
): Promise<EquityPrint[]> => {
|
||||
const safeLimit = clampLimit(limit);
|
||||
const result = await client.query({
|
||||
query: `SELECT * FROM ${EQUITY_PRINTS_TABLE} ORDER BY ts DESC, seq DESC LIMIT ${safeLimit}`,
|
||||
format: "JSONEachRow"
|
||||
});
|
||||
|
||||
const rows = await result.json<unknown[]>();
|
||||
const normalized = rows.map((row) => {
|
||||
if (row && typeof row === "object") {
|
||||
const record = normalizeNumericFields(row as Record<string, unknown>, [
|
||||
"source_ts",
|
||||
"ingest_ts",
|
||||
"seq",
|
||||
"ts",
|
||||
"price",
|
||||
"size"
|
||||
]);
|
||||
|
||||
if ("offExchangeFlag" in record) {
|
||||
return {
|
||||
...record,
|
||||
offExchangeFlag: Boolean(record.offExchangeFlag)
|
||||
};
|
||||
}
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
return row;
|
||||
});
|
||||
|
||||
return EquityPrintSchema.array().parse(normalized);
|
||||
};
|
||||
|
|
|
|||
|
|
@ -6,7 +6,11 @@
|
|||
"dev": "bun run src/index.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"@islandflow/bus": "workspace:*",
|
||||
"@islandflow/config": "workspace:*",
|
||||
"@islandflow/observability": "workspace:*"
|
||||
"@islandflow/observability": "workspace:*",
|
||||
"@islandflow/storage": "workspace:*",
|
||||
"@islandflow/types": "workspace:*",
|
||||
"zod": "^3.23.8"
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,17 +1,244 @@
|
|||
import { readEnv } from "@islandflow/config";
|
||||
import { createLogger } from "@islandflow/observability";
|
||||
import {
|
||||
SUBJECT_EQUITY_PRINTS,
|
||||
SUBJECT_OPTION_PRINTS,
|
||||
STREAM_EQUITY_PRINTS,
|
||||
STREAM_OPTION_PRINTS,
|
||||
buildDurableConsumer,
|
||||
connectJetStreamWithRetry,
|
||||
ensureStream,
|
||||
subscribeJson
|
||||
} from "@islandflow/bus";
|
||||
import {
|
||||
createClickHouseClient,
|
||||
ensureEquityPrintsTable,
|
||||
ensureOptionPrintsTable,
|
||||
fetchRecentEquityPrints,
|
||||
fetchRecentOptionPrints
|
||||
} from "@islandflow/storage";
|
||||
import { EquityPrintSchema, OptionPrintSchema } from "@islandflow/types";
|
||||
import { z } from "zod";
|
||||
|
||||
const service = "api";
|
||||
const logger = createLogger({ service });
|
||||
|
||||
const envSchema = z.object({
|
||||
API_PORT: z.coerce.number().int().positive().default(4000),
|
||||
NATS_URL: z.string().default("nats://localhost:4222"),
|
||||
CLICKHOUSE_URL: z.string().default("http://localhost:8123"),
|
||||
CLICKHOUSE_DATABASE: z.string().default("default"),
|
||||
REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200)
|
||||
});
|
||||
|
||||
const env = readEnv(envSchema);
|
||||
|
||||
const limitSchema = z.coerce.number().int().positive().max(1000);
|
||||
|
||||
type Channel = "options" | "equities";
|
||||
|
||||
type WsData = {
|
||||
channel: Channel;
|
||||
};
|
||||
|
||||
const optionSockets = new Set<WebSocket<WsData>>();
|
||||
const equitySockets = new Set<WebSocket<WsData>>();
|
||||
|
||||
const jsonResponse = (body: unknown, status = 200): Response => {
|
||||
return new Response(JSON.stringify(body), {
|
||||
status,
|
||||
headers: {
|
||||
"content-type": "application/json"
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const parseLimit = (value: string | null): number => {
|
||||
if (value === null) {
|
||||
return env.REST_DEFAULT_LIMIT;
|
||||
}
|
||||
|
||||
return limitSchema.parse(value);
|
||||
};
|
||||
|
||||
const broadcast = (sockets: Set<WebSocket<WsData>>, payload: unknown): void => {
|
||||
const message = JSON.stringify(payload);
|
||||
|
||||
for (const socket of sockets) {
|
||||
try {
|
||||
socket.send(message);
|
||||
} catch (error) {
|
||||
logger.warn("failed to send websocket message", {
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
sockets.delete(socket);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const run = async () => {
|
||||
logger.info("service starting");
|
||||
|
||||
const shutdown = (signal: string) => {
|
||||
const { nc, js, jsm } = await connectJetStreamWithRetry(
|
||||
{
|
||||
servers: env.NATS_URL,
|
||||
name: service
|
||||
},
|
||||
{ attempts: 20, delayMs: 500 }
|
||||
);
|
||||
|
||||
await ensureStream(jsm, {
|
||||
name: STREAM_OPTION_PRINTS,
|
||||
subjects: [SUBJECT_OPTION_PRINTS],
|
||||
retention: "limits",
|
||||
storage: "file",
|
||||
discard: "old",
|
||||
max_msgs_per_subject: -1,
|
||||
max_msgs: -1,
|
||||
max_bytes: -1,
|
||||
max_age: 0,
|
||||
num_replicas: 1
|
||||
});
|
||||
|
||||
await ensureStream(jsm, {
|
||||
name: STREAM_EQUITY_PRINTS,
|
||||
subjects: [SUBJECT_EQUITY_PRINTS],
|
||||
retention: "limits",
|
||||
storage: "file",
|
||||
discard: "old",
|
||||
max_msgs_per_subject: -1,
|
||||
max_msgs: -1,
|
||||
max_bytes: -1,
|
||||
max_age: 0,
|
||||
num_replicas: 1
|
||||
});
|
||||
|
||||
const clickhouse = createClickHouseClient({
|
||||
url: env.CLICKHOUSE_URL,
|
||||
database: env.CLICKHOUSE_DATABASE
|
||||
});
|
||||
|
||||
await ensureOptionPrintsTable(clickhouse);
|
||||
await ensureEquityPrintsTable(clickhouse);
|
||||
|
||||
const optionSubscription = await subscribeJson(
|
||||
js,
|
||||
SUBJECT_OPTION_PRINTS,
|
||||
buildDurableConsumer("api-option-prints")
|
||||
);
|
||||
|
||||
const equitySubscription = await subscribeJson(
|
||||
js,
|
||||
SUBJECT_EQUITY_PRINTS,
|
||||
buildDurableConsumer("api-equity-prints")
|
||||
);
|
||||
|
||||
const pumpOptions = async () => {
|
||||
for await (const msg of optionSubscription.messages) {
|
||||
try {
|
||||
const payload = OptionPrintSchema.parse(optionSubscription.decode(msg));
|
||||
broadcast(optionSockets, { type: "option-print", payload });
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process option print", {
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
msg.term();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const pumpEquities = async () => {
|
||||
for await (const msg of equitySubscription.messages) {
|
||||
try {
|
||||
const payload = EquityPrintSchema.parse(equitySubscription.decode(msg));
|
||||
broadcast(equitySockets, { type: "equity-print", payload });
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process equity print", {
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
msg.term();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void pumpOptions();
|
||||
void pumpEquities();
|
||||
|
||||
const server = Bun.serve<WsData>({
|
||||
port: env.API_PORT,
|
||||
fetch: async (req, serverRef) => {
|
||||
const url = new URL(req.url);
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/health") {
|
||||
return jsonResponse({ status: "ok" });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/prints/options") {
|
||||
const limit = parseLimit(url.searchParams.get("limit"));
|
||||
const data = await fetchRecentOptionPrints(clickhouse, limit);
|
||||
return jsonResponse({ data });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/prints/equities") {
|
||||
const limit = parseLimit(url.searchParams.get("limit"));
|
||||
const data = await fetchRecentEquityPrints(clickhouse, limit);
|
||||
return jsonResponse({ data });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/ws/options") {
|
||||
if (serverRef.upgrade(req, { data: { channel: "options" } })) {
|
||||
return new Response(null, { status: 101 });
|
||||
}
|
||||
|
||||
return jsonResponse({ error: "websocket upgrade failed" }, 400);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/ws/equities") {
|
||||
if (serverRef.upgrade(req, { data: { channel: "equities" } })) {
|
||||
return new Response(null, { status: 101 });
|
||||
}
|
||||
|
||||
return jsonResponse({ error: "websocket upgrade failed" }, 400);
|
||||
}
|
||||
|
||||
return jsonResponse({ error: "not found" }, 404);
|
||||
},
|
||||
websocket: {
|
||||
open: (socket) => {
|
||||
if (socket.data.channel === "options") {
|
||||
optionSockets.add(socket);
|
||||
} else {
|
||||
equitySockets.add(socket);
|
||||
}
|
||||
|
||||
logger.info("websocket connected", { channel: socket.data.channel });
|
||||
},
|
||||
close: (socket) => {
|
||||
if (socket.data.channel === "options") {
|
||||
optionSockets.delete(socket);
|
||||
} else {
|
||||
equitySockets.delete(socket);
|
||||
}
|
||||
|
||||
logger.info("websocket disconnected", { channel: socket.data.channel });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
logger.info("api listening", { port: server.port });
|
||||
|
||||
const shutdown = async (signal: string) => {
|
||||
logger.info("service stopping", { signal });
|
||||
server.stop();
|
||||
await nc.drain();
|
||||
await clickhouse.close();
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
process.on("SIGINT", () => shutdown("SIGINT"));
|
||||
process.on("SIGTERM", () => shutdown("SIGTERM"));
|
||||
process.on("SIGINT", () => void shutdown("SIGINT"));
|
||||
process.on("SIGTERM", () => void shutdown("SIGTERM"));
|
||||
};
|
||||
|
||||
// Keep the process alive until real listeners are wired.
|
||||
setInterval(() => {}, 60_000);
|
||||
await run();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue