From 25e3097bb1ec5996c56983c8c7f80953d0bc22d5 Mon Sep 17 00:00:00 2001 From: Kellan Drucquer Date: Sat, 4 Apr 2026 22:17:31 -0400 Subject: [PATCH] switch clickhouse storage client to fetch --- packages/storage/src/clickhouse.ts | 138 ++++++++++++++++++++++++++--- 1 file changed, 126 insertions(+), 12 deletions(-) diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index 45829dd..1f72299 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -1,4 +1,3 @@ -import { createClient, type ClickHouseClient } from "@clickhouse/client"; import { AlertEventSchema, ClassifierHitEventSchema, @@ -87,18 +86,133 @@ export type ClickHouseOptions = { password?: string; }; -export const createClickHouseClient = (options: ClickHouseOptions): ClickHouseClient => { - return createClient({ - url: options.url, - database: options.database, - username: options.username, - password: options.password, - // Bun can reach ClickHouse via fetch, but the Node agent keep-alive path - // used by this client has been unreliable in our container deployment. - keep_alive: { - enabled: false - } +type ClickHouseQueryFormat = "JSONEachRow"; + +type ClickHouseQueryResult = { + json(): Promise; +}; + +export type ClickHouseClient = { + exec(params: { query: string }): Promise; + insert(params: { table: string; values: unknown[]; format: ClickHouseQueryFormat }): Promise; + query(params: { query: string; format: ClickHouseQueryFormat }): Promise; + ping(): Promise<{ success: boolean; error?: Error }>; + close(): Promise; +}; + +const buildBaseUrl = (options: ClickHouseOptions): URL => { + const url = new URL(options.url); + + if (options.database) { + url.searchParams.set("database", options.database); + } + + return url; +}; + +const buildHeaders = (options: ClickHouseOptions, hasBody: boolean): Headers => { + const headers = new Headers(); + + if (hasBody) { + headers.set("content-type", "text/plain; charset=utf-8"); + } + + if (options.username || options.password) { + const auth = Buffer.from(`${options.username ?? "default"}:${options.password ?? ""}`).toString("base64"); + headers.set("authorization", `Basic ${auth}`); + } + + return headers; +}; + +const executeClickHouse = async ( + options: ClickHouseOptions, + query: string, + body?: string +): Promise => { + const url = buildBaseUrl(options); + url.searchParams.set("query", query); + + const response = await fetch(url, { + method: "POST", + headers: buildHeaders(options, body !== undefined), + body }); + + if (!response.ok) { + const message = (await response.text()).trim() || `${response.status} ${response.statusText}`; + throw new Error(message); + } + + return response; +}; + +const parseJsonEachRow = (text: string): T => { + const trimmed = text.trim(); + + if (!trimmed) { + return [] as T; + } + + const rows = trimmed + .split("\n") + .filter((line) => line.trim().length > 0) + .map((line) => JSON.parse(line)); + + return rows as T; +}; + +export const createClickHouseClient = (options: ClickHouseOptions): ClickHouseClient => { + return { + async exec({ query }) { + await executeClickHouse(options, query); + }, + + async insert({ table, values, format }) { + const rows = values.map((value) => JSON.stringify(value)).join("\n"); + const body = rows.length > 0 ? `${rows}\n` : ""; + await executeClickHouse(options, `INSERT INTO ${table} FORMAT ${format}`, body); + }, + + async query({ query, format }) { + const response = await executeClickHouse(options, `${query} FORMAT ${format}`); + return { + async json() { + const text = await response.text(); + return parseJsonEachRow(text); + } + }; + }, + + async ping() { + try { + const url = buildBaseUrl(options); + url.pathname = "/ping"; + + const response = await fetch(url, { + method: "GET", + headers: buildHeaders(options, false) + }); + + if (!response.ok) { + const message = (await response.text()).trim() || `${response.status} ${response.statusText}`; + return { success: false, error: new Error(message) }; + } + + return { success: true }; + } catch (error) { + if (error instanceof Error) { + return { success: false, error }; + } + + throw error; + } + }, + + async close() { + return; + } + }; }; export const ensureOptionPrintsTable = async (