diff --git a/apps/web/tsconfig.json b/apps/web/tsconfig.json index c99989f..bdf5e1a 100644 --- a/apps/web/tsconfig.json +++ b/apps/web/tsconfig.json @@ -24,6 +24,7 @@ ".next/types/**/*.ts" ], "exclude": [ - "node_modules" + "node_modules", + "scripts" ] } diff --git a/bun.lock b/bun.lock index bc0ef55..30a81e3 100644 --- a/bun.lock +++ b/bun.lock @@ -71,6 +71,7 @@ "@islandflow/bus": "workspace:*", "@islandflow/config": "workspace:*", "@islandflow/observability": "workspace:*", + "@islandflow/storage": "workspace:*", "@islandflow/types": "workspace:*", "zod": "^3.23.8", }, diff --git a/services/api/src/index.ts b/services/api/src/index.ts index a2b7608..444e47c 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -39,6 +39,33 @@ const envSchema = z.object({ const env = readEnv(envSchema); +const retry = async ( + label: string, + attempts: number, + delayMs: number, + task: () => Promise +): Promise => { + let lastError: unknown; + + for (let attempt = 1; attempt <= attempts; attempt += 1) { + try { + return await task(); + } catch (error) { + lastError = error; + logger.warn(`${label} attempt failed`, { + attempt, + error: error instanceof Error ? error.message : String(error) + }); + + if (attempt < attempts) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + } + + throw lastError ?? new Error(`${label} failed after retries`); +}; + const limitSchema = z.coerce.number().int().positive().max(1000); const replayParamsSchema = z.object({ after_ts: z.coerce.number().int().nonnegative().default(0), @@ -157,9 +184,11 @@ const run = async () => { database: env.CLICKHOUSE_DATABASE }); - await ensureOptionPrintsTable(clickhouse); - await ensureEquityPrintsTable(clickhouse); - await ensureFlowPacketsTable(clickhouse); + await retry("clickhouse table init", 20, 500, async () => { + await ensureOptionPrintsTable(clickhouse); + await ensureEquityPrintsTable(clickhouse); + await ensureFlowPacketsTable(clickhouse); + }); const optionSubscription = await subscribeJson( js, diff --git a/services/compute/src/index.ts b/services/compute/src/index.ts index e29c342..31fe445 100644 --- a/services/compute/src/index.ts +++ b/services/compute/src/index.ts @@ -31,6 +31,33 @@ const envSchema = z.object({ const env = readEnv(envSchema); +const retry = async ( + label: string, + attempts: number, + delayMs: number, + task: () => Promise +): Promise => { + let lastError: unknown; + + for (let attempt = 1; attempt <= attempts; attempt += 1) { + try { + return await task(); + } catch (error) { + lastError = error; + logger.warn(`${label} attempt failed`, { + attempt, + error: error instanceof Error ? error.message : String(error) + }); + + if (attempt < attempts) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + } + + throw lastError ?? new Error(`${label} failed after retries`); +}; + type ClusterState = { contractId: string; startTs: number; @@ -174,7 +201,9 @@ const run = async () => { database: env.CLICKHOUSE_DATABASE }); - await ensureFlowPacketsTable(clickhouse); + await retry("clickhouse table init", 20, 500, async () => { + await ensureFlowPacketsTable(clickhouse); + }); const subscription = await subscribeJson( js,