Add smart-money option signal path and tape filters

This commit is contained in:
dirtydishes 2026-04-28 16:29:44 -04:00
parent 758f111d7e
commit 27b0a399e6
23 changed files with 1827 additions and 175 deletions

View file

@ -10,7 +10,7 @@ import {
SUBJECT_INFERRED_DARK,
SUBJECT_FLOW_PACKETS,
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS,
SUBJECT_OPTION_SIGNAL_PRINTS,
STREAM_ALERTS,
STREAM_CLASSIFIER_HITS,
STREAM_EQUITY_CANDLES,
@ -20,7 +20,7 @@ import {
STREAM_INFERRED_DARK,
STREAM_FLOW_PACKETS,
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
STREAM_OPTION_SIGNAL_PRINTS,
buildDurableConsumer,
connectJetStreamWithRetry,
ensureStream,
@ -85,6 +85,13 @@ import {
LiveServerMessage,
LiveSubscription,
LiveSubscriptionSchema,
matchesFlowPacketFilters,
matchesOptionPrintFilters,
OptionFlowFilters,
OptionFlowViewSchema,
OptionNbboSideSchema,
OptionSecurityTypeSchema,
OptionTypeSchema,
FlowPacketSchema,
OptionNBBOSchema,
OptionPrintSchema,
@ -199,6 +206,32 @@ const equityPrintRangeSchema = z.object({
end_ts: z.coerce.number().int().nonnegative(),
limit: limitSchema.optional()
});
const optionSideListSchema = z
.string()
.transform((value) =>
value
.split(",")
.map((entry) => entry.trim())
.filter(Boolean)
)
.pipe(z.array(OptionNbboSideSchema));
const optionTypeListSchema = z
.string()
.transform((value) =>
value
.split(",")
.map((entry) => entry.trim())
.filter(Boolean)
)
.pipe(z.array(OptionTypeSchema));
const optionSecuritySchema = z.enum(["stock", "etf", "all"]);
const optionFilterQuerySchema = z.object({
view: OptionFlowViewSchema.optional(),
security: optionSecuritySchema.optional(),
side: optionSideListSchema.optional(),
type: optionTypeListSchema.optional(),
min_notional: z.coerce.number().nonnegative().optional()
});
type Channel =
| "options"
@ -235,6 +268,7 @@ const classifierHitSockets = new Set<LegacySocket>();
const alertSockets = new Set<LegacySocket>();
const liveSocketSubscriptions = new Map<LiveSocket, Set<string>>();
const subscriptionSockets = new Map<string, Set<LiveSocket>>();
const subscriptionDefinitions = new Map<string, LiveSubscription>();
const liveHeartbeats = new Map<LiveSocket, ReturnType<typeof setInterval>>();
const jsonResponse = (body: unknown, status = 200): Response => {
@ -254,6 +288,43 @@ const parseLimit = (value: string | null): number => {
return limitSchema.parse(value);
};
const parseOptionPrintFilters = (
url: URL
): {
view: z.infer<typeof OptionFlowViewSchema>;
storageFilters: Parameters<typeof fetchRecentOptionPrints>[3];
liveFilters: OptionFlowFilters;
} => {
const parsed = optionFilterQuerySchema.parse({
view: url.searchParams.get("view") ?? undefined,
security: url.searchParams.get("security") ?? undefined,
side: url.searchParams.get("side") ?? undefined,
type: url.searchParams.get("type") ?? undefined,
min_notional: url.searchParams.get("min_notional") ?? undefined
});
const view = parsed.view ?? "signal";
const security = parsed.security ?? (view === "raw" ? "all" : "stock");
const storageFilters = {
view,
security,
minNotional: parsed.min_notional,
nbboSides: parsed.side,
optionTypes: parsed.type
} as const;
const liveFilters: OptionFlowFilters = {
view,
securityTypes:
security === "all"
? undefined
: ([security] as Array<z.infer<typeof OptionSecurityTypeSchema>>),
nbboSides: parsed.side,
optionTypes: parsed.type,
minNotional: parsed.min_notional
};
return { view, storageFilters, liveFilters };
};
const parseReplayParams = (url: URL): { afterTs: number; afterSeq: number; limit: number } => {
const params = replayParamsSchema.parse({
after_ts: url.searchParams.get("after_ts") ?? undefined,
@ -412,6 +483,7 @@ const subscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): vo
const sockets = subscriptionSockets.get(key) ?? new Set<LiveSocket>();
sockets.add(socket);
subscriptionSockets.set(key, sockets);
subscriptionDefinitions.set(key, subscription);
};
const unsubscribeSocket = (socket: LiveSocket, subscription: LiveSubscription): void => {
@ -425,6 +497,7 @@ const unsubscribeSocket = (socket: LiveSocket, subscription: LiveSubscription):
sockets.delete(socket);
if (sockets.size === 0) {
subscriptionSockets.delete(key);
subscriptionDefinitions.delete(key);
}
};
@ -436,6 +509,7 @@ const cleanupLiveSocket = (socket: LiveSocket): void => {
sockets?.delete(socket);
if (sockets && sockets.size === 0) {
subscriptionSockets.delete(key);
subscriptionDefinitions.delete(key);
}
}
}
@ -504,8 +578,8 @@ const run = async () => {
);
await ensureStream(jsm, {
name: STREAM_OPTION_PRINTS,
subjects: [SUBJECT_OPTION_PRINTS],
name: STREAM_OPTION_SIGNAL_PRINTS,
subjects: [SUBJECT_OPTION_SIGNAL_PRINTS],
retention: "limits",
storage: "file",
discard: "old",
@ -722,8 +796,8 @@ const run = async () => {
};
const optionSubscription = await subscribeWithReset(
SUBJECT_OPTION_PRINTS,
STREAM_OPTION_PRINTS,
SUBJECT_OPTION_SIGNAL_PRINTS,
STREAM_OPTION_SIGNAL_PRINTS,
"api-option-prints"
);
@ -786,20 +860,44 @@ const run = async () => {
item: unknown,
ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark"
) => {
const key = getSubscriptionKey(subscription);
const sockets = subscriptionSockets.get(key);
const watermark = await liveState.ingest(ingestChannel, item);
if (!sockets || sockets.size === 0) {
const matchingSubscriptions =
subscription.channel === "options" || subscription.channel === "flow"
? [...subscriptionDefinitions.entries()].filter(([, candidate]) => candidate.channel === subscription.channel)
: [[getSubscriptionKey(subscription), subscription] as const];
if (matchingSubscriptions.length === 0) {
return;
}
for (const socket of sockets) {
sendLiveMessage(socket, {
op: "event",
subscription,
item,
watermark
});
for (const [key, candidate] of matchingSubscriptions) {
const sockets = subscriptionSockets.get(key);
if (!sockets || sockets.size === 0) {
continue;
}
if (
candidate.channel === "options" &&
!matchesOptionPrintFilters(OptionPrintSchema.parse(item), candidate.filters)
) {
continue;
}
if (
candidate.channel === "flow" &&
!matchesFlowPacketFilters(FlowPacketSchema.parse(item), candidate.filters)
) {
continue;
}
for (const socket of sockets) {
sendLiveMessage(socket, {
op: "event",
subscription: candidate,
item,
watermark
});
}
}
};
@ -996,10 +1094,21 @@ const run = async () => {
}
if (req.method === "GET" && url.pathname === "/prints/options") {
const limit = parseLimit(url.searchParams.get("limit"));
const source = parseReplaySource(url) ?? undefined;
const data = await fetchRecentOptionPrints(clickhouse, limit, source);
return jsonResponse({ data });
try {
const limit = parseLimit(url.searchParams.get("limit"));
const source = parseReplaySource(url) ?? undefined;
const { storageFilters } = parseOptionPrintFilters(url);
const data = await fetchRecentOptionPrints(clickhouse, limit, source, storageFilters);
return jsonResponse({ data });
} catch (error) {
return jsonResponse(
{
error: "invalid options query",
detail: error instanceof Error ? error.message : String(error)
},
400
);
}
}
if (req.method === "GET" && url.pathname === "/nbbo/options") {
@ -1105,10 +1214,28 @@ const run = async () => {
}
if (req.method === "GET" && url.pathname === "/history/options") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const source = parseReplaySource(url) ?? undefined;
const data = await fetchOptionPrintsBefore(clickhouse, beforeTs, beforeSeq, limit, source);
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
try {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const source = parseReplaySource(url) ?? undefined;
const { storageFilters } = parseOptionPrintFilters(url);
const data = await fetchOptionPrintsBefore(
clickhouse,
beforeTs,
beforeSeq,
limit,
source,
storageFilters
);
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
} catch (error) {
return jsonResponse(
{
error: "invalid options history query",
detail: error instanceof Error ? error.message : String(error)
},
400
);
}
}
if (req.method === "GET" && url.pathname === "/history/nbbo") {
@ -1183,12 +1310,30 @@ const run = async () => {
}
if (req.method === "GET" && url.pathname === "/replay/options") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const source = parseReplaySource(url) ?? undefined;
const data = await fetchOptionPrintsAfter(clickhouse, afterTs, afterSeq, limit, source);
const last = data.at(-1);
const next = last ? { ts: last.ts, seq: last.seq } : null;
return jsonResponse({ data, next });
try {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const source = parseReplaySource(url) ?? undefined;
const { storageFilters } = parseOptionPrintFilters(url);
const data = await fetchOptionPrintsAfter(
clickhouse,
afterTs,
afterSeq,
limit,
source,
storageFilters
);
const last = data.at(-1);
const next = last ? { ts: last.ts, seq: last.seq } : null;
return jsonResponse({ data, next });
} catch (error) {
return jsonResponse(
{
error: "invalid options replay query",
detail: error instanceof Error ? error.message : String(error)
},
400
);
}
}
if (req.method === "GET" && url.pathname === "/replay/nbbo") {

View file

@ -1,4 +1,5 @@
import {
fetchRecentOptionPrints,
fetchRecentAlerts,
fetchRecentClassifierHits,
fetchRecentEquityCandles,
@ -7,9 +8,9 @@ import {
fetchRecentFlowPackets,
fetchRecentInferredDark,
fetchRecentOptionNBBO,
fetchRecentOptionPrints,
type ClickHouseClient
} from "@islandflow/storage";
import type { OptionPrintQueryFilters } from "@islandflow/storage";
import {
AlertEventSchema,
ClassifierHitEventSchema,
@ -22,8 +23,11 @@ import {
InferredDarkEventSchema,
LiveGenericChannel,
LiveSubscription,
matchesFlowPacketFilters,
matchesOptionPrintFilters,
OptionNBBOSchema,
OptionPrintSchema,
type OptionFlowFilters,
type Cursor,
type EquityCandle,
type EquityPrint,
@ -124,7 +128,8 @@ const getGenericConfig = (limits: GenericLiveLimits): {
limit: limits.options,
parse: (value) => OptionPrintSchema.parse(value),
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
fetchRecent: fetchRecentOptionPrints
fetchRecent: (clickhouse, limit) =>
fetchRecentOptionPrints(clickhouse, limit, undefined, { view: "signal" })
},
nbbo: {
redisKey: "live:nbbo",
@ -279,6 +284,55 @@ export class LiveStateManager {
async getSnapshot(subscription: LiveSubscription): Promise<FeedSnapshot<unknown>> {
switch (subscription.channel) {
case "options": {
if (subscription.filters?.view === "raw") {
const storageFilters: OptionPrintQueryFilters = {
view: "raw",
security:
subscription.filters.securityTypes?.length === 1
? subscription.filters.securityTypes[0]
: "all",
nbboSides: subscription.filters.nbboSides,
optionTypes: subscription.filters.optionTypes,
minNotional: subscription.filters.minNotional
};
const items = await fetchRecentOptionPrints(
this.clickhouse,
this.generic.options.limit,
undefined,
storageFilters
);
return {
subscription,
items,
watermark: items[0] ? { ts: items[0].ts, seq: items[0].seq } : null,
next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq }))
};
}
const config = this.generic.options;
const items = (this.genericItems.get("options") ?? []).filter((item) =>
matchesOptionPrintFilters(item, subscription.filters)
);
return {
subscription,
items,
watermark: this.genericCursors.get(config.cursorField) ?? null,
next_before: nextBeforeForItems(items, config.cursor)
};
}
case "flow": {
const config = this.generic.flow;
const items = (this.genericItems.get("flow") ?? []).filter((item) =>
matchesFlowPacketFilters(item, subscription.filters)
);
return {
subscription,
items,
watermark: this.genericCursors.get(config.cursorField) ?? null,
next_before: nextBeforeForItems(items, config.cursor)
};
}
case "equity-candles": {
const key = candleRedisKey(subscription.underlying_id, subscription.interval_ms);
const cursorField = candleCursorField(subscription.underlying_id, subscription.interval_ms);

View file

@ -196,4 +196,81 @@ describe("LiveStateManager", () => {
expect(stats.trimOperations).toBeGreaterThan(0);
expect(stats.cacheDepthByKey["live:flow"]).toBe(2);
});
it("filters option and flow snapshots using subscription filters", async () => {
const manager = new LiveStateManager(makeClickHouse(), null);
await manager.ingest("options", {
source_ts: 100,
ingest_ts: 101,
seq: 1,
trace_id: "opt-1",
ts: 100,
option_contract_id: "AAPL-2025-01-17-200-C",
price: 1,
size: 100,
exchange: "X",
underlying_id: "AAPL",
option_type: "call",
notional: 10000,
nbbo_side: "A",
is_etf: false,
signal_pass: true,
signal_reasons: ["keep:ask-side"],
signal_profile: "smart-money"
});
await manager.ingest("options", {
source_ts: 110,
ingest_ts: 111,
seq: 2,
trace_id: "opt-2",
ts: 110,
option_contract_id: "SPY-2025-01-17-500-P",
price: 1,
size: 100,
exchange: "X",
underlying_id: "SPY",
option_type: "put",
notional: 10000,
nbbo_side: "B",
is_etf: true,
signal_pass: true,
signal_reasons: ["keep:ask-side"],
signal_profile: "smart-money"
});
await manager.ingest("flow", {
source_ts: 120,
ingest_ts: 121,
seq: 3,
trace_id: "flow-1",
id: "flow-1",
members: ["opt-1"],
features: {
option_contract_id: "AAPL-2025-01-17-200-C",
total_notional: 10000,
is_etf: false,
option_type: "call",
nbbo_a_count: 1,
nbbo_aa_count: 0,
nbbo_mid_count: 0,
nbbo_b_count: 0,
nbbo_bb_count: 0,
nbbo_missing_count: 0,
nbbo_stale_count: 0
},
join_quality: {}
});
const optionSnapshot = await manager.getSnapshot({
channel: "options",
filters: { securityTypes: ["stock"], nbboSides: ["A"], optionTypes: ["call"] }
});
const flowSnapshot = await manager.getSnapshot({
channel: "flow",
filters: { securityTypes: ["stock"], nbboSides: ["A"], optionTypes: ["call"] }
});
expect(optionSnapshot.items).toHaveLength(1);
expect(flowSnapshot.items).toHaveLength(1);
});
});