Implement smart money event bridge

This commit is contained in:
dirtydishes 2026-05-04 17:36:03 -04:00
parent a8cc2e3875
commit 6822fa1ba4
16 changed files with 1047 additions and 15 deletions

View file

@ -9,6 +9,7 @@ import {
SUBJECT_EQUITY_QUOTES,
SUBJECT_INFERRED_DARK,
SUBJECT_FLOW_PACKETS,
SUBJECT_SMART_MONEY_EVENTS,
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_SIGNAL_PRINTS,
STREAM_ALERTS,
@ -19,6 +20,7 @@ import {
STREAM_EQUITY_QUOTES,
STREAM_INFERRED_DARK,
STREAM_FLOW_PACKETS,
STREAM_SMART_MONEY_EVENTS,
STREAM_OPTION_NBBO,
STREAM_OPTION_SIGNAL_PRINTS,
buildDurableConsumer,
@ -36,17 +38,21 @@ import {
ensureEquityQuotesTable,
ensureInferredDarkTable,
ensureFlowPacketsTable,
ensureSmartMoneyEventsTable,
ensureOptionNBBOTable,
ensureOptionPrintsTable,
fetchAlertsAfter,
fetchAlertsBefore,
fetchClassifierHitsAfter,
fetchClassifierHitsBefore,
fetchSmartMoneyEventsAfter,
fetchSmartMoneyEventsBefore,
fetchFlowPacketsAfter,
fetchFlowPacketById,
fetchFlowPacketsBefore,
fetchRecentAlerts,
fetchRecentClassifierHits,
fetchRecentSmartMoneyEvents,
fetchRecentEquityPrintJoins,
fetchRecentFlowPackets,
fetchRecentInferredDark,
@ -95,6 +101,7 @@ import {
OptionSecurityTypeSchema,
OptionTypeSchema,
FlowPacketSchema,
SmartMoneyEventSchema,
OptionNBBOSchema,
OptionPrintSchema,
getSubscriptionKey
@ -256,6 +263,7 @@ type Channel =
| "equity-joins"
| "inferred-dark"
| "flow"
| "smart-money"
| "classifier-hits"
| "alerts";
@ -278,6 +286,7 @@ const equityQuoteSockets = new Set<LegacySocket>();
const equityJoinSockets = new Set<LegacySocket>();
const inferredDarkSockets = new Set<LegacySocket>();
const flowSockets = new Set<LegacySocket>();
const smartMoneySockets = new Set<LegacySocket>();
const classifierHitSockets = new Set<LegacySocket>();
const alertSockets = new Set<LegacySocket>();
const liveSocketSubscriptions = new Map<LiveSocket, Set<string>>();
@ -772,6 +781,19 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_SMART_MONEY_EVENTS,
subjects: [SUBJECT_SMART_MONEY_EVENTS],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_CLASSIFIER_HITS,
subjects: [SUBJECT_CLASSIFIER_HITS],
@ -812,6 +834,7 @@ const run = async () => {
await ensureEquityPrintJoinsTable(clickhouse);
await ensureInferredDarkTable(clickhouse);
await ensureFlowPacketsTable(clickhouse);
await ensureSmartMoneyEventsTable(clickhouse);
await ensureClassifierHitsTable(clickhouse);
await ensureAlertsTable(clickhouse);
});
@ -918,6 +941,11 @@ const run = async () => {
stream: STREAM_FLOW_PACKETS,
durableName: "api-flow-packets"
},
{
subject: SUBJECT_SMART_MONEY_EVENTS,
stream: STREAM_SMART_MONEY_EVENTS,
durableName: "api-smart-money-events"
},
{
subject: SUBJECT_CLASSIFIER_HITS,
stream: STREAM_CLASSIFIER_HITS,
@ -1057,18 +1085,24 @@ const run = async () => {
consumerBindings[7].durableName
);
const classifierHitSubscription = await subscribeWithReset(
const smartMoneySubscription = await subscribeWithReset(
consumerBindings[8].subject,
consumerBindings[8].stream,
consumerBindings[8].durableName
);
const alertSubscription = await subscribeWithReset(
const classifierHitSubscription = await subscribeWithReset(
consumerBindings[9].subject,
consumerBindings[9].stream,
consumerBindings[9].durableName
);
const alertSubscription = await subscribeWithReset(
consumerBindings[10].subject,
consumerBindings[10].stream,
consumerBindings[10].durableName
);
const fanoutLive = async (
subscription: LiveSubscription,
item: unknown,
@ -1269,6 +1303,22 @@ const run = async () => {
}
};
const pumpSmartMoney = async () => {
for await (const msg of smartMoneySubscription.messages) {
try {
const payload = SmartMoneyEventSchema.parse(smartMoneySubscription.decode(msg));
broadcast(smartMoneySockets, { type: "smart-money", payload });
await fanoutLive({ channel: "smart-money" }, payload, "smart-money");
msg.ack();
} catch (error) {
logger.error("failed to process smart money event", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
const pumpClassifierHits = async () => {
for await (const msg of classifierHitSubscription.messages) {
try {
@ -1309,6 +1359,7 @@ const run = async () => {
void pumpEquityJoins();
void pumpInferredDark();
void pumpFlow();
void pumpSmartMoney();
void pumpClassifierHits();
void pumpAlerts();
@ -1429,6 +1480,12 @@ const run = async () => {
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/flow/smart-money") {
const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentSmartMoneyEvents(clickhouse, limit);
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/flow/classifier-hits") {
const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentClassifierHits(clickhouse, limit);
@ -1507,6 +1564,14 @@ const run = async () => {
);
}
if (req.method === "GET" && url.pathname === "/history/smart-money") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchSmartMoneyEventsBefore(clickhouse, beforeTs, beforeSeq, limit);
return jsonResponse(
buildHistoryResponse(data, (item) => ({ ts: item.source_ts, seq: item.seq }))
);
}
if (req.method === "GET" && url.pathname === "/history/classifier-hits") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchClassifierHitsBefore(clickhouse, beforeTs, beforeSeq, limit);
@ -1651,6 +1716,14 @@ const run = async () => {
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/replay/smart-money") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchSmartMoneyEventsAfter(clickhouse, afterTs, afterSeq, limit);
const last = data.at(-1);
const next = last ? { ts: last.source_ts, seq: last.seq } : null;
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/replay/classifier-hits") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchClassifierHitsAfter(clickhouse, afterTs, afterSeq, limit);
@ -1739,6 +1812,14 @@ const run = async () => {
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
if (req.method === "GET" && url.pathname === "/ws/smart-money") {
if (serverRef.upgrade(req, { data: { channel: "smart-money" } })) {
return new Response(null, { status: 101 });
}
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
if (req.method === "GET" && url.pathname === "/ws/alerts") {
if (serverRef.upgrade(req, { data: { channel: "alerts" } })) {
return new Response(null, { status: 101 });
@ -1781,6 +1862,8 @@ const run = async () => {
inferredDarkSockets.add(socket);
} else if (socket.data.channel === "flow") {
flowSockets.add(socket);
} else if (socket.data.channel === "smart-money") {
smartMoneySockets.add(socket);
} else if (socket.data.channel === "classifier-hits") {
classifierHitSockets.add(socket);
} else {
@ -1842,6 +1925,8 @@ const run = async () => {
inferredDarkSockets.delete(socket);
} else if (socket.data.channel === "flow") {
flowSockets.delete(socket);
} else if (socket.data.channel === "smart-money") {
smartMoneySockets.delete(socket);
} else if (socket.data.channel === "classifier-hits") {
classifierHitSockets.delete(socket);
} else {

View file

@ -9,6 +9,7 @@ import {
fetchRecentFlowPackets,
fetchRecentInferredDark,
fetchRecentOptionNBBO,
fetchRecentSmartMoneyEvents,
type ClickHouseClient
} from "@islandflow/storage";
import type { OptionPrintQueryFilters } from "@islandflow/storage";
@ -30,6 +31,7 @@ import {
matchesOptionPrintFilters,
OptionNBBOSchema,
OptionPrintSchema,
SmartMoneyEventSchema,
type OptionFlowFilters,
type Cursor,
type EquityCandle,
@ -51,6 +53,7 @@ const GENERIC_LIMIT_ENV_KEYS: Record<LiveGenericChannel, string> = {
"equity-quotes": "LIVE_LIMIT_EQUITY_QUOTES",
"equity-joins": "LIVE_LIMIT_EQUITY_JOINS",
flow: "LIVE_LIMIT_FLOW",
"smart-money": "LIVE_LIMIT_SMART_MONEY",
"classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS",
alerts: "LIVE_LIMIT_ALERTS",
"inferred-dark": "LIVE_LIMIT_INFERRED_DARK"
@ -111,6 +114,7 @@ export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env):
"equity-quotes": parseGenericLimit(env, "equity-quotes", DEFAULT_GENERIC_LIMIT),
"equity-joins": parseGenericLimit(env, "equity-joins", DEFAULT_GENERIC_LIMIT),
flow: parseGenericLimit(env, "flow", DEFAULT_GENERIC_LIMIT),
"smart-money": parseGenericLimit(env, "smart-money", DEFAULT_GENERIC_LIMIT),
"classifier-hits": parseGenericLimit(env, "classifier-hits", DEFAULT_GENERIC_LIMIT),
alerts: parseGenericLimit(env, "alerts", DEFAULT_GENERIC_LIMIT),
"inferred-dark": parseGenericLimit(env, "inferred-dark", DEFAULT_GENERIC_LIMIT)
@ -185,6 +189,14 @@ const getGenericConfig = (limits: GenericLiveLimits): {
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentFlowPackets
},
"smart-money": {
redisKey: "live:smart-money",
cursorField: "smart-money",
limit: limits["smart-money"],
parse: (value) => SmartMoneyEventSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentSmartMoneyEvents
},
"classifier-hits": {
redisKey: "live:classifier-hits",
cursorField: "classifier-hits",