Add hosted synthetic control plane

This commit is contained in:
dirtydishes 2026-05-13 22:10:05 -04:00
parent af04875107
commit 8dcbcd2201
21 changed files with 3695 additions and 772 deletions

View file

@ -25,8 +25,12 @@ import {
STREAM_OPTION_SIGNAL_PRINTS,
buildDurableConsumer,
connectJetStreamWithRetry,
ensureSyntheticControlState,
ensureKnownStreams,
subscribeJson
openSyntheticControlKv,
subscribeJson,
watchSyntheticControlState,
writeSyntheticControlState
} from "@islandflow/bus";
import {
createClickHouseClient,
@ -100,6 +104,7 @@ import {
matchesFlowPacketFilters,
matchesOptionPrintFilters,
FlowPacketSchema,
SyntheticControlStateSchema,
SmartMoneyEventSchema,
OptionNBBOSchema,
OptionPrintSchema,
@ -114,6 +119,13 @@ import {
shouldFanoutLiveEvent
} from "./live";
import { parseOptionPrintQuery } from "./option-queries";
import {
buildSyntheticDerivedStatus,
createRollingSyntheticProfileHits,
getSyntheticBackendDisabledReason,
recordSyntheticProfileHit,
resolveSyntheticBackendMode
} from "./synthetic-control";
const service = "api";
const logger = createLogger({ service });
@ -127,10 +139,27 @@ const envSchema = z.object({
CLICKHOUSE_URL: z.string().default("http://127.0.0.1:8123"),
CLICKHOUSE_DATABASE: z.string().default("default"),
REDIS_URL: z.string().default("redis://127.0.0.1:6379"),
OPTIONS_INGEST_ADAPTER: z.string().min(1).default("synthetic"),
EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"),
REST_DEFAULT_LIMIT: z.coerce.number().int().positive().default(200),
API_DELIVER_POLICY: DeliverPolicySchema.default("new"),
API_CONSUMER_RESET: z.coerce.boolean().default(false),
LIVE_LAG_WARN_MS: z.coerce.number().int().positive().default(120_000)
LIVE_LAG_WARN_MS: z.coerce.number().int().positive().default(120_000),
SYNTHETIC_CONTROL_ENABLED: z
.preprocess((value) => {
if (typeof value === "string") {
const normalized = value.trim().toLowerCase();
if (["1", "true", "yes", "on"].includes(normalized)) {
return true;
}
if (["0", "false", "no", "off"].includes(normalized)) {
return false;
}
}
return value;
}, z.boolean())
.default(false),
SYNTHETIC_ADMIN_TOKEN: z.string().default("")
});
const env = readEnv(envSchema);
@ -283,6 +312,14 @@ const readJsonBody = async (req: Request): Promise<unknown> => {
return JSON.parse(text);
};
const getBearerToken = (req: Request): string => {
const authorization = req.headers.get("authorization") ?? "";
if (authorization.toLowerCase().startsWith("bearer ")) {
return authorization.slice(7).trim();
}
return req.headers.get("x-synthetic-admin-token")?.trim() ?? "";
};
const optionsSupportLookupSchema = z.object({
trace_ids: z.array(z.string().min(1)).default([]),
nbbo_context: z
@ -641,6 +678,27 @@ const run = async () => {
{ logger }
);
const syntheticBackendMode = resolveSyntheticBackendMode(
env.OPTIONS_INGEST_ADAPTER,
env.EQUITIES_INGEST_ADAPTER
);
const syntheticBackendDisabledReason =
getSyntheticBackendDisabledReason(syntheticBackendMode);
const syntheticControlKv = await openSyntheticControlKv(js);
let syntheticControl = await ensureSyntheticControlState(syntheticControlKv);
const syntheticProfileHits = createRollingSyntheticProfileHits();
const stopSyntheticControlWatch = await watchSyntheticControlState(
syntheticControlKv,
(nextControl) => {
syntheticControl = nextControl;
},
(error) => {
logger.warn("synthetic control watch failed", {
error: getErrorMessage(error)
});
}
);
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,
database: env.CLICKHOUSE_DATABASE
@ -1146,6 +1204,7 @@ const run = async () => {
for await (const msg of smartMoneySubscription.messages) {
try {
const payload = SmartMoneyEventSchema.parse(smartMoneySubscription.decode(msg));
recordSyntheticProfileHit(syntheticProfileHits, payload);
broadcast(smartMoneySockets, { type: "smart-money", payload });
await fanoutLive({ channel: "smart-money" }, payload, "smart-money");
msg.ack();
@ -1202,6 +1261,54 @@ const run = async () => {
void pumpClassifierHits();
void pumpAlerts();
const buildSyntheticStatusBody = () => {
const derived =
syntheticBackendMode === "synthetic"
? buildSyntheticDerivedStatus(Date.now(), syntheticControl, syntheticProfileHits)
: null;
return {
enabled: env.SYNTHETIC_CONTROL_ENABLED && syntheticBackendMode === "synthetic",
backend_mode: syntheticBackendMode,
adapters: {
options: env.OPTIONS_INGEST_ADAPTER,
equities: env.EQUITIES_INGEST_ADAPTER
},
control: syntheticBackendMode === "synthetic" ? syntheticControl : null,
derived,
...(syntheticBackendDisabledReason
? { disabled_reason: syntheticBackendDisabledReason }
: {})
};
};
const authenticateSyntheticAdminRequest = (req: Request): Response | null => {
if (!env.SYNTHETIC_CONTROL_ENABLED) {
return jsonResponse({ error: "not found" }, 404);
}
if (!env.SYNTHETIC_ADMIN_TOKEN) {
return jsonResponse(
{
error: "synthetic admin misconfigured",
detail: "SYNTHETIC_ADMIN_TOKEN is required when synthetic control is enabled."
},
500
);
}
if (getBearerToken(req) !== env.SYNTHETIC_ADMIN_TOKEN) {
return jsonResponse({ error: "unauthorized" }, 401);
}
if (syntheticBackendMode !== "synthetic") {
return jsonResponse(
{
error: "synthetic backend unavailable",
...buildSyntheticStatusBody()
},
409
);
}
return null;
};
const server = Bun.serve<WsData | LiveWsData>({
port: env.API_PORT,
fetch: async (req: Request, serverRef: any) => {
@ -1211,6 +1318,49 @@ const run = async () => {
return jsonResponse({ status: "ok" });
}
if (req.method === "GET" && url.pathname === "/admin/synthetic/status") {
const authError = authenticateSyntheticAdminRequest(req);
if (authError) {
return authError;
}
return jsonResponse(buildSyntheticStatusBody());
}
if (req.method === "GET" && url.pathname === "/admin/synthetic/control") {
const authError = authenticateSyntheticAdminRequest(req);
if (authError) {
return authError;
}
return jsonResponse({ control: syntheticControl });
}
if (req.method === "PUT" && url.pathname === "/admin/synthetic/control") {
const authError = authenticateSyntheticAdminRequest(req);
if (authError) {
return authError;
}
try {
const payload = SyntheticControlStateSchema.parse(await readJsonBody(req));
syntheticControl = await writeSyntheticControlState(syntheticControlKv, payload);
return jsonResponse({
control: syntheticControl,
derived: buildSyntheticDerivedStatus(
Date.now(),
syntheticControl,
syntheticProfileHits
)
});
} catch (error) {
return jsonResponse(
{
error: "invalid synthetic control payload",
detail: getErrorMessage(error)
},
400
);
}
}
if (req.method === "GET" && url.pathname === "/prints/options") {
try {
const limit = parseLimit(url.searchParams.get("limit"));
@ -1824,6 +1974,7 @@ const run = async () => {
logger.info("service stopping", { signal });
server.stop();
clearInterval(liveStateMetricsTimer);
await stopSyntheticControlWatch();
await liveState.close();
if (redis && redis.isOpen) {

View file

@ -0,0 +1,93 @@
import {
SyntheticDerivedStatusSchema,
buildEmptySyntheticProfileHitCounts,
getSyntheticSessionState,
type SmartMoneyEvent,
type SmartMoneyProfileId,
type SyntheticControlState,
type SyntheticDerivedStatus
} from "@islandflow/types";
export type SyntheticBackendMode = "synthetic" | "mixed" | "live";
export type RollingSyntheticProfileHits = Record<SmartMoneyProfileId, number[]>;
export const createRollingSyntheticProfileHits = (): RollingSyntheticProfileHits => ({
institutional_directional: [],
retail_whale: [],
event_driven: [],
vol_seller: [],
arbitrage: [],
hedge_reactive: []
});
export const resolveSyntheticBackendMode = (
optionsAdapter: string,
equitiesAdapter: string
): SyntheticBackendMode => {
const optionsSynthetic = optionsAdapter === "synthetic";
const equitiesSynthetic = equitiesAdapter === "synthetic";
if (optionsSynthetic && equitiesSynthetic) {
return "synthetic";
}
if (optionsSynthetic || equitiesSynthetic) {
return "mixed";
}
return "live";
};
export const getSyntheticBackendDisabledReason = (
mode: SyntheticBackendMode
): string | undefined => {
if (mode === "synthetic") {
return undefined;
}
if (mode === "mixed") {
return "Synthetic control requires both hosted ingest adapters to run in synthetic mode.";
}
return "Hosted ingest adapters are not synthetic, so the internal synthetic control surface is unavailable.";
};
export const recordSyntheticProfileHit = (
state: RollingSyntheticProfileHits,
event: Pick<SmartMoneyEvent, "primary_profile_id" | "source_ts">
): void => {
if (!event.primary_profile_id) {
return;
}
state[event.primary_profile_id].push(event.source_ts);
};
export const getSyntheticProfileHitCounts = (
state: RollingSyntheticProfileHits,
now: number,
coverageWindowMinutes: number
): Record<SmartMoneyProfileId, number> => {
const floorTs = now - coverageWindowMinutes * 60_000;
const counts = buildEmptySyntheticProfileHitCounts();
for (const profileId of Object.keys(state) as SmartMoneyProfileId[]) {
const retained = state[profileId].filter((ts) => ts >= floorTs);
state[profileId] = retained;
counts[profileId] = retained.length;
}
return counts;
};
export const buildSyntheticDerivedStatus = (
now: number,
control: SyntheticControlState,
state: RollingSyntheticProfileHits
): SyntheticDerivedStatus => {
const session = getSyntheticSessionState(now, control);
return SyntheticDerivedStatusSchema.parse({
session_phase: session.session_phase,
regime: session.regime,
focus_symbols: session.focus_symbols,
profile_hit_counts: getSyntheticProfileHitCounts(
state,
now,
control.coverage_window_minutes
),
coverage_window_minutes: control.coverage_window_minutes
});
};

View file

@ -0,0 +1,69 @@
import { describe, expect, it } from "bun:test";
import { DEFAULT_SYNTHETIC_CONTROL_STATE } from "@islandflow/types";
import {
buildSyntheticDerivedStatus,
createRollingSyntheticProfileHits,
getSyntheticBackendDisabledReason,
getSyntheticProfileHitCounts,
recordSyntheticProfileHit,
resolveSyntheticBackendMode
} from "../src/synthetic-control";
describe("synthetic control backend mode", () => {
it("detects synthetic, mixed, and live hosted modes", () => {
expect(resolveSyntheticBackendMode("synthetic", "synthetic")).toBe("synthetic");
expect(resolveSyntheticBackendMode("synthetic", "alpaca")).toBe("mixed");
expect(resolveSyntheticBackendMode("alpaca", "alpaca")).toBe("live");
});
it("provides a useful disabled reason for non-synthetic modes", () => {
expect(getSyntheticBackendDisabledReason("mixed")).toContain("both hosted ingest adapters");
expect(getSyntheticBackendDisabledReason("live")).toContain("not synthetic");
});
});
describe("synthetic control rolling status", () => {
it("tracks public-profile hits inside the rolling coverage window", () => {
const hits = createRollingSyntheticProfileHits();
recordSyntheticProfileHit(hits, {
primary_profile_id: "event_driven",
source_ts: 1_000
});
recordSyntheticProfileHit(hits, {
primary_profile_id: "event_driven",
source_ts: 60_000
});
recordSyntheticProfileHit(hits, {
primary_profile_id: "arbitrage",
source_ts: 70_000
});
expect(getSyntheticProfileHitCounts(hits, 11 * 60_000, 10)).toEqual({
institutional_directional: 0,
retail_whale: 0,
event_driven: 1,
vol_seller: 0,
arbitrage: 1,
hedge_reactive: 0
});
});
it("builds derived status from the shared session engine", () => {
const hits = createRollingSyntheticProfileHits();
recordSyntheticProfileHit(hits, {
primary_profile_id: "hedge_reactive",
source_ts: Date.parse("2026-01-14T18:00:00Z")
});
const derived = buildSyntheticDerivedStatus(
Date.parse("2026-01-14T18:05:00Z"),
DEFAULT_SYNTHETIC_CONTROL_STATE,
hits
);
expect(derived.coverage_window_minutes).toBe(20);
expect(derived.focus_symbols.length).toBeGreaterThan(0);
expect(derived.profile_hit_counts.hedge_reactive).toBe(1);
});
});