Fix live tape freshness and filter UX

This commit is contained in:
dirtydishes 2026-04-28 17:13:46 -04:00
parent 27b0a399e6
commit 75fc6f9373
8 changed files with 1087 additions and 159 deletions

View file

@ -65,6 +65,13 @@ type GenericFeedConfig = {
fetchRecent: (clickhouse: ClickHouseClient, limit: number) => Promise<any[]>;
};
const LIVE_FRESHNESS_THRESHOLDS: Partial<Record<LiveGenericChannel, number>> = {
options: 15_000,
nbbo: 15_000,
equities: 15_000,
flow: 30_000
};
export type GenericLiveLimits = Record<LiveGenericChannel, number>;
const parseGenericLimit = (
@ -201,6 +208,76 @@ const parseJsonList = <T>(payloads: string[], parse: (value: unknown) => T): T[]
return items;
};
const compareCursors = (a: Cursor, b: Cursor): number => (b.ts - a.ts) || (b.seq - a.seq);
const sortGenericItems = <T>(items: T[], cursorOf: (item: T) => Cursor): T[] =>
[...items].sort((a, b) => compareCursors(cursorOf(a), cursorOf(b)));
const keepNewestNbboByContract = <T extends { option_contract_id: string }>(
items: T[],
cursorOf: (item: T) => Cursor,
limit: number
): T[] => {
const latestByContract = new Map<string, T>();
for (const item of items) {
const existing = latestByContract.get(item.option_contract_id);
if (!existing || compareCursors(cursorOf(item), cursorOf(existing)) < 0) {
latestByContract.set(item.option_contract_id, item);
}
}
return sortGenericItems(Array.from(latestByContract.values()), cursorOf).slice(0, limit);
};
const normalizeGenericItems = <T>(
channel: LiveGenericChannel,
items: T[],
config: GenericFeedConfig
): T[] => {
if (channel === "nbbo") {
return keepNewestNbboByContract(
items as Array<T & { option_contract_id: string }>,
config.cursor,
config.limit
);
}
return sortGenericItems(items, config.cursor).slice(0, config.limit);
};
const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | null => {
switch (channel) {
case "options":
case "nbbo":
case "equities":
return typeof item.ts === "number" ? item.ts : null;
case "flow":
return typeof item.source_ts === "number" ? item.source_ts : null;
default:
return null;
}
};
const filterFreshGenericItems = <T>(
channel: LiveGenericChannel,
items: T[],
now = Date.now()
): T[] => {
const thresholdMs = LIVE_FRESHNESS_THRESHOLDS[channel];
if (!thresholdMs) {
return items;
}
return items.filter((item) => {
const ts = extractFreshnessTs(channel, item);
if (ts === null) {
return false;
}
return now - ts <= thresholdMs;
});
};
const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Cursor | null => {
const last = items.at(-1);
return last ? cursorOf(last) : null;
@ -263,17 +340,24 @@ export class LiveStateManager {
const config = this.generic[channel];
if (this.redis?.isOpen) {
const payloads = await this.redis.lRange(config.redisKey, 0, config.limit - 1);
const cached = parseJsonList(payloads, config.parse);
const cached = normalizeGenericItems(channel, parseJsonList(payloads, config.parse), config);
if (cached.length > 0) {
this.genericItems.set(channel, cached);
this.stats.genericHydrateFromRedis += 1;
this.stats.cacheDepthByKey.set(config.redisKey, cached.length);
this.genericCursors.set(config.cursorField, parseCursor(await this.redis.hGet(CURSOR_HASH_KEY, config.cursorField)));
await this.persistList(
config.redisKey,
config.cursorField,
cached,
config.limit,
this.genericCursors.get(config.cursorField) ?? null
);
return;
}
}
const fresh = await config.fetchRecent(this.clickhouse, config.limit);
const fresh = normalizeGenericItems(channel, await config.fetchRecent(this.clickhouse, config.limit), config);
this.stats.genericHydrateFromClickHouse += 1;
this.stats.cacheDepthByKey.set(config.redisKey, fresh.length);
this.genericItems.set(channel, fresh);
@ -302,17 +386,21 @@ export class LiveStateManager {
undefined,
storageFilters
);
const freshItems = filterFreshGenericItems("options", items);
return {
subscription,
items,
items: freshItems,
watermark: items[0] ? { ts: items[0].ts, seq: items[0].seq } : null,
next_before: nextBeforeForItems(items, (item) => ({ ts: item.ts, seq: item.seq }))
next_before: nextBeforeForItems(freshItems, (item) => ({ ts: item.ts, seq: item.seq }))
};
}
const config = this.generic.options;
const items = (this.genericItems.get("options") ?? []).filter((item) =>
matchesOptionPrintFilters(item, subscription.filters)
const items = filterFreshGenericItems(
"options",
(this.genericItems.get("options") ?? []).filter((item) =>
matchesOptionPrintFilters(item, subscription.filters)
)
);
return {
subscription,
@ -323,8 +411,11 @@ export class LiveStateManager {
}
case "flow": {
const config = this.generic.flow;
const items = (this.genericItems.get("flow") ?? []).filter((item) =>
matchesFlowPacketFilters(item, subscription.filters)
const items = filterFreshGenericItems(
"flow",
(this.genericItems.get("flow") ?? []).filter((item) =>
matchesFlowPacketFilters(item, subscription.filters)
)
);
return {
subscription,
@ -363,7 +454,10 @@ export class LiveStateManager {
}
default: {
const config = this.generic[subscription.channel];
const items = this.genericItems.get(subscription.channel) ?? [];
const items = filterFreshGenericItems(
subscription.channel,
this.genericItems.get(subscription.channel) ?? []
);
return {
subscription,
items,
@ -410,13 +504,7 @@ export class LiveStateManager {
const config = this.generic[channel];
const parsed = config.parse(item);
const items = this.genericItems.get(channel) ?? [];
const next = [parsed, ...items]
.sort((a, b) => {
const aCursor = config.cursor(a);
const bCursor = config.cursor(b);
return (bCursor.ts - aCursor.ts) || (bCursor.seq - aCursor.seq);
})
.slice(0, config.limit);
const next = normalizeGenericItems(channel, [parsed, ...items], config);
this.genericItems.set(channel, next);
this.stats.cacheDepthByKey.set(config.redisKey, next.length);
const cursor = config.cursor(parsed);