Emit structure flow packets with full evidence

This commit is contained in:
dirtydishes 2026-01-28 20:36:43 -05:00
parent f08abec68a
commit fe6aef5fbc
6 changed files with 538 additions and 54 deletions

View file

@ -69,6 +69,12 @@ import {
import { buildEquityPrintJoin, type EquityQuoteJoin } from "./equity-joins";
import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats";
import { summarizeStructure, type ContractLeg } from "./structures";
import {
buildStructureFlowPacket,
planStructurePacket,
shouldEmitStructurePacket,
type LegEvidence
} from "./structure-packets";
const service = "compute";
const logger = createLogger({ service });
@ -216,7 +222,8 @@ const clusters = new Map<string, ClusterState>();
const nbboCache = new Map<string, OptionNBBO>();
const equityQuoteCache = new Map<string, EquityQuote>();
const darkInferenceState = createDarkInferenceState();
const recentLegsByKey = new Map<string, ContractLeg[]>();
const recentLegsByKey = new Map<string, LegEvidence[]>();
const recentStructureEmits = new Map<string, number>();
const MAX_RECENT_LEGS = 20;
@ -261,7 +268,7 @@ const recordPlacement = (counts: NbboPlacementCounts, placement: NbboPlacement):
}
};
const buildLegFromCluster = (cluster: ClusterState): ContractLeg | null => {
const buildLegFromCluster = (cluster: ClusterState): LegEvidence | null => {
const parsed = parseContractId(cluster.contractId);
if (!parsed) {
return null;
@ -271,7 +278,22 @@ const buildLegFromCluster = (cluster: ClusterState): ContractLeg | null => {
...parsed,
contractId: cluster.contractId,
startTs: cluster.startTs,
endTs: cluster.endTs
endTs: cluster.endTs,
members: cluster.members.slice(),
totalSize: cluster.totalSize,
totalPremium: cluster.totalPremium,
placements: {
aa: cluster.placements.aa,
a: cluster.placements.a,
b: cluster.placements.b,
bb: cluster.placements.bb,
mid: cluster.placements.mid,
missing: cluster.placements.missing,
stale: cluster.placements.stale
},
source_ts: cluster.startSourceTs,
ingest_ts: cluster.endIngestTs,
seq: cluster.endSeq
};
};
@ -283,7 +305,7 @@ const isWithinStructureWindow = (anchorTs: number, candidateTs: number): boolean
return Math.abs(anchorTs - candidateTs) <= env.CLUSTER_WINDOW_MS;
};
const collectRecentLegs = (key: string, anchorTs: number, excludeId: string): ContractLeg[] => {
const collectRecentLegs = (key: string, anchorTs: number, excludeId: string): LegEvidence[] => {
const recent = recentLegsByKey.get(key) ?? [];
const filtered = recent.filter(
(leg) => leg.contractId !== excludeId && isWithinStructureWindow(anchorTs, leg.endTs)
@ -292,7 +314,7 @@ const collectRecentLegs = (key: string, anchorTs: number, excludeId: string): Co
return filtered;
};
const storeRecentLeg = (leg: ContractLeg, anchorTs: number): void => {
const storeRecentLeg = (leg: LegEvidence, anchorTs: number): void => {
const key = buildLegKey(leg);
const recent = collectRecentLegs(key, anchorTs, "");
const next = [leg, ...recent].slice(0, MAX_RECENT_LEGS);
@ -303,8 +325,8 @@ const collectActiveLegs = (
key: string,
anchorTs: number,
excludeId: string
): ContractLeg[] => {
const legs: ContractLeg[] = [];
): LegEvidence[] => {
const legs: LegEvidence[] = [];
for (const [contractId, cluster] of clusters) {
if (contractId === excludeId) {
continue;
@ -324,6 +346,78 @@ const collectActiveLegs = (
return legs;
};
// Structure types (matched against `summary.type`) that are eligible for a structure flow packet;
// any other summary type is ignored by emitStructurePacketIfNeeded.
const STRUCTURE_TYPES = new Set(["straddle", "strangle", "vertical", "ladder"]);
// Upper bound on the number of dedupe entries kept in `recentStructureEmits` after pruning.
const MAX_RECENT_STRUCTURE_EMITS = 2000;
/**
 * Trims the `recentStructureEmits` dedupe map in two passes.
 *
 * 1. Age pass: drops every entry whose recorded timestamp is more than
 *    `5 * env.CLUSTER_WINDOW_MS` behind `anchorTs`.
 * 2. Size pass: if the map still holds more than `MAX_RECENT_STRUCTURE_EMITS`
 *    entries, removes entries from the front of the map's iteration order
 *    until it is back at the cap.
 *
 * @param anchorTs Reference timestamp that entry ages are measured against.
 */
const pruneRecentStructureEmits = (anchorTs: number): void => {
  const maxAgeMs = env.CLUSTER_WINDOW_MS * 5;

  // Deleting the entry currently being visited is safe for Map iteration.
  recentStructureEmits.forEach((emittedAt, dedupeKey) => {
    if (anchorTs - emittedAt > maxAgeMs) {
      recentStructureEmits.delete(dedupeKey);
    }
  });

  let excess = recentStructureEmits.size - MAX_RECENT_STRUCTURE_EMITS;
  if (excess <= 0) {
    return;
  }

  // Map keys iterate in insertion order, so the earliest-inserted keys go first.
  for (const dedupeKey of recentStructureEmits.keys()) {
    recentStructureEmits.delete(dedupeKey);
    excess -= 1;
    if (excess <= 0) {
      break;
    }
  }
};
/**
 * Emits a structure-level flow packet for the given legs, at most once per
 * dedupe key within `env.CLUSTER_WINDOW_MS`.
 *
 * The call is a no-op when:
 * - `summary` is null/undefined or its `type` is not in `STRUCTURE_TYPES`;
 * - `shouldEmitStructurePacket(legs, currentContractId)` returns false;
 * - `planStructurePacket` produces no plan;
 * - the plan's `dedupeKey` was already recorded with a timestamp no more than
 *   `env.CLUSTER_WINDOW_MS` before `plan.endTs`.
 *
 * Otherwise the packet is built, validated with `FlowPacketSchema`, inserted
 * via `insertFlowPacket`, published on `SUBJECT_FLOW_PACKETS`, and handed to
 * `emitClassifiers`, in that order.
 *
 * @param clickhouse Client forwarded to `insertFlowPacket` and `emitClassifiers`.
 * @param js JetStream handle forwarded to `publishJson` and `emitClassifiers`.
 * @param legs Leg evidence the structure was summarized from.
 * @param summary Result of `summarizeStructure` for `legs`.
 * @param currentContractId Contract id of the leg being flushed by the caller.
 * @throws Whatever `FlowPacketSchema.parse`, `insertFlowPacket`, `publishJson`
 *   or `emitClassifiers` throw; errors are not caught here.
 */
const emitStructurePacketIfNeeded = async (
  clickhouse: ReturnType<typeof createClickHouseClient>,
  js: Awaited<ReturnType<typeof connectJetStreamWithRetry>>["js"],
  legs: LegEvidence[],
  summary: ReturnType<typeof summarizeStructure>,
  currentContractId: string
): Promise<void> => {
  if (!summary || !STRUCTURE_TYPES.has(summary.type)) {
    return;
  }
  if (!shouldEmitStructurePacket(legs, currentContractId)) {
    return;
  }

  const plan = planStructurePacket(legs, summary, env.CLUSTER_WINDOW_MS);
  if (!plan) {
    return;
  }

  pruneRecentStructureEmits(plan.endTs);
  const lastEmitTs = recentStructureEmits.get(plan.dedupeKey);
  if (typeof lastEmitTs === "number" && plan.endTs - lastEmitTs <= env.CLUSTER_WINDOW_MS) {
    return;
  }

  // Build and validate BEFORE recording the dedupe key: a packet that fails
  // schema validation was never emitted, so it must not suppress later
  // attempts for the same structure.
  const packet = buildStructureFlowPacket(plan, summary);
  const validated = FlowPacketSchema.parse(packet);

  // Record the key before the first await so a concurrent flush that reaches
  // this point while the I/O below is in flight sees it and skips.
  recentStructureEmits.set(plan.dedupeKey, plan.endTs);

  await insertFlowPacket(clickhouse, validated);
  await publishJson(js, SUBJECT_FLOW_PACKETS, validated);
  await emitClassifiers(clickhouse, js, validated);

  logger.info("emitted structure flow packet", {
    id: validated.id,
    type: summary.type,
    legs: summary.legs,
    strikes: summary.strikes
  });
};
const applyDeliverPolicy = (
opts: ReturnType<typeof buildDurableConsumer>,
policy: typeof env.COMPUTE_DELIVER_POLICY
@ -586,7 +680,8 @@ const flushCluster = async (
...collectRecentLegs(key, anchorTs, currentLeg.contractId),
...collectActiveLegs(key, anchorTs, currentLeg.contractId)
];
const summary = summarizeStructure([currentLeg, ...candidates]);
const legs = [currentLeg, ...candidates];
const summary = summarizeStructure(legs);
if (summary) {
features.structure_type = summary.type;
features.structure_legs = summary.legs;
@ -594,6 +689,8 @@ const flushCluster = async (
features.structure_strike_span = roundTo(summary.strikeSpan);
features.structure_rights = summary.rights;
}
await emitStructurePacketIfNeeded(clickhouse, js, legs, summary, currentLeg.contractId);
storeRecentLeg(currentLeg, anchorTs);
}