Implement live tape scroll-gated history

This commit is contained in:
dirtydishes 2026-05-06 22:28:20 -04:00
parent 53eeb9e72f
commit 6ba3c5343b
5 changed files with 533 additions and 53 deletions

View file

@ -327,6 +327,14 @@ const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Curso
return last ? cursorOf(last) : null;
};
const snapshotLimitFor = (subscription: LiveSubscription, configuredLimit: number): number => {
const requested = "snapshot_limit" in subscription ? subscription.snapshot_limit : undefined;
if (!requested) {
return configuredLimit;
}
return Math.max(1, Math.min(configuredLimit, Math.floor(requested)));
};
const candleRedisKey = (underlyingId: string, intervalMs: number): string =>
`live:equity-candles:${underlyingId}:${intervalMs}`;
@ -448,6 +456,7 @@ export class LiveStateManager {
const scoped =
Boolean(subscription.underlying_ids?.length) || Boolean(subscription.option_contract_id);
if (subscription.filters?.view === "raw" || scoped) {
const limit = snapshotLimitFor(subscription, this.generic.options.limit);
const storageFilters: OptionPrintQueryFilters = {
view: subscription.filters?.view ?? "signal",
security:
@ -463,7 +472,7 @@ export class LiveStateManager {
};
const items = await fetchRecentOptionPrints(
this.clickhouse,
this.generic.options.limit,
limit,
undefined,
storageFilters
);
@ -476,10 +485,11 @@ export class LiveStateManager {
}
const config = this.generic.options;
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get("options") ?? []).filter((item) =>
isWithinLiveFeedLookback("options", item) &&
matchesOptionPrintFilters(item, subscription.filters)
);
).slice(0, limit);
return {
subscription,
items,
@ -489,10 +499,11 @@ export class LiveStateManager {
}
case "flow": {
const config = this.generic.flow;
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get("flow") ?? []).filter((item) =>
isWithinLiveFeedLookback("flow", item) &&
matchesFlowPacketFilters(item, subscription.filters)
);
).slice(0, limit);
return {
subscription,
items,
@ -502,12 +513,13 @@ export class LiveStateManager {
}
case "equities": {
const config = this.generic.equities;
const limit = snapshotLimitFor(subscription, config.limit);
if (subscription.underlying_ids?.length) {
const filters: EquityPrintQueryFilters = {
underlyingIds: subscription.underlying_ids,
sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS
};
const items = await fetchRecentEquityPrints(this.clickhouse, config.limit, filters);
const items = await fetchRecentEquityPrints(this.clickhouse, limit, filters);
return {
subscription,
items,
@ -517,7 +529,7 @@ export class LiveStateManager {
}
const items = (this.genericItems.get("equities") ?? []).filter((item) =>
isWithinLiveFeedLookback("equities", item)
);
).slice(0, limit);
return {
subscription,
items,
@ -555,9 +567,10 @@ export class LiveStateManager {
}
default: {
const config = this.generic[subscription.channel];
const limit = snapshotLimitFor(subscription, config.limit);
const items = (this.genericItems.get(subscription.channel) ?? []).filter((item) =>
isWithinLiveFeedLookback(subscription.channel, item)
);
).slice(0, limit);
return {
subscription,
items,