Implement scoped live 24h feed visibility

This commit is contained in:
dirtydishes 2026-05-04 05:52:38 -04:00
parent f28c8e641f
commit 48b0d980a6
11 changed files with 547 additions and 49 deletions

View file

@ -72,6 +72,7 @@ import {
fetchOptionPrintsByTraceIds,
fetchRecentOptionPrints
} from "@islandflow/storage";
import type { EquityPrintQueryFilters, OptionPrintQueryFilters } from "@islandflow/storage";
import {
AlertEventSchema,
ClassifierHitEventSchema,
@ -100,7 +101,7 @@ import {
} from "@islandflow/types";
import { createClient } from "redis";
import { z } from "zod";
import { LiveStateManager, shouldFanoutLiveEvent } from "./live";
import { LIVE_FEED_LOOKBACK_MS, LiveStateManager, shouldFanoutLiveEvent } from "./live";
const service = "api";
const logger = createLogger({ service });
@ -558,6 +559,62 @@ const buildHistoryResponse = <T extends { seq: number }>(
};
};
const parseScopeList = (url: URL, ...keys: string[]): string[] | undefined => {
const values = keys
.flatMap((key) => url.searchParams.getAll(key))
.flatMap((value) => value.split(","))
.map((value) => value.trim().toUpperCase())
.filter(Boolean);
const unique = Array.from(new Set(values));
return unique.length > 0 ? unique : undefined;
};
const parseLiveOptionPrintFilters = (url: URL): OptionPrintQueryFilters => {
const { storageFilters } = parseOptionPrintFilters(url);
return {
...storageFilters,
underlyingIds: parseScopeList(url, "underlying_id", "underlying_ids"),
optionContractId: url.searchParams.get("option_contract_id") ?? undefined,
sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS
};
};
const parseLiveEquityPrintFilters = (url: URL): EquityPrintQueryFilters => ({
underlyingIds: parseScopeList(url, "underlying_id", "underlying_ids"),
sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS
});
const matchesScopedOptionSubscription = (
print: { underlying_id?: string; option_contract_id: string },
subscription: LiveSubscription
): boolean => {
if (subscription.channel !== "options") {
return false;
}
if (subscription.option_contract_id && subscription.option_contract_id !== print.option_contract_id) {
return false;
}
if (subscription.underlying_ids?.length) {
const underlying = (print.underlying_id ?? "").toUpperCase();
return subscription.underlying_ids.map((value) => value.toUpperCase()).includes(underlying);
}
return true;
};
const matchesScopedEquitySubscription = (
print: { underlying_id: string },
subscription: LiveSubscription
): boolean => {
if (subscription.channel !== "equities") {
return false;
}
if (!subscription.underlying_ids?.length) {
return true;
}
const underlying = print.underlying_id.toUpperCase();
return subscription.underlying_ids.map((value) => value.toUpperCase()).includes(underlying);
};
const buildCandleCacheKey = (underlyingId: string, intervalMs: number): string => {
return `candles:equity:${intervalMs}:${underlyingId}`;
};
@ -987,7 +1044,7 @@ const run = async () => {
}
const matchingSubscriptions =
subscription.channel === "options" || subscription.channel === "flow"
subscription.channel === "options" || subscription.channel === "flow" || subscription.channel === "equities"
? [...subscriptionDefinitions.entries()].filter(([, candidate]) => candidate.channel === subscription.channel)
: [[getSubscriptionKey(subscription), subscription] as const];
@ -1003,7 +1060,15 @@ const run = async () => {
if (
candidate.channel === "options" &&
!matchesOptionPrintFilters(OptionPrintSchema.parse(item), candidate.filters)
(!matchesOptionPrintFilters(OptionPrintSchema.parse(item), candidate.filters) ||
!matchesScopedOptionSubscription(OptionPrintSchema.parse(item), candidate))
) {
continue;
}
if (
candidate.channel === "equities" &&
!matchesScopedEquitySubscription(EquityPrintSchema.parse(item), candidate)
) {
continue;
}
@ -1343,7 +1408,7 @@ const run = async () => {
try {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const source = parseReplaySource(url) ?? undefined;
const { storageFilters } = parseOptionPrintFilters(url);
const storageFilters = parseLiveOptionPrintFilters(url);
const data = await fetchOptionPrintsBefore(
clickhouse,
beforeTs,
@ -1373,7 +1438,13 @@ const run = async () => {
if (req.method === "GET" && url.pathname === "/history/equities") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchEquityPrintsBefore(clickhouse, beforeTs, beforeSeq, limit);
const data = await fetchEquityPrintsBefore(
clickhouse,
beforeTs,
beforeSeq,
limit,
parseLiveEquityPrintFilters(url)
);
return jsonResponse(buildHistoryResponse(data, (item) => ({ ts: item.ts, seq: item.seq })));
}

View file

@ -12,6 +12,7 @@ import {
type ClickHouseClient
} from "@islandflow/storage";
import type { OptionPrintQueryFilters } from "@islandflow/storage";
import type { EquityPrintQueryFilters } from "@islandflow/storage";
import {
AlertEventSchema,
ClassifierHitEventSchema,
@ -38,6 +39,7 @@ import {
import type { RedisClientType } from "redis";
const CURSOR_HASH_KEY = "live:cursors";
export const LIVE_FEED_LOOKBACK_MS = 24 * 60 * 60 * 1000;
const DEFAULT_GENERIC_LIMIT = 10000;
const MAX_GENERIC_LIMIT = 100000;
@ -267,12 +269,24 @@ const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | nu
case "equity-quotes":
return typeof item.ts === "number" ? item.ts : null;
case "flow":
case "classifier-hits":
case "alerts":
case "inferred-dark":
return typeof item.source_ts === "number" ? item.source_ts : null;
default:
return null;
}
};
const isWithinLiveFeedLookback = (
channel: LiveGenericChannel,
item: unknown,
now = Date.now()
): boolean => {
const ts = extractFreshnessTs(channel, item);
return ts !== null && now - ts <= LIVE_FEED_LOOKBACK_MS;
};
export const isLiveItemFresh = (
channel: LiveGenericChannel,
item: unknown,
@ -289,7 +303,12 @@ export const isLiveItemFresh = (
return now - ts <= thresholdMs;
};
export const shouldFanoutLiveEvent = (_channel: LiveChannel, _item: unknown): boolean => true;
export const shouldFanoutLiveEvent = (channel: LiveChannel, item: unknown): boolean => {
if (channel === "equity-candles" || channel === "equity-overlay") {
return true;
}
return isWithinLiveFeedLookback(channel, item);
};
const nextBeforeForItems = <T>(items: T[], cursorOf: (item: T) => Cursor): Cursor | null => {
const last = items.at(-1);
@ -353,7 +372,13 @@ export class LiveStateManager {
const config = this.generic[channel];
if (this.redis?.isOpen) {
const payloads = await this.redis.lRange(config.redisKey, 0, config.limit - 1);
const cached = normalizeGenericItems(channel, parseJsonList(payloads, config.parse), config);
const cached = normalizeGenericItems(
channel,
parseJsonList(payloads, config.parse).filter((item) =>
isWithinLiveFeedLookback(channel, item)
),
config
);
if (cached.length > 0) {
this.genericItems.set(channel, cached);
this.stats.genericHydrateFromRedis += 1;
@ -370,7 +395,13 @@ export class LiveStateManager {
}
}
const fresh = normalizeGenericItems(channel, await config.fetchRecent(this.clickhouse, config.limit), config);
const fresh = normalizeGenericItems(
channel,
(await config.fetchRecent(this.clickhouse, config.limit)).filter((item) =>
isWithinLiveFeedLookback(channel, item)
),
config
);
this.stats.genericHydrateFromClickHouse += 1;
this.stats.cacheDepthByKey.set(config.redisKey, fresh.length);
this.genericItems.set(channel, fresh);
@ -382,16 +413,21 @@ export class LiveStateManager {
async getSnapshot(subscription: LiveSubscription): Promise<FeedSnapshot<unknown>> {
switch (subscription.channel) {
case "options": {
if (subscription.filters?.view === "raw") {
const scoped =
Boolean(subscription.underlying_ids?.length) || Boolean(subscription.option_contract_id);
if (subscription.filters?.view === "raw" || scoped) {
const storageFilters: OptionPrintQueryFilters = {
view: "raw",
view: subscription.filters?.view ?? "signal",
security:
subscription.filters.securityTypes?.length === 1
subscription.filters?.securityTypes?.length === 1
? subscription.filters.securityTypes[0]
: "all",
nbboSides: subscription.filters.nbboSides,
optionTypes: subscription.filters.optionTypes,
minNotional: subscription.filters.minNotional
nbboSides: subscription.filters?.nbboSides,
optionTypes: subscription.filters?.optionTypes,
minNotional: subscription.filters?.minNotional,
underlyingIds: subscription.underlying_ids,
optionContractId: subscription.option_contract_id,
sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS
};
const items = await fetchRecentOptionPrints(
this.clickhouse,
@ -409,6 +445,7 @@ export class LiveStateManager {
const config = this.generic.options;
const items = (this.genericItems.get("options") ?? []).filter((item) =>
isWithinLiveFeedLookback("options", item) &&
matchesOptionPrintFilters(item, subscription.filters)
);
return {
@ -421,6 +458,7 @@ export class LiveStateManager {
case "flow": {
const config = this.generic.flow;
const items = (this.genericItems.get("flow") ?? []).filter((item) =>
isWithinLiveFeedLookback("flow", item) &&
matchesFlowPacketFilters(item, subscription.filters)
);
return {
@ -430,6 +468,31 @@ export class LiveStateManager {
next_before: nextBeforeForItems(items, config.cursor)
};
}
case "equities": {
const config = this.generic.equities;
if (subscription.underlying_ids?.length) {
const filters: EquityPrintQueryFilters = {
underlyingIds: subscription.underlying_ids,
sinceTs: Date.now() - LIVE_FEED_LOOKBACK_MS
};
const items = await fetchRecentEquityPrints(this.clickhouse, config.limit, filters);
return {
subscription,
items,
watermark: items[0] ? { ts: items[0].ts, seq: items[0].seq } : null,
next_before: nextBeforeForItems(items, config.cursor)
};
}
const items = (this.genericItems.get("equities") ?? []).filter((item) =>
isWithinLiveFeedLookback("equities", item)
);
return {
subscription,
items,
watermark: this.genericCursors.get(config.cursorField) ?? null,
next_before: nextBeforeForItems(items, config.cursor)
};
}
case "equity-candles": {
const key = candleRedisKey(subscription.underlying_id, subscription.interval_ms);
const cursorField = candleCursorField(subscription.underlying_id, subscription.interval_ms);
@ -460,7 +523,9 @@ export class LiveStateManager {
}
default: {
const config = this.generic[subscription.channel];
const items = this.genericItems.get(subscription.channel) ?? [];
const items = (this.genericItems.get(subscription.channel) ?? []).filter((item) =>
isWithinLiveFeedLookback(subscription.channel, item)
);
return {
subscription,
items,
@ -506,6 +571,9 @@ export class LiveStateManager {
default: {
const config = this.generic[channel];
const parsed = config.parse(item);
if (!isWithinLiveFeedLookback(channel, parsed)) {
return null;
}
const items = this.genericItems.get(channel) ?? [];
const next = normalizeGenericItems(channel, [parsed, ...items], config);
this.genericItems.set(channel, next);

View file

@ -578,9 +578,11 @@ describe("LiveStateManager", () => {
expect(isLiveItemFresh("equity-joins", { source_ts: 1 }, 1_000_000)).toBe(true);
});
it("fans out stale live events so delayed data remains visible without refresh", () => {
expect(shouldFanoutLiveEvent("options", { ts: 1000 })).toBe(true);
expect(shouldFanoutLiveEvent("equities", { ts: 1000 })).toBe(true);
expect(shouldFanoutLiveEvent("flow", { source_ts: 1000 })).toBe(true);
it("gates live feed fanout to the rolling visibility window", () => {
const now = Date.now();
expect(shouldFanoutLiveEvent("options", { ts: now })).toBe(true);
expect(shouldFanoutLiveEvent("equities", { ts: now - 25 * 60 * 60 * 1000 })).toBe(false);
expect(shouldFanoutLiveEvent("flow", { source_ts: now - 25 * 60 * 60 * 1000 })).toBe(false);
expect(shouldFanoutLiveEvent("equity-candles", { ts: 1000 })).toBe(true);
});
});