alpaca-news #5

Merged
dirtydishes merged 3 commits from alpaca-news into main 2026-05-20 00:09:09 +00:00
3 changed files with 17 additions and 5 deletions
Showing only changes of commit 93b9152345 - Show all commits

View file

@ -270,7 +270,7 @@ All runtime configuration comes from `.env`.
| `ALPACA_MONEYNESS_FALLBACK_PCT` | `0.1` | Wider fallback moneyness filter if candidate set is too sparse. | | `ALPACA_MONEYNESS_FALLBACK_PCT` | `0.1` | Wider fallback moneyness filter if candidate set is too sparse. |
| `ALPACA_MAX_QUOTES` | `200` | Upper bound on selected Alpaca options contracts/quotes per cycle. | | `ALPACA_MAX_QUOTES` | `200` | Upper bound on selected Alpaca options contracts/quotes per cycle. |
| `ALPACA_EQUITIES_FEED` | `iex` | Alpaca equities feed: `iex` or `sip`. | | `ALPACA_EQUITIES_FEED` | `iex` | Alpaca equities feed: `iex` or `sip`. |
| `ALPACA_NEWS_BACKFILL_LIMIT` | `100` | Alpaca news stories fetched on startup, capped at 200. | | `ALPACA_NEWS_BACKFILL_LIMIT` | `50` | Alpaca news stories fetched on startup, capped at 50 by the Alpaca News API. |
| `ALPACA_NEWS_WEBSOCKET_PATH` | `/v1beta1/news` | Alpaca news websocket path. | | `ALPACA_NEWS_WEBSOCKET_PATH` | `/v1beta1/news` | Alpaca news websocket path. |
### Databento replay adapter configuration ### Databento replay adapter configuration

View file

@ -92,7 +92,8 @@ import {
fetchNearestOptionNBBOForPrints, fetchNearestOptionNBBOForPrints,
fetchSmartMoneyEventsByPacketIds, fetchSmartMoneyEventsByPacketIds,
fetchClassifierHitsByPacketIds, fetchClassifierHitsByPacketIds,
fetchRecentOptionPrints fetchRecentOptionPrints,
insertNewsStory
} from "@islandflow/storage"; } from "@islandflow/storage";
import type { EquityPrintQueryFilters } from "@islandflow/storage"; import type { EquityPrintQueryFilters } from "@islandflow/storage";
import { import {
@ -1277,6 +1278,7 @@ const run = async () => {
for await (const msg of newsSubscription.messages) { for await (const msg of newsSubscription.messages) {
try { try {
const payload = NewsStorySchema.parse(newsSubscription.decode(msg)); const payload = NewsStorySchema.parse(newsSubscription.decode(msg));
await insertNewsStory(clickhouse, payload);
await fanoutLive({ channel: "news" }, payload, "news"); await fanoutLive({ channel: "news" }, payload, "news");
msg.ack(); msg.ack();
} catch (error) { } catch (error) {

View file

@ -30,13 +30,21 @@ const envSchema = z.object({
ALPACA_SECRET_KEY: z.string().default(""), ALPACA_SECRET_KEY: z.string().default(""),
ALPACA_REST_URL: z.string().default("https://data.alpaca.markets"), ALPACA_REST_URL: z.string().default("https://data.alpaca.markets"),
ALPACA_WS_BASE_URL: z.string().default("wss://stream.data.alpaca.markets"), ALPACA_WS_BASE_URL: z.string().default("wss://stream.data.alpaca.markets"),
ALPACA_NEWS_BACKFILL_LIMIT: z.coerce.number().int().positive().max(200).default(100), ALPACA_NEWS_BACKFILL_LIMIT: z.coerce.number().int().positive().max(50).default(50),
ALPACA_NEWS_WEBSOCKET_PATH: z.string().default("/v1beta1/news") ALPACA_NEWS_WEBSOCKET_PATH: z.string().default("/v1beta1/news")
}); });
const env = readEnv(envSchema); const env = readEnv(envSchema);
const alpacaCredentials = resolveAlpacaCredentials(env); const alpacaCredentials = resolveAlpacaCredentials(env);
/**
 * Escapes the five HTML-significant characters (`&`, `<`, `>`, `"`, `'`) in
 * `value` so the result can be embedded in HTML markup as literal text.
 *
 * The ampersand is replaced first: every later replacement emits an entity
 * that itself starts with `&`, so escaping `&` afterwards would double-escape
 * those entities (e.g. `<` would end up as `&amp;lt;`).
 *
 * @param value - Plain text to escape; an empty string is returned unchanged.
 * @returns `value` with each special character replaced by its HTML entity.
 */
const escapeHtml = (value: string): string =>
  value
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;")
    .replace(/'/g, "&#39;");
type AlpacaNewsItem = { type AlpacaNewsItem = {
id?: number; id?: number;
headline?: string; headline?: string;
@ -66,7 +74,8 @@ const toStory = (item: AlpacaNewsItem, seq: number): NewsStory | null => {
} }
const provider = "alpaca"; const provider = "alpaca";
const contentHtml = item.content ?? ""; const summary = item.summary?.trim() ?? "";
const contentHtml = item.content?.trim() || (summary ? `<p>${escapeHtml(summary)}</p>` : "");
const symbols = resolveNewsSymbols(item.symbols ?? [], contentHtml); const symbols = resolveNewsSymbols(item.symbols ?? [], contentHtml);
const publishedTs = parseTimestamp(item.created_at); const publishedTs = parseTimestamp(item.created_at);
const updatedTs = parseTimestamp(item.updated_at ?? item.created_at); const updatedTs = parseTimestamp(item.updated_at ?? item.created_at);
@ -80,7 +89,7 @@ const toStory = (item: AlpacaNewsItem, seq: number): NewsStory | null => {
provider, provider,
source: item.source?.trim() || item.author?.trim() || "Alpaca News", source: item.source?.trim() || item.author?.trim() || "Alpaca News",
headline: item.headline?.trim() || `Story ${storyId}`, headline: item.headline?.trim() || `Story ${storyId}`,
summary: item.summary?.trim() || "", summary,
content_html: contentHtml, content_html: contentHtml,
url: item.url?.trim() || "", url: item.url?.trim() || "",
published_ts: publishedTs, published_ts: publishedTs,
@ -95,6 +104,7 @@ const fetchBackfill = async (): Promise<AlpacaNewsItem[]> => {
const url = new URL("/v1beta1/news", env.ALPACA_REST_URL); const url = new URL("/v1beta1/news", env.ALPACA_REST_URL);
url.searchParams.set("sort", "desc"); url.searchParams.set("sort", "desc");
url.searchParams.set("limit", env.ALPACA_NEWS_BACKFILL_LIMIT.toString()); url.searchParams.set("limit", env.ALPACA_NEWS_BACKFILL_LIMIT.toString());
url.searchParams.set("include_content", "true");
const response = await fetch(url.toString(), { const response = await fetch(url.toString(), {
headers: buildAlpacaAuthHeaders(alpacaCredentials) headers: buildAlpacaAuthHeaders(alpacaCredentials)