Add multi-leg structure tagging for flow packets

This commit is contained in:
dirtydishes 2025-12-30 13:33:50 -05:00
parent 163ab1039e
commit 0b0ffa651e
8 changed files with 291 additions and 93 deletions

View file

@ -40,7 +40,9 @@ import {
} from "@islandflow/types";
import { z } from "zod";
import { evaluateClassifiers, type ClassifierConfig } from "./classifiers";
import { parseContractId } from "./contracts";
import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats";
import { summarizeStructure, type ContractLeg } from "./structures";
const service = "compute";
const logger = createLogger({ service });
@ -142,11 +144,77 @@ type ClusterState = {
const clusters = new Map<string, ClusterState>();
const nbboCache = new Map<string, OptionNBBO>();
const recentLegsByKey = new Map<string, ContractLeg[]>();
const MAX_RECENT_LEGS = 20;
const rollingKey = (metric: string, contractId: string): string => {
return `rolling:${metric}:${contractId}`;
};
const buildLegFromCluster = (cluster: ClusterState): ContractLeg | null => {
const parsed = parseContractId(cluster.contractId);
if (!parsed) {
return null;
}
return {
...parsed,
contractId: cluster.contractId,
startTs: cluster.startTs,
endTs: cluster.endTs
};
};
const buildLegKey = (leg: ContractLeg): string => {
return `${leg.root}:${leg.expiry}`;
};
const isWithinStructureWindow = (anchorTs: number, candidateTs: number): boolean => {
return Math.abs(anchorTs - candidateTs) <= env.CLUSTER_WINDOW_MS;
};
const collectRecentLegs = (key: string, anchorTs: number, excludeId: string): ContractLeg[] => {
const recent = recentLegsByKey.get(key) ?? [];
const filtered = recent.filter(
(leg) => leg.contractId !== excludeId && isWithinStructureWindow(anchorTs, leg.endTs)
);
recentLegsByKey.set(key, filtered);
return filtered;
};
const storeRecentLeg = (leg: ContractLeg, anchorTs: number): void => {
const key = buildLegKey(leg);
const recent = collectRecentLegs(key, anchorTs, "");
const next = [leg, ...recent].slice(0, MAX_RECENT_LEGS);
recentLegsByKey.set(key, next);
};
const collectActiveLegs = (
key: string,
anchorTs: number,
excludeId: string
): ContractLeg[] => {
const legs: ContractLeg[] = [];
for (const [contractId, cluster] of clusters) {
if (contractId === excludeId) {
continue;
}
const leg = buildLegFromCluster(cluster);
if (!leg) {
continue;
}
if (buildLegKey(leg) !== key) {
continue;
}
if (!isWithinStructureWindow(anchorTs, leg.endTs)) {
continue;
}
legs.push(leg);
}
return legs;
};
const applyDeliverPolicy = (
opts: ReturnType<typeof buildDurableConsumer>,
policy: typeof env.COMPUTE_DELIVER_POLICY
@ -275,6 +343,25 @@ const flushCluster = async (
await addRollingSnapshot("premium", totalPremium, "total_premium");
await addRollingSnapshot("size", cluster.totalSize, "total_size");
const currentLeg = buildLegFromCluster(cluster);
if (currentLeg) {
const key = buildLegKey(currentLeg);
const anchorTs = cluster.endTs;
const candidates = [
...collectRecentLegs(key, anchorTs, currentLeg.contractId),
...collectActiveLegs(key, anchorTs, currentLeg.contractId)
];
const summary = summarizeStructure([currentLeg, ...candidates]);
if (summary) {
features.structure_type = summary.type;
features.structure_legs = summary.legs;
features.structure_strikes = summary.strikes;
features.structure_strike_span = roundTo(summary.strikeSpan);
features.structure_rights = summary.rights;
}
storeRecentLeg(currentLeg, anchorTs);
}
if (!nbboJoin.nbbo) {
joinQuality.nbbo_missing = 1;
} else {