Add Alpaca equities ingest adapter
Adds an Alpaca-backed equities adapter with exchange metadata mapping for conservative off-exchange tagging and a small helper test suite.
This commit is contained in:
parent
debbc1046b
commit
1175dd00cc
4 changed files with 393 additions and 0 deletions
|
|
@ -11,6 +11,7 @@
|
||||||
"@islandflow/observability": "workspace:*",
|
"@islandflow/observability": "workspace:*",
|
||||||
"@islandflow/storage": "workspace:*",
|
"@islandflow/storage": "workspace:*",
|
||||||
"@islandflow/types": "workspace:*",
|
"@islandflow/types": "workspace:*",
|
||||||
|
"ws": "^8.18.3",
|
||||||
"zod": "^3.23.8"
|
"zod": "^3.23.8"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
335
services/ingest-equities/src/adapters/alpaca.ts
Normal file
335
services/ingest-equities/src/adapters/alpaca.ts
Normal file
|
|
@ -0,0 +1,335 @@
|
||||||
|
import { createLogger } from "@islandflow/observability";
|
||||||
|
import type { EquityPrint, EquityQuote } from "@islandflow/types";
|
||||||
|
import type { EquityIngestAdapter, EquityIngestHandlers } from "./types";
|
||||||
|
import WebSocket from "ws";
|
||||||
|
|
||||||
|
export type AlpacaEquitiesFeed = "iex" | "sip";
|
||||||
|
|
||||||
|
export type AlpacaEquitiesAdapterConfig = {
|
||||||
|
keyId: string;
|
||||||
|
secretKey: string;
|
||||||
|
restUrl: string;
|
||||||
|
wsBaseUrl: string;
|
||||||
|
feed: AlpacaEquitiesFeed;
|
||||||
|
symbols: string[];
|
||||||
|
};
|
||||||
|
|
||||||
|
type AlpacaExchangeMetaEntry = {
|
||||||
|
code: string;
|
||||||
|
name: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
type AlpacaTradeMessage = {
|
||||||
|
T: "t";
|
||||||
|
S: string;
|
||||||
|
t: string;
|
||||||
|
p: number;
|
||||||
|
s: number;
|
||||||
|
x?: string;
|
||||||
|
c?: string[];
|
||||||
|
z?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
type AlpacaQuoteMessage = {
|
||||||
|
T: "q";
|
||||||
|
S: string;
|
||||||
|
t: string;
|
||||||
|
bp: number;
|
||||||
|
ap: number;
|
||||||
|
bs?: number;
|
||||||
|
as?: number;
|
||||||
|
bx?: string;
|
||||||
|
ax?: string;
|
||||||
|
c?: string[];
|
||||||
|
z?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
const logger = createLogger({ service: "ingest-equities" });
|
||||||
|
|
||||||
|
const normalizeSymbols = (symbols: string[]): string[] => {
|
||||||
|
const seen = new Set<string>();
|
||||||
|
const result: string[] = [];
|
||||||
|
|
||||||
|
for (const entry of symbols) {
|
||||||
|
const symbol = entry.trim().toUpperCase();
|
||||||
|
if (!symbol || seen.has(symbol)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
seen.add(symbol);
|
||||||
|
result.push(symbol);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
};
|
||||||
|
|
||||||
|
const buildHeaders = (config: AlpacaEquitiesAdapterConfig): Record<string, string> => ({
|
||||||
|
"APCA-API-KEY-ID": config.keyId,
|
||||||
|
"APCA-API-SECRET-KEY": config.secretKey
|
||||||
|
});
|
||||||
|
|
||||||
|
const parseTimestamp = (value: string): number => {
|
||||||
|
const parsed = Date.parse(value);
|
||||||
|
if (Number.isFinite(parsed)) {
|
||||||
|
return parsed;
|
||||||
|
}
|
||||||
|
return Date.now();
|
||||||
|
};
|
||||||
|
|
||||||
|
const decodePayload = (data: WebSocket.RawData): unknown => {
|
||||||
|
if (typeof data === "string") {
|
||||||
|
return JSON.parse(data) as unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data instanceof ArrayBuffer) {
|
||||||
|
return JSON.parse(new TextDecoder().decode(new Uint8Array(data))) as unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ArrayBuffer.isView(data)) {
|
||||||
|
return JSON.parse(new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength))) as unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
return JSON.parse(new TextDecoder().decode(new Uint8Array(data as ArrayBuffer))) as unknown;
|
||||||
|
};
|
||||||
|
|
||||||
|
const extractExchangeMeta = (payload: unknown): AlpacaExchangeMetaEntry[] => {
|
||||||
|
if (!Array.isArray(payload)) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
const result: AlpacaExchangeMetaEntry[] = [];
|
||||||
|
|
||||||
|
for (const entry of payload) {
|
||||||
|
if (!entry || typeof entry !== "object") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const candidate = entry as Record<string, unknown>;
|
||||||
|
const code = typeof candidate.code === "string" ? candidate.code : typeof candidate.exchange === "string" ? candidate.exchange : null;
|
||||||
|
const name = typeof candidate.name === "string" ? candidate.name : typeof candidate.description === "string" ? candidate.description : null;
|
||||||
|
if (!code || !name) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
result.push({ code, name });
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
};
|
||||||
|
|
||||||
|
const buildExchangeNameMap = (entries: AlpacaExchangeMetaEntry[]): Map<string, string> => {
|
||||||
|
const map = new Map<string, string>();
|
||||||
|
for (const entry of entries) {
|
||||||
|
const code = entry.code.trim();
|
||||||
|
const name = entry.name.trim();
|
||||||
|
if (!code || !name) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
map.set(code, name);
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
};
|
||||||
|
|
||||||
|
const OFF_EXCHANGE_HINTS = ["FINRA", "TRF", "ADF", "OTC", "Trade Reporting Facility", "Alternative Display Facility"];
|
||||||
|
|
||||||
|
export const inferOffExchangeFlag = (exchangeCode: string | undefined, exchangeNameMap: Map<string, string>): boolean => {
|
||||||
|
if (!exchangeCode) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const name = exchangeNameMap.get(exchangeCode) ?? "";
|
||||||
|
const normalized = name.toUpperCase();
|
||||||
|
|
||||||
|
if (normalized) {
|
||||||
|
return OFF_EXCHANGE_HINTS.some((hint) => normalized.includes(hint.toUpperCase()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Conservative fallback: only tag the most common FINRA code when no mapping is available.
|
||||||
|
return exchangeCode.toUpperCase() === "D";
|
||||||
|
};
|
||||||
|
|
||||||
|
const buildWsUrl = (wsBaseUrl: string, feed: AlpacaEquitiesFeed): string => {
|
||||||
|
const parsed = new URL(wsBaseUrl);
|
||||||
|
return `${parsed.origin}/v2/${feed}`;
|
||||||
|
};
|
||||||
|
|
||||||
|
const fetchExchangeMeta = async (config: AlpacaEquitiesAdapterConfig): Promise<Map<string, string>> => {
|
||||||
|
const url = new URL("/v2/stocks/meta/exchanges", config.restUrl);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await fetch(url.toString(), {
|
||||||
|
headers: buildHeaders(config)
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
logger.warn("alpaca exchange meta request failed", {
|
||||||
|
status: response.status
|
||||||
|
});
|
||||||
|
return new Map();
|
||||||
|
}
|
||||||
|
|
||||||
|
const payload = (await response.json()) as unknown;
|
||||||
|
const entries = extractExchangeMeta(payload);
|
||||||
|
return buildExchangeNameMap(entries);
|
||||||
|
} catch (error) {
|
||||||
|
logger.warn("alpaca exchange meta request error", {
|
||||||
|
error: error instanceof Error ? error.message : String(error)
|
||||||
|
});
|
||||||
|
return new Map();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export const createAlpacaEquitiesAdapter = (
|
||||||
|
config: AlpacaEquitiesAdapterConfig
|
||||||
|
): EquityIngestAdapter => {
|
||||||
|
return {
|
||||||
|
name: "alpaca",
|
||||||
|
start: async (handlers: EquityIngestHandlers) => {
|
||||||
|
if (!config.keyId || !config.secretKey) {
|
||||||
|
throw new Error("Alpaca equities adapter requires ALPACA_KEY_ID and ALPACA_SECRET_KEY.");
|
||||||
|
}
|
||||||
|
|
||||||
|
const symbols = normalizeSymbols(config.symbols);
|
||||||
|
if (symbols.length === 0) {
|
||||||
|
throw new Error("Alpaca equities adapter requires at least one symbol.");
|
||||||
|
}
|
||||||
|
|
||||||
|
const exchangeNameMap = await fetchExchangeMeta(config);
|
||||||
|
const wsUrl = buildWsUrl(config.wsBaseUrl, config.feed);
|
||||||
|
const ws = new WebSocket(wsUrl);
|
||||||
|
|
||||||
|
let seq = 0;
|
||||||
|
let stopped = false;
|
||||||
|
let authenticated = false;
|
||||||
|
|
||||||
|
ws.on("open", () => {
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
action: "auth",
|
||||||
|
key: config.keyId,
|
||||||
|
secret: config.secretKey
|
||||||
|
})
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
const subscribe = () => {
|
||||||
|
const message: Record<string, unknown> = {
|
||||||
|
action: "subscribe",
|
||||||
|
trades: symbols
|
||||||
|
};
|
||||||
|
|
||||||
|
if (handlers.onQuote) {
|
||||||
|
message.quotes = symbols;
|
||||||
|
}
|
||||||
|
|
||||||
|
ws.send(JSON.stringify(message));
|
||||||
|
};
|
||||||
|
|
||||||
|
ws.on("message", (data) => {
|
||||||
|
if (stopped) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let payload: unknown;
|
||||||
|
try {
|
||||||
|
payload = decodePayload(data);
|
||||||
|
} catch (error) {
|
||||||
|
logger.warn("alpaca equities message decode failed", {
|
||||||
|
error: error instanceof Error ? error.message : String(error)
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!Array.isArray(payload)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const entry of payload) {
|
||||||
|
if (!entry || typeof entry !== "object") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = entry as (AlpacaTradeMessage | AlpacaQuoteMessage | { T?: string; msg?: string });
|
||||||
|
const type = message.T;
|
||||||
|
|
||||||
|
if (type === "success") {
|
||||||
|
const msg = (message as { msg?: string }).msg ?? "";
|
||||||
|
if (msg === "authenticated") {
|
||||||
|
authenticated = true;
|
||||||
|
subscribe();
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type === "subscription") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type === "error") {
|
||||||
|
logger.error("alpaca equities stream error", { message });
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type === "t") {
|
||||||
|
const trade = message as AlpacaTradeMessage;
|
||||||
|
const sourceTs = parseTimestamp(trade.t);
|
||||||
|
seq += 1;
|
||||||
|
const exchangeCode = trade.x ?? "";
|
||||||
|
|
||||||
|
void handlers.onTrade({
|
||||||
|
source_ts: sourceTs,
|
||||||
|
ingest_ts: Date.now(),
|
||||||
|
seq,
|
||||||
|
trace_id: `alpaca-equities-${seq}`,
|
||||||
|
ts: sourceTs,
|
||||||
|
underlying_id: trade.S,
|
||||||
|
price: trade.p,
|
||||||
|
size: trade.s,
|
||||||
|
exchange: exchangeCode || "ALPACA",
|
||||||
|
offExchangeFlag: inferOffExchangeFlag(exchangeCode, exchangeNameMap)
|
||||||
|
} satisfies EquityPrint);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type === "q" && handlers.onQuote) {
|
||||||
|
const quote = message as AlpacaQuoteMessage;
|
||||||
|
const sourceTs = parseTimestamp(quote.t);
|
||||||
|
seq += 1;
|
||||||
|
|
||||||
|
void handlers.onQuote({
|
||||||
|
source_ts: sourceTs,
|
||||||
|
ingest_ts: Date.now(),
|
||||||
|
seq,
|
||||||
|
trace_id: `alpaca-equity-quote-${seq}`,
|
||||||
|
ts: sourceTs,
|
||||||
|
underlying_id: quote.S,
|
||||||
|
bid: quote.bp,
|
||||||
|
ask: quote.ap
|
||||||
|
} satisfies EquityQuote);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.on("error", (error) => {
|
||||||
|
logger.error("alpaca equities websocket error", {
|
||||||
|
error: error instanceof Error ? error.message : String(error)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.on("close", (code, reason) => {
|
||||||
|
logger.warn("alpaca equities websocket closed", {
|
||||||
|
code,
|
||||||
|
reason: reason.toString(),
|
||||||
|
authenticated
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
stopped = true;
|
||||||
|
ws.close();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
@ -22,6 +22,7 @@ import {
|
||||||
type EquityPrint,
|
type EquityPrint,
|
||||||
type EquityQuote
|
type EquityQuote
|
||||||
} from "@islandflow/types";
|
} from "@islandflow/types";
|
||||||
|
import { createAlpacaEquitiesAdapter } from "./adapters/alpaca";
|
||||||
import { createSyntheticEquitiesAdapter } from "./adapters/synthetic";
|
import { createSyntheticEquitiesAdapter } from "./adapters/synthetic";
|
||||||
import type { EquityIngestAdapter, StopHandler } from "./adapters/types";
|
import type { EquityIngestAdapter, StopHandler } from "./adapters/types";
|
||||||
import { z } from "zod";
|
import { z } from "zod";
|
||||||
|
|
@ -35,6 +36,15 @@ const envSchema = z.object({
|
||||||
CLICKHOUSE_DATABASE: z.string().default("default"),
|
CLICKHOUSE_DATABASE: z.string().default("default"),
|
||||||
EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"),
|
EQUITIES_INGEST_ADAPTER: z.string().min(1).default("synthetic"),
|
||||||
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000),
|
EMIT_INTERVAL_MS: z.coerce.number().int().positive().default(1000),
|
||||||
|
|
||||||
|
// Alpaca (equities)
|
||||||
|
ALPACA_KEY_ID: z.string().default(""),
|
||||||
|
ALPACA_SECRET_KEY: z.string().default(""),
|
||||||
|
ALPACA_REST_URL: z.string().default("https://data.alpaca.markets"),
|
||||||
|
ALPACA_WS_BASE_URL: z.string().default("wss://stream.data.alpaca.markets"),
|
||||||
|
ALPACA_UNDERLYINGS: z.string().default("SPY,NVDA,AAPL"),
|
||||||
|
ALPACA_EQUITIES_FEED: z.enum(["iex", "sip"]).default("iex"),
|
||||||
|
|
||||||
TESTING_MODE: z
|
TESTING_MODE: z
|
||||||
.preprocess((value) => {
|
.preprocess((value) => {
|
||||||
if (typeof value === "string") {
|
if (typeof value === "string") {
|
||||||
|
|
@ -113,11 +123,29 @@ const retry = async <T>(
|
||||||
throw lastError ?? new Error(`${label} failed after retries`);
|
throw lastError ?? new Error(`${label} failed after retries`);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const parseSymbolList = (value: string): string[] => {
|
||||||
|
return value
|
||||||
|
.split(",")
|
||||||
|
.map((entry) => entry.trim())
|
||||||
|
.filter(Boolean);
|
||||||
|
};
|
||||||
|
|
||||||
const selectAdapter = (name: string): EquityIngestAdapter => {
|
const selectAdapter = (name: string): EquityIngestAdapter => {
|
||||||
if (name === "synthetic") {
|
if (name === "synthetic") {
|
||||||
return createSyntheticEquitiesAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS });
|
return createSyntheticEquitiesAdapter({ emitIntervalMs: env.EMIT_INTERVAL_MS });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (name === "alpaca") {
|
||||||
|
return createAlpacaEquitiesAdapter({
|
||||||
|
keyId: env.ALPACA_KEY_ID,
|
||||||
|
secretKey: env.ALPACA_SECRET_KEY,
|
||||||
|
restUrl: env.ALPACA_REST_URL,
|
||||||
|
wsBaseUrl: env.ALPACA_WS_BASE_URL,
|
||||||
|
feed: env.ALPACA_EQUITIES_FEED,
|
||||||
|
symbols: parseSymbolList(env.ALPACA_UNDERLYINGS)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
throw new Error(`Unknown ingest adapter: ${name}`);
|
throw new Error(`Unknown ingest adapter: ${name}`);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
29
services/ingest-equities/tests/alpaca.test.ts
Normal file
29
services/ingest-equities/tests/alpaca.test.ts
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
import { describe, expect, test } from "bun:test";
|
||||||
|
import { inferOffExchangeFlag } from "../src/adapters/alpaca";
|
||||||
|
|
||||||
|
describe("alpaca equities adapter helpers", () => {
|
||||||
|
test("inferOffExchangeFlag tags FINRA/TRF venues as off-exchange", () => {
|
||||||
|
const map = new Map<string, string>([
|
||||||
|
["D", "FINRA / Nasdaq TRF"],
|
||||||
|
["N", "FINRA / NYSE TRF"],
|
||||||
|
["Q", "NASDAQ"],
|
||||||
|
["P", "NYSE ARCA"],
|
||||||
|
["O", "OTC Markets"]
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(inferOffExchangeFlag("D", map)).toBe(true);
|
||||||
|
expect(inferOffExchangeFlag("N", map)).toBe(true);
|
||||||
|
expect(inferOffExchangeFlag("O", map)).toBe(true);
|
||||||
|
expect(inferOffExchangeFlag("Q", map)).toBe(false);
|
||||||
|
expect(inferOffExchangeFlag("P", map)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("inferOffExchangeFlag falls back conservatively when no mapping", () => {
|
||||||
|
const empty = new Map<string, string>();
|
||||||
|
|
||||||
|
expect(inferOffExchangeFlag(undefined, empty)).toBe(false);
|
||||||
|
expect(inferOffExchangeFlag("", empty)).toBe(false);
|
||||||
|
expect(inferOffExchangeFlag("D", empty)).toBe(true);
|
||||||
|
expect(inferOffExchangeFlag("N", empty)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
Loading…
Add table
Add a link
Reference in a new issue