Implement server-backed live history
This commit is contained in:
parent
b4f87b50d2
commit
ba0daf5208
10 changed files with 402 additions and 44 deletions
|
|
@ -60,6 +60,7 @@ import {
|
|||
fetchEquityPrintsBefore,
|
||||
fetchEquityPrintsRange,
|
||||
fetchEquityPrintJoinsAfter,
|
||||
fetchEquityQuotesBefore,
|
||||
fetchEquityQuotesAfter,
|
||||
fetchInferredDarkBefore,
|
||||
fetchInferredDarkAfter,
|
||||
|
|
@ -977,19 +978,21 @@ const run = async () => {
|
|||
const fanoutLive = async (
|
||||
subscription: LiveSubscription,
|
||||
item: unknown,
|
||||
ingestChannel: "options" | "nbbo" | "equities" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark"
|
||||
ingestChannel: "options" | "nbbo" | "equities" | "equity-quotes" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark"
|
||||
) => {
|
||||
const watermark = await liveState.ingest(ingestChannel, item);
|
||||
|
||||
if (
|
||||
(ingestChannel === "options" ||
|
||||
ingestChannel === "nbbo" ||
|
||||
ingestChannel === "equities" ||
|
||||
ingestChannel === "equity-quotes" ||
|
||||
ingestChannel === "flow") &&
|
||||
!isLiveItemFresh(ingestChannel, item)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const watermark = await liveState.ingest(ingestChannel, item);
|
||||
const matchingSubscriptions =
|
||||
subscription.channel === "options" || subscription.channel === "flow"
|
||||
? [...subscriptionDefinitions.entries()].filter(([, candidate]) => candidate.channel === subscription.channel)
|
||||
|
|
@ -1088,6 +1091,7 @@ const run = async () => {
|
|||
try {
|
||||
const payload = EquityQuoteSchema.parse(equityQuoteSubscription.decode(msg));
|
||||
broadcast(equityQuoteSockets, { type: "equity-quote", payload });
|
||||
await fanoutLive({ channel: "equity-quotes" }, payload, "equity-quotes");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process equity quote", {
|
||||
|
|
@ -1380,6 +1384,12 @@ const run = async () => {
|
|||
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/equity-quotes") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchEquityQuotesBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/equity-joins") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchEquityPrintJoinsBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import {
|
|||
fetchRecentEquityCandles,
|
||||
fetchRecentEquityPrintJoins,
|
||||
fetchRecentEquityPrints,
|
||||
fetchRecentEquityQuotes,
|
||||
fetchRecentFlowPackets,
|
||||
fetchRecentInferredDark,
|
||||
fetchRecentOptionNBBO,
|
||||
|
|
@ -18,6 +19,7 @@ import {
|
|||
EquityCandleSchema,
|
||||
EquityPrintJoinSchema,
|
||||
EquityPrintSchema,
|
||||
EquityQuoteSchema,
|
||||
FeedSnapshot,
|
||||
FlowPacketSchema,
|
||||
InferredDarkEventSchema,
|
||||
|
|
@ -44,6 +46,7 @@ const GENERIC_LIMIT_ENV_KEYS: Record<LiveGenericChannel, string> = {
|
|||
options: "LIVE_LIMIT_OPTIONS",
|
||||
nbbo: "LIVE_LIMIT_NBBO",
|
||||
equities: "LIVE_LIMIT_EQUITIES",
|
||||
"equity-quotes": "LIVE_LIMIT_EQUITY_QUOTES",
|
||||
"equity-joins": "LIVE_LIMIT_EQUITY_JOINS",
|
||||
flow: "LIVE_LIMIT_FLOW",
|
||||
"classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS",
|
||||
|
|
@ -69,6 +72,7 @@ export const LIVE_FRESHNESS_THRESHOLDS: Partial<Record<LiveGenericChannel, numbe
|
|||
options: 15_000,
|
||||
nbbo: 15_000,
|
||||
equities: 15_000,
|
||||
"equity-quotes": 15_000,
|
||||
flow: 30_000
|
||||
};
|
||||
|
||||
|
|
@ -102,6 +106,7 @@ export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env):
|
|||
options: parseGenericLimit(env, "options", DEFAULT_GENERIC_LIMIT),
|
||||
nbbo: parseGenericLimit(env, "nbbo", DEFAULT_GENERIC_LIMIT),
|
||||
equities: parseGenericLimit(env, "equities", DEFAULT_GENERIC_LIMIT),
|
||||
"equity-quotes": parseGenericLimit(env, "equity-quotes", DEFAULT_GENERIC_LIMIT),
|
||||
"equity-joins": parseGenericLimit(env, "equity-joins", DEFAULT_GENERIC_LIMIT),
|
||||
flow: parseGenericLimit(env, "flow", DEFAULT_GENERIC_LIMIT),
|
||||
"classifier-hits": parseGenericLimit(env, "classifier-hits", DEFAULT_GENERIC_LIMIT),
|
||||
|
|
@ -154,6 +159,14 @@ const getGenericConfig = (limits: GenericLiveLimits): {
|
|||
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentEquityPrints
|
||||
},
|
||||
"equity-quotes": {
|
||||
redisKey: "live:equity-quotes",
|
||||
cursorField: "equity-quotes",
|
||||
limit: limits["equity-quotes"],
|
||||
parse: (value) => EquityQuoteSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentEquityQuotes
|
||||
},
|
||||
"equity-joins": {
|
||||
redisKey: "live:equity-joins",
|
||||
cursorField: "equity-joins",
|
||||
|
|
@ -251,6 +264,7 @@ const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | nu
|
|||
case "options":
|
||||
case "nbbo":
|
||||
case "equities":
|
||||
case "equity-quotes":
|
||||
return typeof item.ts === "number" ? item.ts : null;
|
||||
case "flow":
|
||||
return typeof item.source_ts === "number" ? item.source_ts : null;
|
||||
|
|
@ -275,19 +289,6 @@ export const isLiveItemFresh = (
|
|||
return now - ts <= thresholdMs;
|
||||
};
|
||||
|
||||
const filterFreshGenericItems = <T>(
|
||||
channel: LiveGenericChannel,
|
||||
items: T[],
|
||||
now = Date.now()
|
||||
): T[] => {
|
||||
const thresholdMs = LIVE_FRESHNESS_THRESHOLDS[channel];
|
||||
if (!thresholdMs) {
|
||||
return items;
|
||||
}
|
||||
|
||||
return items.filter((item) => isLiveItemFresh(channel, item, now));
|
||||
};
|
||||
|
||||
const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Cursor | null => {
|
||||
const last = items.at(-1);
|
||||
return last ? cursorOf(last) : null;
|
||||
|
|
@ -396,21 +397,17 @@ export class LiveStateManager {
|
|||
undefined,
|
||||
storageFilters
|
||||
);
|
||||
const freshItems = filterFreshGenericItems("options", items);
|
||||
return {
|
||||
subscription,
|
||||
items: freshItems,
|
||||
items,
|
||||
watermark: items[0] ? { ts: items[0].ts, seq: items[0].seq } : null,
|
||||
next_before: nextBeforeForItems(freshItems, (item) => ({ ts: item.ts, seq: item.seq }))
|
||||
next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq }))
|
||||
};
|
||||
}
|
||||
|
||||
const config = this.generic.options;
|
||||
const items = filterFreshGenericItems(
|
||||
"options",
|
||||
(this.genericItems.get("options") ?? []).filter((item) =>
|
||||
matchesOptionPrintFilters(item, subscription.filters)
|
||||
)
|
||||
const items = (this.genericItems.get("options") ?? []).filter((item) =>
|
||||
matchesOptionPrintFilters(item, subscription.filters)
|
||||
);
|
||||
return {
|
||||
subscription,
|
||||
|
|
@ -421,11 +418,8 @@ export class LiveStateManager {
|
|||
}
|
||||
case "flow": {
|
||||
const config = this.generic.flow;
|
||||
const items = filterFreshGenericItems(
|
||||
"flow",
|
||||
(this.genericItems.get("flow") ?? []).filter((item) =>
|
||||
matchesFlowPacketFilters(item, subscription.filters)
|
||||
)
|
||||
const items = (this.genericItems.get("flow") ?? []).filter((item) =>
|
||||
matchesFlowPacketFilters(item, subscription.filters)
|
||||
);
|
||||
return {
|
||||
subscription,
|
||||
|
|
@ -464,10 +458,7 @@ export class LiveStateManager {
|
|||
}
|
||||
default: {
|
||||
const config = this.generic[subscription.channel];
|
||||
const items = filterFreshGenericItems(
|
||||
subscription.channel,
|
||||
this.genericItems.get(subscription.channel) ?? []
|
||||
);
|
||||
const items = this.genericItems.get(subscription.channel) ?? [];
|
||||
return {
|
||||
subscription,
|
||||
items,
|
||||
|
|
@ -513,9 +504,6 @@ export class LiveStateManager {
|
|||
default: {
|
||||
const config = this.generic[channel];
|
||||
const parsed = config.parse(item);
|
||||
if (!isLiveItemFresh(channel, parsed)) {
|
||||
return this.genericCursors.get(config.cursorField) ?? null;
|
||||
}
|
||||
const items = this.genericItems.get(channel) ?? [];
|
||||
const next = normalizeGenericItems(channel, [parsed, ...items], config);
|
||||
this.genericItems.set(channel, next);
|
||||
|
|
|
|||
|
|
@ -58,6 +58,7 @@ describe("LiveStateManager", () => {
|
|||
expect(limits.options).toBe(777);
|
||||
expect(limits.nbbo).toBe(100000);
|
||||
expect(limits.flow).toBe(10000);
|
||||
expect(limits["equity-quotes"]).toBe(10000);
|
||||
expect(limits.alerts).toBe(10000);
|
||||
});
|
||||
|
||||
|
|
@ -145,6 +146,7 @@ describe("LiveStateManager", () => {
|
|||
options: 10000,
|
||||
nbbo: 10000,
|
||||
equities: 10000,
|
||||
"equity-quotes": 10000,
|
||||
"equity-joins": 10000,
|
||||
flow: 2,
|
||||
"classifier-hits": 10000,
|
||||
|
|
@ -277,7 +279,7 @@ describe("LiveStateManager", () => {
|
|||
expect(flowSnapshot.items).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("suppresses stale items from live snapshots while preserving fresh ones", async () => {
|
||||
it("keeps stale persisted items in live snapshots", async () => {
|
||||
const manager = new LiveStateManager(makeClickHouse(), null);
|
||||
const now = Date.now();
|
||||
|
||||
|
|
@ -383,16 +385,20 @@ describe("LiveStateManager", () => {
|
|||
]);
|
||||
|
||||
expect((optionsSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([
|
||||
"opt-fresh"
|
||||
"opt-fresh",
|
||||
"opt-stale"
|
||||
]);
|
||||
expect((nbboSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([
|
||||
"nbbo-fresh"
|
||||
"nbbo-fresh",
|
||||
"nbbo-stale"
|
||||
]);
|
||||
expect((equitiesSnapshot.items as Array<{ trace_id: string }>).map((item) => item.trace_id)).toEqual([
|
||||
"eq-fresh"
|
||||
"eq-fresh",
|
||||
"eq-stale"
|
||||
]);
|
||||
expect((flowSnapshot.items as Array<{ id: string }>).map((item) => item.id)).toEqual([
|
||||
"flow-fresh"
|
||||
"flow-fresh",
|
||||
"flow-stale"
|
||||
]);
|
||||
});
|
||||
|
||||
|
|
@ -476,7 +482,7 @@ describe("LiveStateManager", () => {
|
|||
]);
|
||||
});
|
||||
|
||||
it("rejects stale ingest for freshness-gated channels", async () => {
|
||||
it("stores older valid ingest for freshness-gated channels", async () => {
|
||||
const manager = new LiveStateManager(makeClickHouse(), null);
|
||||
const now = Date.now();
|
||||
|
||||
|
|
@ -494,7 +500,71 @@ describe("LiveStateManager", () => {
|
|||
});
|
||||
|
||||
const snapshot = await manager.getSnapshot({ channel: "equities" });
|
||||
expect(snapshot.items).toHaveLength(0);
|
||||
expect(snapshot.items).toHaveLength(1);
|
||||
expect(snapshot.next_before).toEqual({ ts: now - 60_000, seq: 1 });
|
||||
});
|
||||
|
||||
it("hydrates equity quotes from redis", async () => {
|
||||
const redis = makeRedis();
|
||||
const now = Date.now();
|
||||
await redis.lPush(
|
||||
"live:equity-quotes",
|
||||
JSON.stringify({
|
||||
source_ts: now,
|
||||
ingest_ts: now + 1,
|
||||
seq: 1,
|
||||
trace_id: "quote-1",
|
||||
ts: now,
|
||||
underlying_id: "SPY",
|
||||
bid: 450,
|
||||
ask: 450.01
|
||||
})
|
||||
);
|
||||
await redis.hSet("live:cursors", "equity-quotes", JSON.stringify({ ts: now, seq: 1 }));
|
||||
|
||||
const manager = new LiveStateManager(makeClickHouse(), redis as never);
|
||||
await manager.hydrate();
|
||||
const snapshot = await manager.getSnapshot({ channel: "equity-quotes" });
|
||||
|
||||
expect(snapshot.items).toHaveLength(1);
|
||||
expect(snapshot.watermark).toEqual({ ts: now, seq: 1 });
|
||||
expect(snapshot.next_before).toEqual({ ts: now, seq: 1 });
|
||||
});
|
||||
|
||||
it("hydrates equity quotes from clickhouse when redis is empty and persists hot cache", async () => {
|
||||
const redis = makeRedis();
|
||||
const now = Date.now();
|
||||
const clickhouse = {
|
||||
...makeClickHouse(),
|
||||
query: async ({ query }: { query: string }) => ({
|
||||
async json<T>() {
|
||||
if (query.includes("equity_quotes")) {
|
||||
return [
|
||||
{
|
||||
source_ts: now,
|
||||
ingest_ts: now + 1,
|
||||
seq: 2,
|
||||
trace_id: "quote-2",
|
||||
ts: now,
|
||||
underlying_id: "SPY",
|
||||
bid: 451,
|
||||
ask: 451.01
|
||||
}
|
||||
] as T;
|
||||
}
|
||||
return [] as T;
|
||||
}
|
||||
})
|
||||
} as ClickHouseClient;
|
||||
|
||||
const manager = new LiveStateManager(clickhouse, redis as never);
|
||||
await manager.hydrate();
|
||||
const snapshot = await manager.getSnapshot({ channel: "equity-quotes" });
|
||||
const persisted = await redis.lRange("live:equity-quotes", 0, 10);
|
||||
|
||||
expect(snapshot.items).toHaveLength(1);
|
||||
expect(snapshot.watermark).toEqual({ ts: now, seq: 2 });
|
||||
expect(persisted).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("exposes freshness helper for event fanout gating", () => {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue