diff --git a/.env.example b/.env.example index 3b24669..d015ffb 100644 --- a/.env.example +++ b/.env.example @@ -55,6 +55,8 @@ TESTING_THROTTLE_MS=200 # Compute consumer behavior COMPUTE_DELIVER_POLICY=new COMPUTE_CONSUMER_RESET=false +API_DELIVER_POLICY=new +API_CONSUMER_RESET=false NBBO_MAX_AGE_MS=1000 NEXT_PUBLIC_NBBO_MAX_AGE_MS=1000 NEXT_PUBLIC_LIVE_HOT_WINDOW=2000 diff --git a/README.md b/README.md index f0a2b60..88d4314 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,7 @@ Default `smart-money` behavior: ### API -- `API_PORT`, `REST_DEFAULT_LIMIT` +- `API_PORT`, `REST_DEFAULT_LIMIT`, `API_DELIVER_POLICY`, `API_CONSUMER_RESET` - `LIVE_LIMIT_OPTIONS`, `LIVE_LIMIT_NBBO`, `LIVE_LIMIT_EQUITIES`, `LIVE_LIMIT_EQUITY_JOINS`, `LIVE_LIMIT_FLOW`, `LIVE_LIMIT_CLASSIFIER_HITS`, `LIVE_LIMIT_ALERTS`, `LIVE_LIMIT_INFERRED_DARK` (bounded live generic cache depths; defaults `10000`, max `100000`) ### Web live retention diff --git a/deployment/docker/.env.example b/deployment/docker/.env.example index 58b5986..e8359f8 100644 --- a/deployment/docker/.env.example +++ b/deployment/docker/.env.example @@ -5,6 +5,8 @@ REDIS_URL=redis://redis:6379 API_PORT=4000 REST_DEFAULT_LIMIT=200 +API_DELIVER_POLICY=new +API_CONSUMER_RESET=false NPM_SHARED_NETWORK=npm-shared diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 9aedabc..c8fa667 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -104,13 +104,17 @@ import { LiveStateManager, isLiveItemFresh } from "./live"; const service = "api"; const logger = createLogger({ service }); +const DeliverPolicySchema = z.enum(["new", "all", "last", "last_per_subject"]); + const envSchema = z.object({ API_PORT: z.coerce.number().int().positive().default(4000), NATS_URL: z.string().default("nats://127.0.0.1:4222"), CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"), CLICKHOUSE_DATABASE: z.string().default("default"), REDIS_URL: z.string().default("redis://127.0.0.1:6379"), - REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200) + REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200), + API_DELIVER_POLICY: DeliverPolicySchema.default("new"), + API_CONSUMER_RESET: z.coerce.boolean().default(false) }); const env = readEnv(envSchema); @@ -288,6 +292,27 @@ const parseLimit = (value: string | null): number => { return limitSchema.parse(value); }; +const applyDeliverPolicy = ( + opts: ReturnType, + policy: z.infer +): void => { + switch (policy) { + case "all": + opts.deliverAll(); + break; + case "last": + opts.deliverLast(); + break; + case "last_per_subject": + opts.deliverLastPerSubject(); + break; + case "new": + default: + opts.deliverNew(); + break; + } +}; + const parseOptionPrintFilters = ( url: URL ): { @@ -757,12 +782,105 @@ const run = async () => { logger.info("live cache metrics", snapshot); }, 60000); + const consumerBindings = [ + { + subject: SUBJECT_OPTION_SIGNAL_PRINTS, + stream: STREAM_OPTION_SIGNAL_PRINTS, + durableName: "api-option-prints" + }, + { + subject: SUBJECT_OPTION_NBBO, + stream: STREAM_OPTION_NBBO, + durableName: "api-option-nbbo" + }, + { + subject: SUBJECT_EQUITY_PRINTS, + stream: STREAM_EQUITY_PRINTS, + durableName: "api-equity-prints" + }, + { + subject: SUBJECT_EQUITY_QUOTES, + stream: STREAM_EQUITY_QUOTES, + durableName: "api-equity-quotes" + }, + { + subject: SUBJECT_EQUITY_CANDLES, + stream: STREAM_EQUITY_CANDLES, + durableName: "api-equity-candles" + }, + { + subject: SUBJECT_EQUITY_JOINS, + stream: STREAM_EQUITY_JOINS, + durableName: "api-equity-joins" + }, + { + subject: SUBJECT_INFERRED_DARK, + stream: STREAM_INFERRED_DARK, + durableName: "api-inferred-dark" + }, + { + subject: SUBJECT_FLOW_PACKETS, + stream: STREAM_FLOW_PACKETS, + durableName: "api-flow-packets" + }, + { + subject: SUBJECT_CLASSIFIER_HITS, + stream: STREAM_CLASSIFIER_HITS, + durableName: "api-classifier-hits" + }, + { + subject: SUBJECT_ALERTS, + stream: STREAM_ALERTS, + durableName: "api-alerts" + } + ] as const; + + if (env.API_CONSUMER_RESET) { + for (const binding of consumerBindings) { + try { + await jsm.consumers.delete(binding.stream, binding.durableName); + logger.warn("reset jetstream consumer", { durable: binding.durableName }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to reset jetstream consumer", { + durable: binding.durableName, + error: message + }); + } + } + } + } else { + for (const binding of consumerBindings) { + try { + const info = await jsm.consumers.info(binding.stream, binding.durableName); + if (info?.config?.deliver_policy && info.config.deliver_policy !== env.API_DELIVER_POLICY) { + logger.warn("resetting consumer due to deliver policy change", { + durable: binding.durableName, + current: info.config.deliver_policy, + desired: env.API_DELIVER_POLICY + }); + await jsm.consumers.delete(binding.stream, binding.durableName); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (!message.includes("not found")) { + logger.warn("failed to inspect jetstream consumer", { + durable: binding.durableName, + error: message + }); + } + } + } + } + const subscribeWithReset = async ( subject: string, stream: string, durableName: string ) => { const opts = buildDurableConsumer(durableName); + applyDeliverPolicy(opts, env.API_DELIVER_POLICY); try { return await subscribeJson(js, subject, opts); } catch (error) { @@ -791,68 +909,69 @@ const run = async () => { } const resetOpts = buildDurableConsumer(durableName); + applyDeliverPolicy(resetOpts, env.API_DELIVER_POLICY); return await subscribeJson(js, subject, resetOpts); } }; const optionSubscription = await subscribeWithReset( - SUBJECT_OPTION_SIGNAL_PRINTS, - STREAM_OPTION_SIGNAL_PRINTS, - "api-option-prints" + consumerBindings[0].subject, + consumerBindings[0].stream, + consumerBindings[0].durableName ); const optionNbboSubscription = await subscribeWithReset( - SUBJECT_OPTION_NBBO, - STREAM_OPTION_NBBO, - "api-option-nbbo" + consumerBindings[1].subject, + consumerBindings[1].stream, + consumerBindings[1].durableName ); const equitySubscription = await subscribeWithReset( - SUBJECT_EQUITY_PRINTS, - STREAM_EQUITY_PRINTS, - "api-equity-prints" + consumerBindings[2].subject, + consumerBindings[2].stream, + consumerBindings[2].durableName ); const equityQuoteSubscription = await subscribeWithReset( - SUBJECT_EQUITY_QUOTES, - STREAM_EQUITY_QUOTES, - "api-equity-quotes" + consumerBindings[3].subject, + consumerBindings[3].stream, + consumerBindings[3].durableName ); const equityCandleSubscription = await subscribeWithReset( - SUBJECT_EQUITY_CANDLES, - STREAM_EQUITY_CANDLES, - "api-equity-candles" + consumerBindings[4].subject, + consumerBindings[4].stream, + consumerBindings[4].durableName ); const equityJoinSubscription = await subscribeWithReset( - SUBJECT_EQUITY_JOINS, - STREAM_EQUITY_JOINS, - "api-equity-joins" + consumerBindings[5].subject, + consumerBindings[5].stream, + consumerBindings[5].durableName ); const inferredDarkSubscription = await subscribeWithReset( - SUBJECT_INFERRED_DARK, - STREAM_INFERRED_DARK, - "api-inferred-dark" + consumerBindings[6].subject, + consumerBindings[6].stream, + consumerBindings[6].durableName ); const flowSubscription = await subscribeWithReset( - SUBJECT_FLOW_PACKETS, - STREAM_FLOW_PACKETS, - "api-flow-packets" + consumerBindings[7].subject, + consumerBindings[7].stream, + consumerBindings[7].durableName ); const classifierHitSubscription = await subscribeWithReset( - SUBJECT_CLASSIFIER_HITS, - STREAM_CLASSIFIER_HITS, - "api-classifier-hits" + consumerBindings[8].subject, + consumerBindings[8].stream, + consumerBindings[8].durableName ); const alertSubscription = await subscribeWithReset( - SUBJECT_ALERTS, - STREAM_ALERTS, - "api-alerts" + consumerBindings[9].subject, + consumerBindings[9].stream, + consumerBindings[9].durableName ); const fanoutLive = async ( diff --git a/services/ingest-options/src/adapters/synthetic.ts b/services/ingest-options/src/adapters/synthetic.ts index 003d70c..7875f4f 100644 --- a/services/ingest-options/src/adapters/synthetic.ts +++ b/services/ingest-options/src/adapters/synthetic.ts @@ -481,8 +481,6 @@ export const createSyntheticOptionsAdapter = ( conditions: burst.conditions }; - void handlers.onTrade(print); - if (handlers.onNBBO) { nbboSeq += 1; const sizeBase = Math.max(1, Math.round(burst.baseSize * 0.4)); @@ -503,6 +501,8 @@ export const createSyntheticOptionsAdapter = ( void handlers.onNBBO(nbbo); } + + void handlers.onTrade(print); } remainingRuns -= 1;