Sync replay source for options and NBBO
This commit is contained in:
parent
4f743437d1
commit
04188851b3
1 changed files with 69 additions and 6 deletions
|
|
@ -122,6 +122,29 @@ const extractTracePrefix = <T,>(item: T): string | null => {
|
||||||
return inferTracePrefix(traceId);
|
return inferTracePrefix(traceId);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const extractReplaySource = <T,>(item: T): string | null => {
|
||||||
|
const prefix = extractTracePrefix(item);
|
||||||
|
if (!prefix) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const normalized = prefix.toLowerCase();
|
||||||
|
if (normalized.startsWith("synthetic")) {
|
||||||
|
return "synthetic";
|
||||||
|
}
|
||||||
|
if (normalized.startsWith("databento")) {
|
||||||
|
return "databento";
|
||||||
|
}
|
||||||
|
if (normalized.startsWith("alpaca")) {
|
||||||
|
return "alpaca";
|
||||||
|
}
|
||||||
|
if (normalized.startsWith("ibkr")) {
|
||||||
|
return "ibkr";
|
||||||
|
}
|
||||||
|
|
||||||
|
return prefix;
|
||||||
|
};
|
||||||
|
|
||||||
type SortableItem = {
|
type SortableItem = {
|
||||||
ts?: number;
|
ts?: number;
|
||||||
source_ts?: number;
|
source_ts?: number;
|
||||||
|
|
@ -604,6 +627,8 @@ type TapeConfig<T> = {
|
||||||
onNewItems?: (count: number) => void;
|
onNewItems?: (count: number) => void;
|
||||||
getItemTs?: (item: T) => number;
|
getItemTs?: (item: T) => number;
|
||||||
getReplayKey?: (item: T) => string | null;
|
getReplayKey?: (item: T) => string | null;
|
||||||
|
replaySourceKey?: string | null;
|
||||||
|
onReplaySourceKey?: (key: string | null) => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
const useTape = <T extends SortableItem & { seq: number }>(
|
const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
|
|
@ -614,6 +639,8 @@ const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
const pollMs = config.pollMs ?? 1000;
|
const pollMs = config.pollMs ?? 1000;
|
||||||
const getItemTs = config.getItemTs ?? extractSortTs;
|
const getItemTs = config.getItemTs ?? extractSortTs;
|
||||||
const getReplayKey = config.getReplayKey ?? extractTracePrefix;
|
const getReplayKey = config.getReplayKey ?? extractTracePrefix;
|
||||||
|
const replaySourceKey = config.replaySourceKey ?? null;
|
||||||
|
const onReplaySourceKey = config.onReplaySourceKey;
|
||||||
const [status, setStatus] = useState<WsStatus>("connecting");
|
const [status, setStatus] = useState<WsStatus>("connecting");
|
||||||
const [items, setItems] = useState<T[]>([]);
|
const [items, setItems] = useState<T[]>([]);
|
||||||
const [lastUpdate, setLastUpdate] = useState<number | null>(null);
|
const [lastUpdate, setLastUpdate] = useState<number | null>(null);
|
||||||
|
|
@ -627,6 +654,7 @@ const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
const replayEndRef = useRef<number | null>(null);
|
const replayEndRef = useRef<number | null>(null);
|
||||||
const replayCompleteRef = useRef<boolean>(false);
|
const replayCompleteRef = useRef<boolean>(false);
|
||||||
const replaySourceRef = useRef<string | null>(null);
|
const replaySourceRef = useRef<string | null>(null);
|
||||||
|
const replaySourceNotifiedRef = useRef<string | null>(null);
|
||||||
const emptyPollsRef = useRef<number>(0);
|
const emptyPollsRef = useRef<number>(0);
|
||||||
const pausedRef = useRef(paused);
|
const pausedRef = useRef(paused);
|
||||||
const pendingRef = useRef<T[]>([]);
|
const pendingRef = useRef<T[]>([]);
|
||||||
|
|
@ -690,6 +718,7 @@ const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
setReplayComplete(false);
|
setReplayComplete(false);
|
||||||
replayCompleteRef.current = false;
|
replayCompleteRef.current = false;
|
||||||
replaySourceRef.current = null;
|
replaySourceRef.current = null;
|
||||||
|
replaySourceNotifiedRef.current = null;
|
||||||
emptyPollsRef.current = 0;
|
emptyPollsRef.current = 0;
|
||||||
setDropped(0);
|
setDropped(0);
|
||||||
setStatus("connecting");
|
setStatus("connecting");
|
||||||
|
|
@ -697,7 +726,7 @@ const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
pendingRef.current = [];
|
pendingRef.current = [];
|
||||||
pendingCountRef.current = 0;
|
pendingCountRef.current = 0;
|
||||||
cancelFlush();
|
cancelFlush();
|
||||||
}, [mode, cancelFlush]);
|
}, [mode, replaySourceKey, cancelFlush]);
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
if (mode !== "replay" || !latestPath) {
|
if (mode !== "replay" || !latestPath) {
|
||||||
|
|
@ -863,7 +892,12 @@ const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
const payload = (await response.json()) as ReplayResponse<T>;
|
const payload = (await response.json()) as ReplayResponse<T>;
|
||||||
|
|
||||||
let sourcePrefix = replaySourceRef.current;
|
let sourcePrefix = replaySourceRef.current;
|
||||||
if (!sourcePrefix) {
|
if (replaySourceKey) {
|
||||||
|
if (sourcePrefix !== replaySourceKey) {
|
||||||
|
sourcePrefix = replaySourceKey;
|
||||||
|
replaySourceRef.current = replaySourceKey;
|
||||||
|
}
|
||||||
|
} else if (!sourcePrefix) {
|
||||||
const firstWithTrace = payload.data.find((item) => getReplayKey(item));
|
const firstWithTrace = payload.data.find((item) => getReplayKey(item));
|
||||||
if (firstWithTrace) {
|
if (firstWithTrace) {
|
||||||
sourcePrefix = getReplayKey(firstWithTrace);
|
sourcePrefix = getReplayKey(firstWithTrace);
|
||||||
|
|
@ -871,6 +905,11 @@ const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (onReplaySourceKey && sourcePrefix && replaySourceNotifiedRef.current !== sourcePrefix) {
|
||||||
|
replaySourceNotifiedRef.current = sourcePrefix;
|
||||||
|
onReplaySourceKey(sourcePrefix);
|
||||||
|
}
|
||||||
|
|
||||||
const filtered = sourcePrefix
|
const filtered = sourcePrefix
|
||||||
? payload.data.filter((item) => getReplayKey(item) === sourcePrefix)
|
? payload.data.filter((item) => getReplayKey(item) === sourcePrefix)
|
||||||
: payload.data;
|
: payload.data;
|
||||||
|
|
@ -915,7 +954,7 @@ const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasForeign) {
|
if (!replaySourceKey && hasForeign) {
|
||||||
replayCompleteRef.current = true;
|
replayCompleteRef.current = true;
|
||||||
setReplayComplete(true);
|
setReplayComplete(true);
|
||||||
setStatus("disconnected");
|
setStatus("disconnected");
|
||||||
|
|
@ -943,7 +982,18 @@ const useTape = <T extends SortableItem & { seq: number }>(
|
||||||
window.clearInterval(interval);
|
window.clearInterval(interval);
|
||||||
cancelFlush();
|
cancelFlush();
|
||||||
};
|
};
|
||||||
}, [mode, replayPath, batchSize, pollMs, scheduleFlush, cancelFlush, getItemTs, getReplayKey]);
|
}, [
|
||||||
|
mode,
|
||||||
|
replayPath,
|
||||||
|
batchSize,
|
||||||
|
pollMs,
|
||||||
|
scheduleFlush,
|
||||||
|
cancelFlush,
|
||||||
|
getItemTs,
|
||||||
|
getReplayKey,
|
||||||
|
replaySourceKey,
|
||||||
|
onReplaySourceKey
|
||||||
|
]);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
status,
|
status,
|
||||||
|
|
@ -1896,10 +1946,19 @@ const formatFlowMetric = (value: number, suffix?: string): string => {
|
||||||
|
|
||||||
export default function HomePage() {
|
export default function HomePage() {
|
||||||
const [mode, setMode] = useState<TapeMode>("live");
|
const [mode, setMode] = useState<TapeMode>("live");
|
||||||
|
const [replaySource, setReplaySource] = useState<string | null>(null);
|
||||||
const [selectedAlert, setSelectedAlert] = useState<AlertEvent | null>(null);
|
const [selectedAlert, setSelectedAlert] = useState<AlertEvent | null>(null);
|
||||||
const [selectedDarkEvent, setSelectedDarkEvent] = useState<InferredDarkEvent | null>(null);
|
const [selectedDarkEvent, setSelectedDarkEvent] = useState<InferredDarkEvent | null>(null);
|
||||||
const [filterInput, setFilterInput] = useState<string>("");
|
const [filterInput, setFilterInput] = useState<string>("");
|
||||||
const [chartIntervalMs, setChartIntervalMs] = useState<number>(CANDLE_INTERVALS[0].ms);
|
const [chartIntervalMs, setChartIntervalMs] = useState<number>(CANDLE_INTERVALS[0].ms);
|
||||||
|
|
||||||
|
const handleReplaySource = useCallback((value: string | null) => {
|
||||||
|
setReplaySource(value);
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
setReplaySource(null);
|
||||||
|
}, [mode]);
|
||||||
const optionsScroll = useListScroll();
|
const optionsScroll = useListScroll();
|
||||||
const equitiesScroll = useListScroll();
|
const equitiesScroll = useListScroll();
|
||||||
const flowScroll = useListScroll();
|
const flowScroll = useListScroll();
|
||||||
|
|
@ -1927,7 +1986,9 @@ export default function HomePage() {
|
||||||
batchSize: mode === "replay" ? 120 : undefined,
|
batchSize: mode === "replay" ? 120 : undefined,
|
||||||
pollMs: mode === "replay" ? 200 : undefined,
|
pollMs: mode === "replay" ? 200 : undefined,
|
||||||
captureScroll: optionsAnchor.capture,
|
captureScroll: optionsAnchor.capture,
|
||||||
onNewItems: optionsScroll.onNewItems
|
onNewItems: optionsScroll.onNewItems,
|
||||||
|
getReplayKey: extractReplaySource,
|
||||||
|
onReplaySourceKey: handleReplaySource
|
||||||
});
|
});
|
||||||
|
|
||||||
const equities = useTape<EquityPrint>({
|
const equities = useTape<EquityPrint>({
|
||||||
|
|
@ -1960,7 +2021,9 @@ export default function HomePage() {
|
||||||
latestPath: "/nbbo/options",
|
latestPath: "/nbbo/options",
|
||||||
expectedType: "option-nbbo",
|
expectedType: "option-nbbo",
|
||||||
batchSize: mode === "replay" ? 120 : undefined,
|
batchSize: mode === "replay" ? 120 : undefined,
|
||||||
pollMs: mode === "replay" ? 200 : undefined
|
pollMs: mode === "replay" ? 200 : undefined,
|
||||||
|
getReplayKey: extractReplaySource,
|
||||||
|
replaySourceKey: replaySource
|
||||||
});
|
});
|
||||||
|
|
||||||
const inferredDark = useTape<InferredDarkEvent>({
|
const inferredDark = useTape<InferredDarkEvent>({
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue