Implement options snapshot tape table

This commit is contained in:
dirtydishes 2026-05-04 01:14:52 -04:00
parent 6abfff30d3
commit e78387130a
15 changed files with 904 additions and 128 deletions

View file

@ -4,12 +4,16 @@ import {
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS,
SUBJECT_OPTION_SIGNAL_PRINTS,
SUBJECT_EQUITY_QUOTES,
STREAM_EQUITY_QUOTES,
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
STREAM_OPTION_SIGNAL_PRINTS,
buildDurableConsumer,
connectJetStreamWithRetry,
ensureStream,
publishJson
publishJson,
subscribeJson
} from "@islandflow/bus";
import {
createClickHouseClient,
@ -21,9 +25,10 @@ import {
import {
OptionNBBOSchema,
OptionPrintSchema,
evaluateOptionSignal,
EquityQuoteSchema,
deriveOptionPrintMetadata,
resolveSyntheticMarketModes,
type EquityQuote,
type OptionNBBO,
type OptionPrint,
type OptionsSignalConfig
@ -33,6 +38,7 @@ import { createDatabentoOptionsAdapter } from "./adapters/databento";
import { createIbkrOptionsAdapter } from "./adapters/ibkr";
import { createSyntheticOptionsAdapter } from "./adapters/synthetic";
import type { OptionIngestAdapter, StopHandler } from "./adapters/types";
import { enrichOptionPrint, rememberContext, selectAtOrBefore, type ContextHistory } from "./enrichment";
import { z } from "zod";
const service = "ingest-options";
@ -135,7 +141,9 @@ const state = {
shuttingDown: false,
shutdownPromise: null as Promise<void> | null
};
const latestNbboByContract = new Map<string, OptionNBBO>();
const nbboHistoryByContract: ContextHistory<OptionNBBO> = new Map();
const equityQuoteHistoryByUnderlying: ContextHistory<EquityQuote> = new Map();
const getErrorMessage = (error: unknown): string => {
return error instanceof Error ? error.message : String(error);
@ -338,6 +346,19 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_EQUITY_QUOTES,
subjects: [SUBJECT_EQUITY_QUOTES],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,
database: env.CLICKHOUSE_DATABASE
@ -365,26 +386,15 @@ const run = async () => {
}
const rawPrint = OptionPrintSchema.parse(candidate);
const derived = deriveOptionPrintMetadata(
rawPrint,
latestNbboByContract.get(rawPrint.option_contract_id),
optionsSignalConfig
const parsedMetadata = deriveOptionPrintMetadata(rawPrint, null, optionsSignalConfig);
const optionQuote = selectAtOrBefore(
nbboHistoryByContract.get(rawPrint.option_contract_id),
rawPrint.ts
);
const signalDecision = evaluateOptionSignal(
{
...rawPrint,
...derived,
signal_profile: optionsSignalConfig.mode
},
optionsSignalConfig
);
const print = OptionPrintSchema.parse({
...rawPrint,
...derived,
signal_pass: signalDecision.signalPass,
signal_reasons: signalDecision.signalReasons,
signal_profile: signalDecision.signalProfile
});
const equityQuote = parsedMetadata.underlying_id
? selectAtOrBefore(equityQuoteHistoryByUnderlying.get(parsedMetadata.underlying_id), rawPrint.ts)
: null;
const print = enrichOptionPrint(rawPrint, optionQuote, equityQuote, optionsSignalConfig);
try {
await insertOptionPrint(clickhouse, print);
@ -422,14 +432,7 @@ const run = async () => {
}
const nbbo = OptionNBBOSchema.parse(candidate);
const existing = latestNbboByContract.get(nbbo.option_contract_id);
if (
!existing ||
nbbo.ts > existing.ts ||
(nbbo.ts === existing.ts && nbbo.seq >= existing.seq)
) {
latestNbboByContract.set(nbbo.option_contract_id, nbbo);
}
rememberContext(nbboHistoryByContract, nbbo.option_contract_id, nbbo);
try {
await insertOptionNBBO(clickhouse, nbbo);
@ -447,6 +450,33 @@ const run = async () => {
}
});
const equityQuoteConsumer = buildDurableConsumer("ingest-options-equity-quotes");
equityQuoteConsumer.deliverAll();
const equityQuoteSubscription = await subscribeJson<EquityQuote>(
js,
SUBJECT_EQUITY_QUOTES,
equityQuoteConsumer
);
void (async () => {
for await (const msg of equityQuoteSubscription.messages) {
if (state.shuttingDown) {
msg.ack();
continue;
}
try {
const quote = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg));
rememberContext(equityQuoteHistoryByUnderlying, quote.underlying_id.toUpperCase(), quote);
msg.ack();
} catch (error) {
logger.error("failed to process equity quote context", {
error: getErrorMessage(error)
});
msg.ack();
}
}
})();
const shutdown = async (signal: string) => {
if (state.shutdownPromise) {
return state.shutdownPromise;