Add equity print quote joins

This commit is contained in:
dirtydishes 2026-01-04 17:07:43 -05:00
parent 9908c431f0
commit 3164167bee
15 changed files with 1033 additions and 7 deletions

View file

@ -3,13 +3,17 @@ import { createLogger } from "@islandflow/observability";
import {
SUBJECT_ALERTS,
SUBJECT_CLASSIFIER_HITS,
SUBJECT_EQUITY_JOINS,
SUBJECT_EQUITY_PRINTS,
SUBJECT_EQUITY_QUOTES,
SUBJECT_FLOW_PACKETS,
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS,
STREAM_ALERTS,
STREAM_CLASSIFIER_HITS,
STREAM_EQUITY_JOINS,
STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES,
STREAM_FLOW_PACKETS,
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
@ -22,15 +26,21 @@ import {
createClickHouseClient,
ensureAlertsTable,
ensureClassifierHitsTable,
ensureEquityPrintJoinsTable,
ensureEquityPrintsTable,
ensureEquityQuotesTable,
ensureFlowPacketsTable,
ensureOptionNBBOTable,
ensureOptionPrintsTable,
fetchRecentAlerts,
fetchRecentClassifierHits,
fetchRecentEquityPrintJoins,
fetchRecentFlowPackets,
fetchRecentEquityQuotes,
fetchRecentOptionNBBO,
fetchEquityPrintsAfter,
fetchEquityPrintJoinsAfter,
fetchEquityQuotesAfter,
fetchRecentEquityPrints,
fetchOptionNBBOAfter,
fetchOptionPrintsAfter,
@ -40,6 +50,8 @@ import {
AlertEventSchema,
ClassifierHitEventSchema,
EquityPrintSchema,
EquityPrintJoinSchema,
EquityQuoteSchema,
FlowPacketSchema,
OptionNBBOSchema,
OptionPrintSchema
@ -93,7 +105,15 @@ const replayParamsSchema = z.object({
limit: z.coerce.number().int().positive().max(1000).default(200)
});
type Channel = "options" | "options-nbbo" | "equities" | "flow" | "classifier-hits" | "alerts";
type Channel =
| "options"
| "options-nbbo"
| "equities"
| "equity-quotes"
| "equity-joins"
| "flow"
| "classifier-hits"
| "alerts";
type WsData = {
channel: Channel;
@ -102,6 +122,8 @@ type WsData = {
const optionSockets = new Set<WebSocket<WsData>>();
const optionNbboSockets = new Set<WebSocket<WsData>>();
const equitySockets = new Set<WebSocket<WsData>>();
const equityQuoteSockets = new Set<WebSocket<WsData>>();
const equityJoinSockets = new Set<WebSocket<WsData>>();
const flowSockets = new Set<WebSocket<WsData>>();
const classifierHitSockets = new Set<WebSocket<WsData>>();
const alertSockets = new Set<WebSocket<WsData>>();
@ -202,6 +224,32 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_EQUITY_QUOTES,
subjects: [SUBJECT_EQUITY_QUOTES],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_EQUITY_JOINS,
subjects: [SUBJECT_EQUITY_JOINS],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_FLOW_PACKETS,
subjects: [SUBJECT_FLOW_PACKETS],
@ -250,6 +298,8 @@ const run = async () => {
await ensureOptionPrintsTable(clickhouse);
await ensureOptionNBBOTable(clickhouse);
await ensureEquityPrintsTable(clickhouse);
await ensureEquityQuotesTable(clickhouse);
await ensureEquityPrintJoinsTable(clickhouse);
await ensureFlowPacketsTable(clickhouse);
await ensureClassifierHitsTable(clickhouse);
await ensureAlertsTable(clickhouse);
@ -311,6 +361,18 @@ const run = async () => {
"api-equity-prints"
);
const equityQuoteSubscription = await subscribeWithReset(
SUBJECT_EQUITY_QUOTES,
STREAM_EQUITY_QUOTES,
"api-equity-quotes"
);
const equityJoinSubscription = await subscribeWithReset(
SUBJECT_EQUITY_JOINS,
STREAM_EQUITY_JOINS,
"api-equity-joins"
);
const flowSubscription = await subscribeWithReset(
SUBJECT_FLOW_PACKETS,
STREAM_FLOW_PACKETS,
@ -374,6 +436,36 @@ const run = async () => {
}
};
const pumpEquityQuotes = async () => {
for await (const msg of equityQuoteSubscription.messages) {
try {
const payload = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg));
broadcast(equityQuoteSockets, { type: "equity-quote", payload });
msg.ack();
} catch (error) {
logger.error("failed to process equity quote", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
const pumpEquityJoins = async () => {
for await (const msg of equityJoinSubscription.messages) {
try {
const payload = EquityPrintJoinSchema.parse(equityJoinSubscription.decode(msg));
broadcast(equityJoinSockets, { type: "equity-join", payload });
msg.ack();
} catch (error) {
logger.error("failed to process equity join", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
const pumpFlow = async () => {
for await (const msg of flowSubscription.messages) {
try {
@ -422,6 +514,8 @@ const run = async () => {
void pumpOptions();
void pumpOptionNbbo();
void pumpEquities();
void pumpEquityQuotes();
void pumpEquityJoins();
void pumpFlow();
void pumpClassifierHits();
void pumpAlerts();
@ -453,6 +547,18 @@ const run = async () => {
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/quotes/equities") {
const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentEquityQuotes(clickhouse, limit);
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/joins/equities") {
const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentEquityPrintJoins(clickhouse, limit);
return jsonResponse({ data });
}
if (req.method === "GET" && url.pathname === "/flow/packets") {
const limit = parseLimit(url.searchParams.get("limit"));
const data = await fetchRecentFlowPackets(clickhouse, limit);
@ -495,6 +601,22 @@ const run = async () => {
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/replay/equity-quotes") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchEquityQuotesAfter(clickhouse, afterTs, afterSeq, limit);
const last = data.at(-1);
const next = last ? { ts: last.ts, seq: last.seq } : null;
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/replay/equity-joins") {
const { afterTs, afterSeq, limit } = parseReplayParams(url);
const data = await fetchEquityPrintJoinsAfter(clickhouse, afterTs, afterSeq, limit);
const last = data.at(-1);
const next = last ? { ts: last.source_ts, seq: last.seq } : null;
return jsonResponse({ data, next });
}
if (req.method === "GET" && url.pathname === "/ws/options") {
if (serverRef.upgrade(req, { data: { channel: "options" } })) {
return new Response(null, { status: 101 });
@ -519,6 +641,22 @@ const run = async () => {
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
if (req.method === "GET" && url.pathname === "/ws/equity-quotes") {
if (serverRef.upgrade(req, { data: { channel: "equity-quotes" } })) {
return new Response(null, { status: 101 });
}
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
if (req.method === "GET" && url.pathname === "/ws/equity-joins") {
if (serverRef.upgrade(req, { data: { channel: "equity-joins" } })) {
return new Response(null, { status: 101 });
}
return jsonResponse({ error: "websocket upgrade failed" }, 400);
}
if (req.method === "GET" && url.pathname === "/ws/flow") {
if (serverRef.upgrade(req, { data: { channel: "flow" } })) {
return new Response(null, { status: 101 });
@ -553,6 +691,10 @@ const run = async () => {
optionNbboSockets.add(socket);
} else if (socket.data.channel === "equities") {
equitySockets.add(socket);
} else if (socket.data.channel === "equity-quotes") {
equityQuoteSockets.add(socket);
} else if (socket.data.channel === "equity-joins") {
equityJoinSockets.add(socket);
} else if (socket.data.channel === "flow") {
flowSockets.add(socket);
} else if (socket.data.channel === "classifier-hits") {
@ -570,6 +712,10 @@ const run = async () => {
optionNbboSockets.delete(socket);
} else if (socket.data.channel === "equities") {
equitySockets.delete(socket);
} else if (socket.data.channel === "equity-quotes") {
equityQuoteSockets.delete(socket);
} else if (socket.data.channel === "equity-joins") {
equityJoinSockets.delete(socket);
} else if (socket.data.channel === "flow") {
flowSockets.delete(socket);
} else if (socket.data.channel === "classifier-hits") {

View file

@ -0,0 +1,104 @@
import type { EquityPrint, EquityPrintJoin, EquityQuote } from "@islandflow/types";
export type EquityQuoteJoin = {
quote: EquityQuote | null;
ageMs: number;
stale: boolean;
};
export type QuotePlacement = "AA" | "A" | "B" | "BB" | "MID" | "MISSING" | "STALE";
const roundTo = (value: number, digits = 4): number => {
if (!Number.isFinite(value)) {
return 0;
}
return Number(value.toFixed(digits));
};
export const classifyQuotePlacement = (
price: number,
join: EquityQuoteJoin
): QuotePlacement => {
if (!Number.isFinite(price)) {
return "MISSING";
}
if (!join.quote) {
return "MISSING";
}
if (join.stale) {
return "STALE";
}
const bid = join.quote.bid;
const ask = join.quote.ask;
if (!Number.isFinite(bid) || !Number.isFinite(ask) || ask <= 0) {
return "MISSING";
}
const spread = Math.max(0, ask - bid);
const epsilon = Math.max(0.01, spread * 0.05);
if (price > ask + epsilon) {
return "AA";
}
if (price >= ask - epsilon) {
return "A";
}
if (price < bid - epsilon) {
return "BB";
}
if (price <= bid + epsilon) {
return "B";
}
return "MID";
};
export const buildEquityPrintJoin = (
print: EquityPrint,
join: EquityQuoteJoin
): EquityPrintJoin => {
const joinQuality: Record<string, number> = {};
const placement = classifyQuotePlacement(print.price, join);
const features: Record<string, string | number | boolean> = {
underlying_id: print.underlying_id,
price: print.price,
size: print.size,
off_exchange_flag: print.offExchangeFlag,
print_ts: print.ts,
quote_placement: placement
};
if (!join.quote) {
joinQuality.quote_missing = 1;
} else {
joinQuality.quote_age_ms = join.ageMs;
if (join.stale) {
joinQuality.quote_stale = 1;
} else {
const bid = join.quote.bid;
const ask = join.quote.ask;
const mid = (bid + ask) / 2;
const spread = ask - bid;
features.quote_ts = join.quote.ts;
features.quote_bid = bid;
features.quote_ask = ask;
features.quote_mid = roundTo(mid);
features.quote_spread = roundTo(spread);
}
}
const joinId = `equityjoin:${print.trace_id}`;
return {
source_ts: print.source_ts,
ingest_ts: print.ingest_ts,
seq: print.seq,
trace_id: joinId,
id: joinId,
print_trace_id: print.trace_id,
quote_trace_id: join.quote?.trace_id ?? "",
features,
join_quality: joinQuality
};
};

View file

@ -3,11 +3,17 @@ import { createLogger } from "@islandflow/observability";
import {
SUBJECT_ALERTS,
SUBJECT_CLASSIFIER_HITS,
SUBJECT_EQUITY_JOINS,
SUBJECT_EQUITY_PRINTS,
SUBJECT_EQUITY_QUOTES,
SUBJECT_FLOW_PACKETS,
SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS,
STREAM_ALERTS,
STREAM_CLASSIFIER_HITS,
STREAM_EQUITY_JOINS,
STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES,
STREAM_FLOW_PACKETS,
STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS,
@ -21,19 +27,27 @@ import {
createClickHouseClient,
ensureAlertsTable,
ensureClassifierHitsTable,
ensureEquityPrintJoinsTable,
ensureFlowPacketsTable,
insertAlert,
insertClassifierHit,
insertEquityPrintJoin,
insertFlowPacket
} from "@islandflow/storage";
import {
AlertEventSchema,
ClassifierHitEventSchema,
EquityPrintJoinSchema,
EquityPrintSchema,
EquityQuoteSchema,
FlowPacketSchema,
OptionNBBOSchema,
OptionPrintSchema,
type AlertEvent,
type ClassifierHitEvent,
type EquityPrint,
type EquityQuote,
type EquityPrintJoin,
type FlowPacket,
type OptionNBBO,
type OptionPrint
@ -41,6 +55,7 @@ import {
import { z } from "zod";
import { evaluateClassifiers, type ClassifierConfig } from "./classifiers";
import { parseContractId } from "./contracts";
import { buildEquityPrintJoin, type EquityQuoteJoin } from "./equity-joins";
import { createRedisClient, updateRollingStats, type RollingStatsConfig } from "./rolling-stats";
import { summarizeStructure, type ContractLeg } from "./structures";
@ -71,6 +86,7 @@ const envSchema = z.object({
}, z.boolean())
.default(false),
NBBO_MAX_AGE_MS: z.coerce.number().int().positive().default(1000),
EQUITY_QUOTE_MAX_AGE_MS: z.coerce.number().int().positive().default(1000),
CLASSIFIER_SWEEP_MIN_PREMIUM: z.coerce.number().positive().default(40_000),
CLASSIFIER_SWEEP_MIN_COUNT: z.coerce.number().int().positive().default(3),
CLASSIFIER_SWEEP_MIN_PREMIUM_Z: z.coerce.number().nonnegative().default(2),
@ -161,6 +177,7 @@ type ClusterState = {
const clusters = new Map<string, ClusterState>();
const nbboCache = new Map<string, OptionNBBO>();
const equityQuoteCache = new Map<string, EquityQuote>();
const recentLegsByKey = new Map<string, ContractLeg[]>();
const MAX_RECENT_LEGS = 20;
@ -341,6 +358,17 @@ const updateNbboCache = (nbbo: OptionNBBO): void => {
}
};
const updateEquityQuoteCache = (quote: EquityQuote): void => {
const existing = equityQuoteCache.get(quote.underlying_id);
if (
!existing ||
quote.ts > existing.ts ||
(quote.ts === existing.ts && quote.seq >= existing.seq)
) {
equityQuoteCache.set(quote.underlying_id, quote);
}
};
const selectNbbo = (contractId: string, ts: number): NbboJoin => {
const nbbo = nbboCache.get(contractId) ?? null;
if (!nbbo) {
@ -352,6 +380,17 @@ const selectNbbo = (contractId: string, ts: number): NbboJoin => {
return { nbbo, ageMs, stale };
};
const selectEquityQuote = (underlyingId: string, ts: number): EquityQuoteJoin => {
const quote = equityQuoteCache.get(underlyingId) ?? null;
if (!quote) {
return { quote: null, ageMs: env.EQUITY_QUOTE_MAX_AGE_MS + 1, stale: true };
}
const ageMs = Math.abs(ts - quote.ts);
const stale = ageMs > env.EQUITY_QUOTE_MAX_AGE_MS;
return { quote, ageMs, stale };
};
const classifyPlacement = (price: number, join: NbboJoin): NbboPlacement => {
if (!Number.isFinite(price)) {
return "MISSING";
@ -609,6 +648,25 @@ const emitClassifiers = async (
}
};
const emitEquityJoin = async (
clickhouse: ReturnType<typeof createClickHouseClient>,
js: Awaited<ReturnType<typeof connectJetStreamWithRetry>>["js"],
print: EquityPrint
): Promise<void> => {
const join = selectEquityQuote(print.underlying_id, print.ts);
const payload: EquityPrintJoin = EquityPrintJoinSchema.parse(buildEquityPrintJoin(print, join));
try {
await insertEquityPrintJoin(clickhouse, payload);
await publishJson(js, SUBJECT_EQUITY_JOINS, payload);
} catch (error) {
logger.error("failed to emit equity print join", {
error: error instanceof Error ? error.message : String(error),
trace_id: payload.trace_id
});
}
};
const flushEligibleClusters = async (
clickhouse: ReturnType<typeof createClickHouseClient>,
js: Awaited<ReturnType<typeof connectJetStreamWithRetry>>["js"],
@ -666,6 +724,32 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_EQUITY_PRINTS,
subjects: [SUBJECT_EQUITY_PRINTS],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_EQUITY_QUOTES,
subjects: [SUBJECT_EQUITY_QUOTES],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_FLOW_PACKETS,
subjects: [SUBJECT_FLOW_PACKETS],
@ -679,6 +763,19 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_EQUITY_JOINS,
subjects: [SUBJECT_EQUITY_JOINS],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_CLASSIFIER_HITS,
subjects: [SUBJECT_CLASSIFIER_HITS],
@ -726,12 +823,15 @@ const run = async () => {
await retry("clickhouse table init", 20, 500, async () => {
await ensureFlowPacketsTable(clickhouse);
await ensureEquityPrintJoinsTable(clickhouse);
await ensureClassifierHitsTable(clickhouse);
await ensureAlertsTable(clickhouse);
});
const durableName = "compute-option-prints";
const nbboDurableName = "compute-option-nbbo";
const equityPrintDurableName = "compute-equity-prints";
const equityQuoteDurableName = "compute-equity-quotes";
if (env.COMPUTE_CONSUMER_RESET) {
try {
@ -791,6 +891,76 @@ const run = async () => {
}
}
if (env.COMPUTE_CONSUMER_RESET) {
try {
await jsm.consumers.delete(STREAM_EQUITY_PRINTS, equityPrintDurableName);
logger.warn("reset jetstream consumer", { durable: equityPrintDurableName });
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!message.includes("not found")) {
logger.warn("failed to reset jetstream consumer", {
durable: equityPrintDurableName,
error: message
});
}
}
} else {
try {
const info = await jsm.consumers.info(STREAM_EQUITY_PRINTS, equityPrintDurableName);
if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) {
logger.warn("resetting consumer due to deliver policy change", {
durable: equityPrintDurableName,
current: info.config.deliver_policy,
desired: env.COMPUTE_DELIVER_POLICY
});
await jsm.consumers.delete(STREAM_EQUITY_PRINTS, equityPrintDurableName);
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!message.includes("not found")) {
logger.warn("failed to inspect jetstream consumer", {
durable: equityPrintDurableName,
error: message
});
}
}
}
if (env.COMPUTE_CONSUMER_RESET) {
try {
await jsm.consumers.delete(STREAM_EQUITY_QUOTES, equityQuoteDurableName);
logger.warn("reset jetstream consumer", { durable: equityQuoteDurableName });
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!message.includes("not found")) {
logger.warn("failed to reset jetstream consumer", {
durable: equityQuoteDurableName,
error: message
});
}
}
} else {
try {
const info = await jsm.consumers.info(STREAM_EQUITY_QUOTES, equityQuoteDurableName);
if (info?.config?.deliver_policy && info.config.deliver_policy !== env.COMPUTE_DELIVER_POLICY) {
logger.warn("resetting consumer due to deliver policy change", {
durable: equityQuoteDurableName,
current: info.config.deliver_policy,
desired: env.COMPUTE_DELIVER_POLICY
});
await jsm.consumers.delete(STREAM_EQUITY_QUOTES, equityQuoteDurableName);
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!message.includes("not found")) {
logger.warn("failed to inspect jetstream consumer", {
durable: equityQuoteDurableName,
error: message
});
}
}
}
const subscription = await (async () => {
const opts = buildDurableConsumer(durableName);
applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY);
@ -863,6 +1033,81 @@ const run = async () => {
}
})();
const equitySubscription = await (async () => {
const opts = buildDurableConsumer(equityPrintDurableName);
applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY);
try {
return await subscribeJson(js, SUBJECT_EQUITY_PRINTS, opts);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
const shouldReset =
message.includes("duplicate subscription") ||
message.includes("durable requires") ||
message.includes("subject does not match consumer");
if (!shouldReset) {
throw error;
}
logger.warn("resetting jetstream consumer", { durable: equityPrintDurableName, error: message });
try {
await jsm.consumers.delete(STREAM_EQUITY_PRINTS, equityPrintDurableName);
} catch (deleteError) {
const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError);
if (!deleteMessage.includes("not found")) {
logger.warn("failed to delete jetstream consumer", {
durable: equityPrintDurableName,
error: deleteMessage
});
}
}
const resetOpts = buildDurableConsumer(equityPrintDurableName);
applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY);
return await subscribeJson(js, SUBJECT_EQUITY_PRINTS, resetOpts);
}
})();
const equityQuoteSubscription = await (async () => {
const opts = buildDurableConsumer(equityQuoteDurableName);
applyDeliverPolicy(opts, env.COMPUTE_DELIVER_POLICY);
try {
return await subscribeJson(js, SUBJECT_EQUITY_QUOTES, opts);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
const shouldReset =
message.includes("duplicate subscription") ||
message.includes("durable requires") ||
message.includes("subject does not match consumer");
if (!shouldReset) {
throw error;
}
logger.warn("resetting jetstream consumer", {
durable: equityQuoteDurableName,
error: message
});
try {
await jsm.consumers.delete(STREAM_EQUITY_QUOTES, equityQuoteDurableName);
} catch (deleteError) {
const deleteMessage = deleteError instanceof Error ? deleteError.message : String(deleteError);
if (!deleteMessage.includes("not found")) {
logger.warn("failed to delete jetstream consumer", {
durable: equityQuoteDurableName,
error: deleteMessage
});
}
}
const resetOpts = buildDurableConsumer(equityQuoteDurableName);
applyDeliverPolicy(resetOpts, env.COMPUTE_DELIVER_POLICY);
return await subscribeJson(js, SUBJECT_EQUITY_QUOTES, resetOpts);
}
})();
const nbboLoop = async () => {
for await (const msg of nbboSubscription.messages) {
try {
@ -878,7 +1123,39 @@ const run = async () => {
}
};
const equityQuoteLoop = async () => {
for await (const msg of equityQuoteSubscription.messages) {
try {
const quote = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg));
updateEquityQuoteCache(quote);
msg.ack();
} catch (error) {
logger.error("failed to process equity quote", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
const equityPrintLoop = async () => {
for await (const msg of equitySubscription.messages) {
try {
const print = EquityPrintSchema.parse(equitySubscription.decode(msg));
await emitEquityJoin(clickhouse, js, print);
msg.ack();
} catch (error) {
logger.error("failed to process equity print", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
void nbboLoop();
void equityQuoteLoop();
void equityPrintLoop();
const shutdown = async (signal: string) => {
logger.info("service stopping", { signal });

View file

@ -0,0 +1,72 @@
import { describe, expect, it } from "bun:test";
import { buildEquityPrintJoin, classifyQuotePlacement } from "../src/equity-joins";
const basePrint = {
source_ts: 100,
ingest_ts: 110,
seq: 1,
trace_id: "print-1",
ts: 100,
underlying_id: "SPY",
price: 100,
size: 50,
exchange: "TEST",
offExchangeFlag: false
};
const baseQuote = {
source_ts: 95,
ingest_ts: 105,
seq: 2,
trace_id: "quote-1",
ts: 98,
underlying_id: "SPY",
bid: 99.9,
ask: 100.1
};
describe("equity join helpers", () => {
it("classifies placements with stale and missing quotes", () => {
const missing = classifyQuotePlacement(basePrint.price, {
quote: null,
ageMs: 1500,
stale: true
});
const stale = classifyQuotePlacement(basePrint.price, {
quote: baseQuote,
ageMs: 1500,
stale: true
});
expect(missing).toBe("MISSING");
expect(stale).toBe("STALE");
});
it("builds join payloads with quote features when fresh", () => {
const join = buildEquityPrintJoin(basePrint, {
quote: baseQuote,
ageMs: 5,
stale: false
});
expect(join.id).toBe("equityjoin:print-1");
expect(join.quote_trace_id).toBe("quote-1");
expect(join.join_quality.quote_age_ms).toBe(5);
expect(join.features.quote_bid).toBe(99.9);
expect(join.features.quote_ask).toBe(100.1);
expect(join.features.quote_mid).toBeCloseTo(100, 2);
expect(join.features.quote_spread).toBeCloseTo(0.2, 2);
});
it("marks missing quotes in join quality", () => {
const join = buildEquityPrintJoin(basePrint, {
quote: null,
ageMs: 2000,
stale: true
});
expect(join.quote_trace_id).toBe("");
expect(join.join_quality.quote_missing).toBe(1);
expect(join.features.quote_placement).toBe("MISSING");
});
});

View file

@ -1,4 +1,4 @@
import { SP500_SYMBOLS, type EquityPrint } from "@islandflow/types";
import { SP500_SYMBOLS, type EquityPrint, type EquityQuote } from "@islandflow/types";
import type { EquityIngestAdapter, EquityIngestHandlers } from "./types";
type SyntheticEquitiesAdapterConfig = {
@ -38,6 +38,25 @@ const buildSyntheticPrint = (
};
};
const buildSyntheticQuote = (
seq: number,
now: number,
symbol: string,
bid: number,
ask: number
): EquityQuote => {
return {
source_ts: now,
ingest_ts: now,
seq,
trace_id: `synthetic-equity-quote-${seq}`,
ts: now,
underlying_id: symbol,
bid,
ask
};
};
export const createSyntheticEquitiesAdapter = (
config: SyntheticEquitiesAdapterConfig
): EquityIngestAdapter => {
@ -45,6 +64,7 @@ export const createSyntheticEquitiesAdapter = (
name: "synthetic",
start: (handlers: EquityIngestHandlers) => {
let seq = 0;
let quoteSeq = 0;
let timer: ReturnType<typeof setInterval> | null = null;
let stopped = false;
@ -65,8 +85,18 @@ export const createSyntheticEquitiesAdapter = (
const size = 10 + (seq % 600);
const exchange = EXCHANGES[(seq + symbolHash) % EXCHANGES.length];
const offExchangeFlag = (seq + i) % 6 === 0;
const print = buildSyntheticPrint(seq, now + i * 4, symbol, price, size, exchange, offExchangeFlag);
const eventTs = now + i * 4;
const print = buildSyntheticPrint(seq, eventTs, symbol, price, size, exchange, offExchangeFlag);
void handlers.onTrade(print);
if (handlers.onQuote) {
quoteSeq += 1;
const spread = Math.max(0.02, Number((price * 0.002).toFixed(2)));
const bid = Math.max(0.01, Number((price - spread / 2).toFixed(2)));
const ask = Math.max(bid + 0.01, Number((price + spread / 2).toFixed(2)));
const quote = buildSyntheticQuote(quoteSeq, eventTs, symbol, bid, ask);
void handlers.onQuote(quote);
}
}
};

View file

@ -2,7 +2,9 @@ import { readEnv } from "@islandflow/config";
import { createLogger } from "@islandflow/observability";
import {
SUBJECT_EQUITY_PRINTS,
SUBJECT_EQUITY_QUOTES,
STREAM_EQUITY_PRINTS,
STREAM_EQUITY_QUOTES,
connectJetStreamWithRetry,
ensureStream,
publishJson
@ -10,9 +12,16 @@ import {
import {
createClickHouseClient,
ensureEquityPrintsTable,
insertEquityPrint
ensureEquityQuotesTable,
insertEquityPrint,
insertEquityQuote
} from "@islandflow/storage";
import { EquityPrintSchema, type EquityPrint } from "@islandflow/types";
import {
EquityPrintSchema,
EquityQuoteSchema,
type EquityPrint,
type EquityQuote
} from "@islandflow/types";
import { createSyntheticEquitiesAdapter } from "./adapters/synthetic";
import type { EquityIngestAdapter, StopHandler } from "./adapters/types";
import { z } from "zod";
@ -136,6 +145,19 @@ const run = async () => {
num_replicas: 1
});
await ensureStream(jsm, {
name: STREAM_EQUITY_QUOTES,
subjects: [SUBJECT_EQUITY_QUOTES],
retention: "limits",
storage: "file",
discard: "old",
max_msgs_per_subject: -1,
max_msgs: -1,
max_bytes: -1,
max_age: 0,
num_replicas: 1
});
const clickhouse = createClickHouseClient({
url: env.CLICKHOUSE_URL,
database: env.CLICKHOUSE_DATABASE
@ -143,11 +165,13 @@ const run = async () => {
await retry("clickhouse table init", 20, 500, async () => {
await ensureEquityPrintsTable(clickhouse);
await ensureEquityQuotesTable(clickhouse);
});
const adapter = selectAdapter(env.EQUITIES_INGEST_ADAPTER);
logger.info("ingest adapter selected", { adapter: adapter.name });
const allowPublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS);
const allowQuotePublish = buildThrottle(env.TESTING_MODE, env.TESTING_THROTTLE_MS);
const stopAdapter: StopHandler = await adapter.start({
onTrade: async (candidate: EquityPrint) => {
@ -176,6 +200,28 @@ const run = async () => {
trace_id: print.trace_id
});
}
},
onQuote: async (candidate: EquityQuote) => {
if (state.shuttingDown) {
return;
}
const now = Date.now();
if (!allowQuotePublish(now)) {
return;
}
const quote = EquityQuoteSchema.parse(candidate);
try {
await insertEquityQuote(clickhouse, quote);
await publishJson(js, SUBJECT_EQUITY_QUOTES, quote);
} catch (error) {
logger.error("failed to publish equity quote", {
error: error instanceof Error ? error.message : String(error),
trace_id: quote.trace_id
});
}
}
});