324 lines
9.5 KiB
Python
324 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import datetime as dt
|
|
import inspect
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the replay command line from ``sys.argv``.

    Options are declared in a table (kept in the original order so ``--help``
    output is unchanged) and registered on a fresh ``ArgumentParser``.

    Returns:
        The parsed namespace with attributes ``dataset``, ``schema``,
        ``start``, ``end``, ``symbols``, ``stype_in``, ``stype_out``,
        ``limit`` and ``api_key``.
    """
    option_table = (
        ("--dataset", {"required": True}),
        ("--schema", {"default": "trades"}),
        ("--start", {"required": True}),
        ("--end", {"default": ""}),
        ("--symbols", {"default": "ALL"}),
        ("--stype-in", {"dest": "stype_in", "default": "raw_symbol"}),
        ("--stype-out", {"dest": "stype_out", "default": "raw_symbol"}),
        ("--limit", {"type": int, "default": 0}),
        ("--api-key", {"dest": "api_key", "default": ""}),
    )

    cli = argparse.ArgumentParser(description="Replay Databento option trades as JSON lines.")
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
|
|
|
|
|
|
def resolve_client(db_module, api_key: str):
    """Instantiate ``db_module.Historical`` authenticated with *api_key*.

    The key is passed positionally first; if the constructor rejects that
    with a ``TypeError``, the call is retried passing it as ``key=``.
    """
    historical_cls = db_module.Historical
    try:
        client = historical_cls(api_key)
    except TypeError:
        client = historical_cls(key=api_key)
    return client
|
|
|
|
|
|
def normalize_symbols(value: str):
    """Turn a comma-separated ``--symbols`` value into a list for the API.

    Returns ``None`` (meaning "no symbol filter") for an empty value or the
    case-insensitive keyword ``ALL``; otherwise returns the stripped,
    non-empty comma-separated tokens in their original order.
    """
    if value and value.strip().upper() != "ALL":
        tokens = (part.strip() for part in value.split(","))
        return [token for token in tokens if token]
    return None
|
|
|
|
|
|
def parse_date(value: str | None) -> dt.date | None:
|
|
if not value:
|
|
return None
|
|
try:
|
|
parsed = dt.datetime.fromisoformat(value)
|
|
return parsed.date()
|
|
except ValueError:
|
|
try:
|
|
return dt.date.fromisoformat(value)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def normalize_ts(ts_value):
    """Convert a timestamp-like value to integer milliseconds since the Unix epoch.

    Supported inputs:

    * ``datetime`` -- naive values are taken to be UTC (callers derive UTC
      calendar dates from the result); aware values are converted.
    * ``date`` -- midnight UTC of that day.
    * ``int`` / ``float`` -- values above 1e18 are treated as nanoseconds and
      scaled to milliseconds; smaller values are returned unchanged (assumed
      to already be milliseconds).

    Returns ``None`` for ``None``, non-finite floats and unsupported types.
    """
    import math

    epoch = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)
    one_ms = dt.timedelta(milliseconds=1)
    ns_threshold = 1_000_000_000_000_000

    if ts_value is None:
        return None
    # datetime subclasses date, so it must be tested first.
    if isinstance(ts_value, dt.datetime):
        if ts_value.tzinfo is None:
            # A naive .timestamp() would silently apply the host's local zone.
            ts_value = ts_value.replace(tzinfo=dt.timezone.utc)
        # timedelta floor-division is exact; float .timestamp() * 1000 can be
        # off by one millisecond.
        return (ts_value - epoch) // one_ms
    if isinstance(ts_value, dt.date):
        midnight = dt.datetime.combine(ts_value, dt.time(), tzinfo=dt.timezone.utc)
        return (midnight - epoch) // one_ms
    if isinstance(ts_value, int):
        if ts_value > ns_threshold:
            # Integer division: float "/" loses precision at nanosecond scale.
            return int(ts_value) // 1_000_000
        return int(ts_value)
    if isinstance(ts_value, float):
        if not math.isfinite(ts_value):
            return None
        if ts_value > ns_threshold:
            return int(ts_value / 1_000_000)
        return int(ts_value)
    return None
|
|
|
|
|
|
def stringify(value):
    """Render *value* as text for a JSON payload.

    ``bytes`` are decoded as UTF-8 with undecodable bytes dropped, ``None``
    passes through unchanged, and anything else goes through ``str()``.
    """
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="ignore")
    return None if value is None else str(value)
|
|
|
|
def is_numeric_symbol(value: object) -> bool:
    """Report whether *value* looks like a numeric instrument ID.

    Strings qualify when ``str.isdigit()`` is true; ints (including bools)
    always qualify; every other type does not.
    """
    if isinstance(value, str):
        return value.isdigit()
    return isinstance(value, int)
|
|
|
|
|
|
class SymbolResolver:
|
|
def __init__(self, client, dataset: str, start_date: dt.date | None, end_date: dt.date | None):
|
|
from databento.common.symbology import InstrumentMap
|
|
|
|
self._client = client
|
|
self._dataset = dataset
|
|
self._start_date = start_date
|
|
self._end_date = end_date
|
|
self._map = InstrumentMap()
|
|
self._pending: list[int] = []
|
|
self._pending_set: set[int] = set()
|
|
|
|
def queue(self, instrument_id: int) -> None:
|
|
if instrument_id in self._pending_set:
|
|
return
|
|
self._pending_set.add(instrument_id)
|
|
self._pending.append(instrument_id)
|
|
|
|
def resolve_pending(self) -> None:
|
|
if not self._pending:
|
|
return
|
|
pending = self._pending
|
|
self._pending = []
|
|
self._pending_set.clear()
|
|
|
|
for i in range(0, len(pending), 2000):
|
|
chunk = pending[i : i + 2000]
|
|
response = self._client.symbology.resolve(
|
|
dataset=self._dataset,
|
|
symbols=chunk,
|
|
stype_in="instrument_id",
|
|
stype_out="raw_symbol",
|
|
start_date=self._start_date or dt.date.today(),
|
|
end_date=self._end_date,
|
|
)
|
|
self._map.insert_json(response)
|
|
|
|
def lookup(self, instrument_id: int, date: dt.date) -> str | None:
|
|
if instrument_id is None:
|
|
return None
|
|
return self._map.resolve(instrument_id, date)
|
|
|
|
def pending_count(self) -> int:
|
|
return len(self._pending)
|
|
|
|
|
|
def _first_attr(record, names: list[str]):
|
|
for name in names:
|
|
if not name:
|
|
continue
|
|
value = getattr(record, name, None)
|
|
if value is not None:
|
|
return value
|
|
return None
|
|
|
|
|
|
def _to_int(value, default: int = 0) -> int:
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def build_payload(record, symbol_override: str | None = None) -> dict | None:
    """Translate one market-data record into a JSON-serialisable dict.

    A record exposing both a price and a ``size`` becomes a ``"trade"``
    payload; otherwise a bid/ask pair produces an ``"nbbo"`` payload. Fields
    are read with ``getattr``, so any attribute-bearing record object works.

    Databento DBN records store prices as fixed-precision integers (units of
    1e-9) and expose the decimal value through ``pretty_*`` attributes, so
    those are preferred when present; the raw field names remain fallbacks.

    Args:
        record: The record to convert.
        symbol_override: Symbol to report instead of the record's own
            ``symbol`` / ``raw_symbol`` / ``instrument_id``.

    Returns:
        The payload, or ``None`` when the record has no usable timestamp,
        symbol or prices. Non-finite prices (e.g. NaN for an undefined price)
        are treated as missing because ``json.dumps`` would otherwise write a
        ``NaN`` token that strict JSON parsers reject.
    """
    import math

    symbol = symbol_override or (
        getattr(record, "symbol", None)
        or getattr(record, "raw_symbol", None)
        or getattr(record, "instrument_id", None)
    )
    ts_ms = normalize_ts(getattr(record, "ts_event", None))
    if symbol is None or ts_ms is None:
        return None

    exchange = (
        getattr(record, "exchange", None)
        or getattr(record, "publisher_id", None)
        or getattr(record, "exchange_id", None)
    )

    price = _first_attr(record, ["pretty_price", "price"])
    size = getattr(record, "size", None)
    if price is not None and size is not None:
        trade_price = float(price)
        if not math.isfinite(trade_price):
            return None

        conditions = getattr(record, "conditions", None) or getattr(record, "condition", None)
        if isinstance(conditions, str):
            conditions = [conditions]

        payload = {
            "type": "trade",
            "ts": ts_ms,
            "price": trade_price,
            "size": int(size),
            "symbol": stringify(symbol),
        }
        if exchange is not None:
            payload["exchange"] = stringify(exchange)
        if conditions:
            payload["conditions"] = conditions
        return payload

    bid = _first_attr(record, ["pretty_bid_px", "bid_px", "bid_price", "bid"])
    ask = _first_attr(record, ["pretty_ask_px", "ask_px", "ask_price", "ask"])
    if bid is None or ask is None:
        return None
    bid_value, ask_value = float(bid), float(ask)
    if not (math.isfinite(bid_value) and math.isfinite(ask_value)):
        return None

    bid_size = _first_attr(record, ["bid_sz", "bid_size", "bid_qty", "bid_q"])
    ask_size = _first_attr(record, ["ask_sz", "ask_size", "ask_qty", "ask_q"])

    payload = {
        "type": "nbbo",
        "ts": ts_ms,
        "bid": bid_value,
        "ask": ask_value,
        "bidSize": int(bid_size) if bid_size is not None else 0,
        "askSize": int(ask_size) if ask_size is not None else 0,
        "symbol": stringify(symbol),
    }
    if exchange is not None:
        payload["exchange"] = stringify(exchange)
    return payload
|
|
|
|
|
|
def emit_payload(payload: dict | None) -> None:
|
|
if payload is None:
|
|
return
|
|
print(json.dumps(payload), flush=True)
|
|
|
|
|
|
def _invoke_replay(store, callback) -> None:
    """Call ``store.replay`` with *callback*, by keyword when its signature names it.

    The calling convention is chosen before the call instead of by catching
    ``TypeError``: a ``TypeError`` raised inside *callback* mid-stream would
    otherwise trigger a second full replay and duplicate every line already
    written.
    """
    try:
        parameters = inspect.signature(store.replay).parameters
    except (TypeError, ValueError):
        # No introspectable signature: fall back to positional passing.
        parameters = {}
    if "callback" in parameters:
        store.replay(callback=callback)
    else:
        store.replay(callback)


def main() -> int:
    """Fetch a Databento time range and print each record as a JSON line.

    Numeric instrument IDs are mapped to raw symbols through
    ``SymbolResolver``: records whose ID is not yet known are buffered, the
    IDs are resolved in batches, and the records are then emitted. IDs the
    symbology service cannot map are emitted with the numeric ID as symbol.

    Returns:
        Process exit status: 0 on success; 1 when no API key is available,
        the ``databento`` package is missing, or the response type is not
        supported.
    """
    args = parse_args()

    api_key = args.api_key or os.getenv("DATABENTO_API_KEY")
    if not api_key:
        sys.stderr.write("DATABENTO_API_KEY is required.\n")
        return 1

    try:
        import databento as db
    except ImportError:
        sys.stderr.write("Missing Python package 'databento'. Install with pip.\n")
        return 1

    client = resolve_client(db, api_key)

    start_date = parse_date(args.start)
    end_date = parse_date(args.end) if args.end else None
    resolver = SymbolResolver(client, args.dataset, start_date, end_date)
    buffered: list[tuple[object, int, dt.date]] = []
    # IDs already sent to the symbology service. A failed lookup for one of
    # these is final (the same date window is always used), so they are
    # never re-queued.
    submitted: set[int] = set()

    kwargs = {
        "dataset": args.dataset,
        "schema": args.schema,
        "start": args.start,
        "end": args.end or None,
        "symbols": normalize_symbols(args.symbols),
        "stype_in": args.stype_in,
        "stype_out": args.stype_out,
        "limit": args.limit or None,
    }
    # Pass only arguments the installed client version accepts.
    signature = inspect.signature(client.timeseries.get_range)
    filtered_kwargs = {
        key: value for key, value in kwargs.items() if key in signature.parameters and value is not None
    }
    data = client.timeseries.get_range(**filtered_kwargs)

    def flush_buffer() -> None:
        """Resolve all queued IDs, then emit and clear every buffered record.

        Each buffered ID was queued when its record was buffered, so after
        ``resolve_pending`` a failed lookup cannot succeed later; such records
        are emitted with the numeric ID rather than held back.
        """
        if not buffered:
            return
        submitted.update(instrument_id for _, instrument_id, _ in buffered)
        resolver.resolve_pending()
        for record, instrument_id, date in buffered:
            symbol = resolver.lookup(instrument_id, date) or str(instrument_id)
            emit_payload(build_payload(record, symbol))
        buffered.clear()

    def handle_record(record) -> None:
        """Emit *record* now if its symbol is known, otherwise buffer it."""
        symbol = (
            getattr(record, "symbol", None)
            or getattr(record, "raw_symbol", None)
            or getattr(record, "instrument_id", None)
        )

        ts_ms = normalize_ts(getattr(record, "ts_event", None))
        if ts_ms is None:
            return
        # UTC calendar date, used for date-dependent symbol mappings.
        date = dt.datetime.fromtimestamp(ts_ms / 1000, tz=dt.timezone.utc).date()

        if not is_numeric_symbol(symbol):
            emit_payload(build_payload(record))
            return

        instrument_id = _to_int(symbol, default=-1)
        if instrument_id < 0:
            return

        mapped = resolver.lookup(instrument_id, date)
        if mapped:
            emit_payload(build_payload(record, mapped))
            return
        if instrument_id in submitted:
            # Already asked once; asking again cannot produce a mapping.
            emit_payload(build_payload(record, str(instrument_id)))
            return

        resolver.queue(instrument_id)
        buffered.append((record, instrument_id, date))
        # Bound both the IDs per symbology batch and the memory held by the
        # buffer, for the replay and the iterator paths alike.
        if resolver.pending_count() >= 200 or len(buffered) >= 2000:
            flush_buffer()

    if hasattr(data, "replay"):
        _invoke_replay(data, handle_record)
    elif hasattr(data, "__iter__"):
        for record in data:
            handle_record(record)
    else:
        sys.stderr.write("Unsupported Databento response type.\n")
        return 1

    flush_buffer()
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit status.
    sys.exit(main())
|