switch clickhouse storage client to fetch
This commit is contained in:
parent
624c16b711
commit
25e3097bb1
1 changed files with 126 additions and 12 deletions
|
|
@ -1,4 +1,3 @@
|
||||||
import { createClient, type ClickHouseClient } from "@clickhouse/client";
|
|
||||||
import {
|
import {
|
||||||
AlertEventSchema,
|
AlertEventSchema,
|
||||||
ClassifierHitEventSchema,
|
ClassifierHitEventSchema,
|
||||||
|
|
@ -87,18 +86,133 @@ export type ClickHouseOptions = {
|
||||||
password?: string;
|
password?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const createClickHouseClient = (options: ClickHouseOptions): ClickHouseClient => {
|
type ClickHouseQueryFormat = "JSONEachRow";
|
||||||
return createClient({
|
|
||||||
url: options.url,
|
type ClickHouseQueryResult = {
|
||||||
database: options.database,
|
json<T>(): Promise<T>;
|
||||||
username: options.username,
|
};
|
||||||
password: options.password,
|
|
||||||
// Bun can reach ClickHouse via fetch, but the Node agent keep-alive path
|
export type ClickHouseClient = {
|
||||||
// used by this client has been unreliable in our container deployment.
|
exec(params: { query: string }): Promise<void>;
|
||||||
keep_alive: {
|
insert(params: { table: string; values: unknown[]; format: ClickHouseQueryFormat }): Promise<void>;
|
||||||
enabled: false
|
query(params: { query: string; format: ClickHouseQueryFormat }): Promise<ClickHouseQueryResult>;
|
||||||
|
ping(): Promise<{ success: boolean; error?: Error }>;
|
||||||
|
close(): Promise<void>;
|
||||||
|
};
|
||||||
|
|
||||||
|
const buildBaseUrl = (options: ClickHouseOptions): URL => {
|
||||||
|
const url = new URL(options.url);
|
||||||
|
|
||||||
|
if (options.database) {
|
||||||
|
url.searchParams.set("database", options.database);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return url;
|
||||||
|
};
|
||||||
|
|
||||||
|
const buildHeaders = (options: ClickHouseOptions, hasBody: boolean): Headers => {
|
||||||
|
const headers = new Headers();
|
||||||
|
|
||||||
|
if (hasBody) {
|
||||||
|
headers.set("content-type", "text/plain; charset=utf-8");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (options.username || options.password) {
|
||||||
|
const auth = Buffer.from(`${options.username ?? "default"}:${options.password ?? ""}`).toString("base64");
|
||||||
|
headers.set("authorization", `Basic ${auth}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return headers;
|
||||||
|
};
|
||||||
|
|
||||||
|
const executeClickHouse = async (
|
||||||
|
options: ClickHouseOptions,
|
||||||
|
query: string,
|
||||||
|
body?: string
|
||||||
|
): Promise<Response> => {
|
||||||
|
const url = buildBaseUrl(options);
|
||||||
|
url.searchParams.set("query", query);
|
||||||
|
|
||||||
|
const response = await fetch(url, {
|
||||||
|
method: "POST",
|
||||||
|
headers: buildHeaders(options, body !== undefined),
|
||||||
|
body
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
const message = (await response.text()).trim() || `${response.status} ${response.statusText}`;
|
||||||
|
throw new Error(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
return response;
|
||||||
|
};
|
||||||
|
|
||||||
|
const parseJsonEachRow = <T>(text: string): T => {
|
||||||
|
const trimmed = text.trim();
|
||||||
|
|
||||||
|
if (!trimmed) {
|
||||||
|
return [] as T;
|
||||||
|
}
|
||||||
|
|
||||||
|
const rows = trimmed
|
||||||
|
.split("\n")
|
||||||
|
.filter((line) => line.trim().length > 0)
|
||||||
|
.map((line) => JSON.parse(line));
|
||||||
|
|
||||||
|
return rows as T;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const createClickHouseClient = (options: ClickHouseOptions): ClickHouseClient => {
|
||||||
|
return {
|
||||||
|
async exec({ query }) {
|
||||||
|
await executeClickHouse(options, query);
|
||||||
|
},
|
||||||
|
|
||||||
|
async insert({ table, values, format }) {
|
||||||
|
const rows = values.map((value) => JSON.stringify(value)).join("\n");
|
||||||
|
const body = rows.length > 0 ? `${rows}\n` : "";
|
||||||
|
await executeClickHouse(options, `INSERT INTO ${table} FORMAT ${format}`, body);
|
||||||
|
},
|
||||||
|
|
||||||
|
async query({ query, format }) {
|
||||||
|
const response = await executeClickHouse(options, `${query} FORMAT ${format}`);
|
||||||
|
return {
|
||||||
|
async json<T>() {
|
||||||
|
const text = await response.text();
|
||||||
|
return parseJsonEachRow<T>(text);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
},
|
||||||
|
|
||||||
|
async ping() {
|
||||||
|
try {
|
||||||
|
const url = buildBaseUrl(options);
|
||||||
|
url.pathname = "/ping";
|
||||||
|
|
||||||
|
const response = await fetch(url, {
|
||||||
|
method: "GET",
|
||||||
|
headers: buildHeaders(options, false)
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
const message = (await response.text()).trim() || `${response.status} ${response.statusText}`;
|
||||||
|
return { success: false, error: new Error(message) };
|
||||||
|
}
|
||||||
|
|
||||||
|
return { success: true };
|
||||||
|
} catch (error) {
|
||||||
|
if (error instanceof Error) {
|
||||||
|
return { success: false, error };
|
||||||
|
}
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
async close() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
export const ensureOptionPrintsTable = async (
|
export const ensureOptionPrintsTable = async (
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue