From 955eccce3c78004e781e2c99d46909659cf5547a Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Fri, 22 May 2026 23:06:27 -0400 Subject: [PATCH 1/2] gate option print clickhouse persistence by signal pass --- .env.example | 1 + deployment/docker/.env.example | 1 + docs/clickhouse-reset-runbook.md | 27 +++ ...p-persisting-non-signal-option-prints.html | 174 ++++++++++++++++++ services/ingest-options/src/index.ts | 29 ++- services/ingest-options/src/trade-pipeline.ts | 24 +++ .../tests/trade-pipeline.test.ts | 101 ++++++++++ 7 files changed, 352 insertions(+), 5 deletions(-) create mode 100644 docs/turns/2026-05-23-stop-persisting-non-signal-option-prints.html create mode 100644 services/ingest-options/src/trade-pipeline.ts create mode 100644 services/ingest-options/tests/trade-pipeline.test.ts diff --git a/.env.example b/.env.example index be20b62..ebc8a75 100644 --- a/.env.example +++ b/.env.example @@ -19,6 +19,7 @@ ALPACA_MAX_DTE_DAYS=30 ALPACA_MONEYNESS_PCT=0.06 ALPACA_MONEYNESS_FALLBACK_PCT=0.1 ALPACA_MAX_QUOTES=200 +OPTIONS_PERSIST_SIGNAL_ONLY=true # Databento replay DATABENTO_API_KEY= diff --git a/deployment/docker/.env.example b/deployment/docker/.env.example index 4972ada..d34a95a 100644 --- a/deployment/docker/.env.example +++ b/deployment/docker/.env.example @@ -40,6 +40,7 @@ ALPACA_MAX_DTE_DAYS=30 ALPACA_MONEYNESS_PCT=0.06 ALPACA_MONEYNESS_FALLBACK_PCT=0.1 ALPACA_MAX_QUOTES=200 +OPTIONS_PERSIST_SIGNAL_ONLY=true # Databento replay DATABENTO_API_KEY= diff --git a/docs/clickhouse-reset-runbook.md b/docs/clickhouse-reset-runbook.md index dac1775..09d0138 100644 --- a/docs/clickhouse-reset-runbook.md +++ b/docs/clickhouse-reset-runbook.md @@ -55,3 +55,30 @@ docker compose exec clickhouse clickhouse-client --query "SELECT count() FROM fl ``` Restart ingest/API services through the normal dev or deployment path. The options tape should repopulate its 100-row hot head from new signal prints, and older rows should appear only after the scroll gate asks `/history/options` for ClickHouse-backed history. + +## One-Time Cleanup: Remove Non-Signal Option Prints + +If `OPTIONS_PERSIST_SIGNAL_ONLY=true` is enabled, historical rows with `signal_pass = 0` can be removed once to align stored history with the new ingestion policy: + +```bash +docker compose exec clickhouse clickhouse-client --query "ALTER TABLE option_prints DELETE WHERE signal_pass = 0" +``` + +For `deployment/docker/docker-compose.yml`, run the same command with `docker compose -f deployment/docker/docker-compose.yml exec clickhouse ...`. + +Important notes: + +- ClickHouse `ALTER ... DELETE` is asynchronous; deleted rows may still appear until the mutation is applied. +- You can monitor mutation progress: + +```bash +docker compose exec clickhouse clickhouse-client --query "SELECT command, is_done, latest_fail_reason FROM system.mutations WHERE table = 'option_prints' ORDER BY create_time DESC LIMIT 5" +``` + +- After mutation completion, verify row counts: + +```bash +docker compose exec clickhouse clickhouse-client --query "SELECT count() AS remaining_non_signal FROM option_prints WHERE signal_pass = 0" +``` + +- Optional compaction (larger datasets): run `OPTIMIZE TABLE option_prints FINAL` during a maintenance window. diff --git a/docs/turns/2026-05-23-stop-persisting-non-signal-option-prints.html b/docs/turns/2026-05-23-stop-persisting-non-signal-option-prints.html new file mode 100644 index 0000000..57b4ac1 --- /dev/null +++ b/docs/turns/2026-05-23-stop-persisting-non-signal-option-prints.html @@ -0,0 +1,174 @@ + + + + + + Turn Doc: Stop Persisting Non-Signal Option Prints + + + +
+

Stop Persisting Non-Signal Option Prints in ClickHouse

+

Date: 2026-05-23 00:00 EDT
Beads Issue: islandflow-l9h

+ +

Summary

+
+

Implemented a signal-gated persistence path for option prints in ingest-options. With the new default configuration, prints that fail the initial signal gate (signal_pass=false) are no longer inserted into ClickHouse, while JetStream publish behavior remains unchanged.

+
+ +

Changes Made

+ + +

Context

+

The options pipeline enriches and classifies prints before persistence and fanout. Previously, all enriched prints were inserted into ClickHouse regardless of signal eligibility, which retained low-value noise in durable history. The intended direction is to keep durable history aligned with the signal gate while preserving stream fanout compatibility.

+ +

Important Implementation Details

+ + +

Relevant Diff Snippets

+

Unified diffs below are formatted to be compatible with diffs.com rendering conventions.

+
diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts
+@@
++  OPTIONS_PERSIST_SIGNAL_ONLY: z.preprocess(..., z.boolean()).default(true),
+@@
++  logger.info("option print clickhouse persistence mode", { signal_only: env.OPTIONS_PERSIST_SIGNAL_ONLY });
+@@
+-        await insertOptionPrint(clickhouse, print);
+-        await publishJson(js, SUBJECT_OPTION_PRINTS, print);
+-        if (print.signal_pass) {
+-          await publishJson(js, SUBJECT_OPTION_SIGNAL_PRINTS, print);
+-        }
++        await processOptionTrade(print, {
++          persistSignalOnly: env.OPTIONS_PERSIST_SIGNAL_ONLY,
++          persist: async (value) => insertOptionPrint(clickhouse, value),
++          publishRaw: async (value) => publishJson(js, SUBJECT_OPTION_PRINTS, value),
++          publishSignal: async (value) => publishJson(js, SUBJECT_OPTION_SIGNAL_PRINTS, value)
++        });
+ +
diff --git a/services/ingest-options/src/trade-pipeline.ts b/services/ingest-options/src/trade-pipeline.ts
+@@
++export const shouldPersistOptionPrint = (print, persistSignalOnly) => !persistSignalOnly || print.signal_pass === true;
++
++export const processOptionTrade = async (print, deps) => {
++  if (shouldPersistOptionPrint(print, deps.persistSignalOnly)) {
++    await deps.persist(print);
++  }
++  await deps.publishRaw(print);
++  if (print.signal_pass) {
++    await deps.publishSignal(print);
++  }
++};
+ +
diff --git a/docs/clickhouse-reset-runbook.md b/docs/clickhouse-reset-runbook.md
+@@
++## One-Time Cleanup: Remove Non-Signal Option Prints
++docker compose exec clickhouse clickhouse-client --query "ALTER TABLE option_prints DELETE WHERE signal_pass = 0"
++...monitor with system.mutations and verify remaining_non_signal count...
+ +

Expected Impact for End-Users

+

Options history and replay streams backed by ClickHouse contain less noise and better reflect actionable signal flow. This improves signal-to-noise in historical tape usage without changing event schemas or API contract shapes.

+ +

Validation

+ + +

Issues, Limitations, and Mitigations

+ + +

Follow-up Work

+ +
+ + diff --git a/services/ingest-options/src/index.ts b/services/ingest-options/src/index.ts index 301632e..5164b13 100644 --- a/services/ingest-options/src/index.ts +++ b/services/ingest-options/src/index.ts @@ -44,6 +44,7 @@ import { createIbkrOptionsAdapter } from "./adapters/ibkr"; import { createSyntheticOptionsAdapter } from "./adapters/synthetic"; import type { OptionIngestAdapter, StopHandler } from "./adapters/types"; import { enrichOptionPrint, rememberContext, selectAtOrBefore, type ContextHistory } from "./enrichment"; +import { processOptionTrade } from "./trade-pipeline"; import { z } from "zod"; const service = "ingest-options"; @@ -104,6 +105,20 @@ const envSchema = z.object({ OPTIONS_SIGNAL_ETF_UNDERLYINGS: z .string() .default("SPY,QQQ,IWM,DIA,TLT,GLD,SLV,XLF,XLE,XLV,XLI,XLP,XLU,XLY,SMH,ARKK"), + OPTIONS_PERSIST_SIGNAL_ONLY: z + .preprocess((value) => { + if (typeof value === "string") { + const normalized = value.trim().toLowerCase(); + if (["1", "true", "yes", "on"].includes(normalized)) { + return true; + } + if (["0", "false", "no", "off"].includes(normalized)) { + return false; + } + } + return value; + }, z.boolean()) + .default(true), TESTING_MODE: z .preprocess((value) => { if (typeof value === "string") { @@ -400,6 +415,9 @@ const run = async () => { () => syntheticControl ); logger.info("ingest adapter selected", { adapter: adapter.name }); + logger.info("option print clickhouse persistence mode", { + signal_only: env.OPTIONS_PERSIST_SIGNAL_ONLY + }); const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS); const allowNbboPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS); @@ -426,11 +444,12 @@ const run = async () => { const print = enrichOptionPrint(rawPrint, optionQuote, equityQuote, optionsSignalConfig); try { - await insertOptionPrint(clickhouse, print); - await publishJson(js, SUBJECT_OPTION_PRINTS, print); - if (print.signal_pass) { - await publishJson(js, SUBJECT_OPTION_SIGNAL_PRINTS, print); - } + await processOptionTrade(print, { + persistSignalOnly: env.OPTIONS_PERSIST_SIGNAL_ONLY, + persist: async (value) => insertOptionPrint(clickhouse, value), + publishRaw: async (value) => publishJson(js, SUBJECT_OPTION_PRINTS, value), + publishSignal: async (value) => publishJson(js, SUBJECT_OPTION_SIGNAL_PRINTS, value) + }); } catch (error) { if (isExpectedShutdownError(error)) { return; diff --git a/services/ingest-options/src/trade-pipeline.ts b/services/ingest-options/src/trade-pipeline.ts new file mode 100644 index 0000000..e7c67ec --- /dev/null +++ b/services/ingest-options/src/trade-pipeline.ts @@ -0,0 +1,24 @@ +import type { OptionPrint } from "@islandflow/types"; + +export type ProcessOptionTradeDeps = { + persistSignalOnly: boolean; + persist: (print: OptionPrint) => Promise; + publishRaw: (print: OptionPrint) => Promise; + publishSignal: (print: OptionPrint) => Promise; +}; + +export const shouldPersistOptionPrint = (print: Pick, persistSignalOnly: boolean): boolean => { + return !persistSignalOnly || print.signal_pass === true; +}; + +export const processOptionTrade = async (print: OptionPrint, deps: ProcessOptionTradeDeps): Promise => { + if (shouldPersistOptionPrint(print, deps.persistSignalOnly)) { + await deps.persist(print); + } + + await deps.publishRaw(print); + + if (print.signal_pass) { + await deps.publishSignal(print); + } +}; diff --git a/services/ingest-options/tests/trade-pipeline.test.ts b/services/ingest-options/tests/trade-pipeline.test.ts new file mode 100644 index 0000000..8679abf --- /dev/null +++ b/services/ingest-options/tests/trade-pipeline.test.ts @@ -0,0 +1,101 @@ +import { describe, expect, it } from "bun:test"; +import type { OptionPrint } from "@islandflow/types"; +import { processOptionTrade, shouldPersistOptionPrint } from "../src/trade-pipeline"; + +const makePrint = (signalPass: boolean): OptionPrint => ({ + source_ts: 1_000, + ingest_ts: 1_001, + seq: 1, + trace_id: `print-${signalPass ? "pass" : "fail"}`, + ts: 1_000, + option_contract_id: "SPY-2025-01-17-450-C", + price: 1.25, + size: 100, + exchange: "TEST", + signal_pass: signalPass +}); + +describe("option trade persistence gating", () => { + it("does not persist failing prints when signal-only persistence is enabled", async () => { + const persisted: string[] = []; + const rawPublished: string[] = []; + const signalPublished: string[] = []; + + await processOptionTrade(makePrint(false), { + persistSignalOnly: true, + persist: async (print) => { + persisted.push(print.trace_id); + }, + publishRaw: async (print) => { + rawPublished.push(print.trace_id); + }, + publishSignal: async (print) => { + signalPublished.push(print.trace_id); + } + }); + + expect(persisted).toEqual([]); + expect(rawPublished).toEqual(["print-fail"]); + expect(signalPublished).toEqual([]); + }); + + it("persists and publishes passing prints when signal-only persistence is enabled", async () => { + const persisted: string[] = []; + const rawPublished: string[] = []; + const signalPublished: string[] = []; + + await processOptionTrade(makePrint(true), { + persistSignalOnly: true, + persist: async (print) => { + persisted.push(print.trace_id); + }, + publishRaw: async (print) => { + rawPublished.push(print.trace_id); + }, + publishSignal: async (print) => { + signalPublished.push(print.trace_id); + } + }); + + expect(persisted).toEqual(["print-pass"]); + expect(rawPublished).toEqual(["print-pass"]); + expect(signalPublished).toEqual(["print-pass"]); + }); + + it("persists failing prints when signal-only persistence is disabled", async () => { + const persisted: string[] = []; + const rawPublished: string[] = []; + const signalPublished: string[] = []; + + await processOptionTrade(makePrint(false), { + persistSignalOnly: false, + persist: async (print) => { + persisted.push(print.trace_id); + }, + publishRaw: async (print) => { + rawPublished.push(print.trace_id); + }, + publishSignal: async (print) => { + signalPublished.push(print.trace_id); + } + }); + + expect(persisted).toEqual(["print-fail"]); + expect(rawPublished).toEqual(["print-fail"]); + expect(signalPublished).toEqual([]); + }); +}); + +describe("shouldPersistOptionPrint", () => { + it("returns true for passing prints in signal-only mode", () => { + expect(shouldPersistOptionPrint({ signal_pass: true }, true)).toBe(true); + }); + + it("returns false for failing prints in signal-only mode", () => { + expect(shouldPersistOptionPrint({ signal_pass: false }, true)).toBe(false); + }); + + it("returns true for failing prints when signal-only mode is disabled", () => { + expect(shouldPersistOptionPrint({ signal_pass: false }, false)).toBe(true); + }); +}); From a69d216e6704d47f9ab5cc06e76e5798bcfcef77 Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Fri, 22 May 2026 23:07:16 -0400 Subject: [PATCH 2/2] align turn doc timestamp and filename --- ...=> 2026-05-22-stop-persisting-non-signal-option-prints.html} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename docs/turns/{2026-05-23-stop-persisting-non-signal-option-prints.html => 2026-05-22-stop-persisting-non-signal-option-prints.html} (98%) diff --git a/docs/turns/2026-05-23-stop-persisting-non-signal-option-prints.html b/docs/turns/2026-05-22-stop-persisting-non-signal-option-prints.html similarity index 98% rename from docs/turns/2026-05-23-stop-persisting-non-signal-option-prints.html rename to docs/turns/2026-05-22-stop-persisting-non-signal-option-prints.html index 57b4ac1..8ef3b46 100644 --- a/docs/turns/2026-05-23-stop-persisting-non-signal-option-prints.html +++ b/docs/turns/2026-05-22-stop-persisting-non-signal-option-prints.html @@ -76,7 +76,7 @@

Stop Persisting Non-Signal Option Prints in ClickHouse

-

Date: 2026-05-23 00:00 EDT
Beads Issue: islandflow-l9h

+

Date: 2026-05-22 23:06:02 EDT
Beads Issue: islandflow-l9h

Summary