Add Alpaca news wire across ingest, API, and web

This commit is contained in:
dirtydishes 2026-05-18 16:55:31 -04:00
parent 62aae70878
commit 906fe411c9
31 changed files with 1407 additions and 50 deletions

View file

@ -7,6 +7,7 @@ import {
EquityPrintJoinSchema,
InferredDarkEventSchema,
FlowPacketSchema,
NewsStorySchema,
OptionNBBOSchema,
OptionPrintSchema,
SmartMoneyEventSchema
@ -20,6 +21,7 @@ import type {
EquityPrintJoin,
InferredDarkEvent,
FlowPacket,
NewsStory,
SmartMoneyEvent,
OptionNBBO,
OptionPrint,
@ -91,6 +93,13 @@ import {
toSmartMoneyEventRecord,
type SmartMoneyEventRecord
} from "./smart-money-events";
import {
NEWS_TABLE,
newsTableDDL,
fromNewsRecord,
toNewsRecord,
type NewsRecord
} from "./news";
export type ClickHouseOptions = {
url: string;
@ -320,6 +329,12 @@ export const ensureAlertsTable = async (client: ClickHouseClient): Promise<void>
}
};
/**
 * Ensures the news table exists.
 *
 * Executes the statement returned by `newsTableDDL()` (a
 * `CREATE TABLE IF NOT EXISTS`) through `client.exec`.
 *
 * @param client - ClickHouse client used to run the DDL statement.
 */
export const ensureNewsTable = async (client: ClickHouseClient): Promise<void> => {
  const ddl = newsTableDDL();
  await client.exec({ query: ddl });
};
export const insertOptionPrint = async (
client: ClickHouseClient,
print: OptionPrint
@ -449,6 +464,15 @@ export const insertAlert = async (client: ClickHouseClient, alert: AlertEvent):
});
};
/**
 * Inserts one news story into the news table.
 *
 * The story is converted to its flat row form with `toNewsRecord` and sent
 * as a single-row `JSONEachRow` insert.
 *
 * @param client - ClickHouse client used to perform the insert.
 * @param story - Story to persist.
 */
export const insertNewsStory = async (client: ClickHouseClient, story: NewsStory): Promise<void> => {
  await client.insert({
    table: NEWS_TABLE,
    values: [toNewsRecord(story)],
    format: "JSONEachRow"
  });
};
export type ClickHouseBatchWriterOptions = {
flushIntervalMs?: number;
maxRows?: number;
@ -600,6 +624,13 @@ export const enqueueAlertInsert = (
writer.enqueue(ALERTS_TABLE, toAlertRecord(alert));
};
/**
 * Queues a news story on the batch writer instead of inserting it directly.
 *
 * @param writer - Batch writer that receives the row for the news table.
 * @param story - Story to convert with `toNewsRecord` and enqueue.
 */
export const enqueueNewsStoryInsert = (
  writer: ClickHouseBatchWriter,
  story: NewsStory
): void => {
  const row = toNewsRecord(story);
  writer.enqueue(NEWS_TABLE, row);
};
const clampLimit = (limit: number): number => {
if (!Number.isFinite(limit)) {
return 100;
@ -1016,6 +1047,32 @@ const normalizeAlertRow = (row: unknown): AlertRecord | null => {
};
};
/**
 * Converts a raw query-result row into a `NewsRecord`.
 *
 * Numeric columns go through `coerceNumber`; text columns are stringified,
 * with missing values replaced by `""` (`"[]"` for the two JSON columns and
 * `"none"` for `symbol_resolution`).
 *
 * NOTE(review): the result of `coerceNumber` is cast to `number` without a
 * null check, and `symbol_resolution` is cast without validation — this
 * presumably relies on the schema parse performed by the fetch helpers;
 * confirm against `coerceNumber`'s return type.
 *
 * @param row - One decoded row of unknown shape.
 * @returns The normalized record, or `null` when `row` is not an object.
 */
const normalizeNewsRow = (row: unknown): NewsRecord | null => {
  if (typeof row !== "object" || !row) {
    return null;
  }

  const fields = row as Record<string, unknown>;
  const num = (value: unknown): number => coerceNumber(value) as number;
  const text = (value: unknown, fallback = ""): string => String(value ?? fallback);

  return {
    source_ts: num(fields.source_ts),
    ingest_ts: num(fields.ingest_ts),
    seq: num(fields.seq),
    trace_id: text(fields.trace_id),
    story_id: num(fields.story_id),
    provider: text(fields.provider),
    source: text(fields.source),
    headline: text(fields.headline),
    summary: text(fields.summary),
    content_html: text(fields.content_html),
    url: text(fields.url),
    published_ts: num(fields.published_ts),
    updated_ts: num(fields.updated_ts),
    provider_symbols_json: text(fields.provider_symbols_json, "[]"),
    resolved_symbols_json: text(fields.resolved_symbols_json, "[]"),
    symbol_resolution: text(fields.symbol_resolution, "none") as NewsRecord["symbol_resolution"]
  };
};
export const fetchRecentOptionPrints = async (
client: ClickHouseClient,
limit: number,
@ -1207,6 +1264,50 @@ export const fetchRecentAlerts = async (
return AlertEventSchema.array().parse(alerts);
};
/**
 * Shared SELECT used by the news fetch helpers.
 *
 * The inner query ranks every row within its (provider, story_id) partition
 * by updated_ts, then ingest_ts, then seq (all descending); the outer query
 * keeps only the top-ranked row, i.e. one revision per story.
 *
 * The text deliberately ends with the WHERE clause and carries no ORDER BY or
 * LIMIT: callers append ` AND <cursor condition>`, their own ordering and a
 * limit.
 */
const latestNewsSelect = `
SELECT
source_ts,
ingest_ts,
seq,
trace_id,
story_id,
provider,
source,
headline,
summary,
content_html,
url,
published_ts,
updated_ts,
provider_symbols_json,
resolved_symbols_json,
symbol_resolution
FROM (
SELECT
*,
row_number() OVER (PARTITION BY provider, story_id ORDER BY updated_ts DESC, ingest_ts DESC, seq DESC) AS revision_rank
FROM ${NEWS_TABLE}
)
WHERE revision_rank = 1
`;
/**
 * Fetches the most recently published news stories, newest first.
 *
 * Only the latest revision of each (provider, story_id) pair is returned
 * (see `latestNewsSelect`). Rows are ordered by `(published_ts, seq)`
 * descending — the same key `fetchNewsBefore` and `fetchNewsAfter` use for
 * their cursors — so the last row of this page can be used as a "before"
 * cursor without skipping or repeating stories that share a `published_ts`.
 *
 * @param client - ClickHouse client used to run the query.
 * @param limit - Requested page size; passed through `clampLimit`.
 * @returns Stories validated by `NewsStorySchema.array().parse`.
 */
export const fetchRecentNews = async (
  client: ClickHouseClient,
  limit: number
): Promise<NewsStory[]> => {
  const safeLimit = clampLimit(limit);
  const result = await client.query({
    // Tie-break on seq (not story_id) to stay consistent with the
    // (published_ts, seq) cursor used by the before/after queries.
    query: `${latestNewsSelect} ORDER BY published_ts DESC, seq DESC LIMIT ${safeLimit}`,
    format: "JSONEachRow"
  });

  const rows = await result.json<unknown[]>();
  const records = rows
    .map(normalizeNewsRow)
    .filter((record): record is NewsRecord => record !== null);

  return NewsStorySchema.array().parse(records.map(fromNewsRecord));
};
/**
 * Trims every reference, drops the ones that are empty after trimming and
 * removes duplicates, keeping the first occurrence order.
 *
 * @param refs - Raw evidence reference strings.
 * @returns Unique, non-empty, trimmed references.
 */
const normalizeAlertEvidenceRefs = (refs: string[]): string[] => {
  const unique = new Set<string>();
  for (const ref of refs) {
    const trimmed = ref.trim();
    if (trimmed) {
      unique.add(trimmed);
    }
  }
  return Array.from(unique);
};
@ -1600,6 +1701,27 @@ export const fetchAlertsAfter = async (
return AlertEventSchema.array().parse(alerts);
};
/**
 * Fetches latest-revision news stories positioned strictly after a cursor.
 *
 * The cursor is the tuple `(published_ts, seq)`; both parts pass through
 * `clampCursor` before being interpolated into the query. Results are
 * ordered ascending by the same tuple.
 *
 * @param client - ClickHouse client used to run the query.
 * @param afterTs - `published_ts` component of the cursor.
 * @param afterSeq - `seq` component of the cursor.
 * @param limit - Requested page size; passed through `clampLimit`.
 * @returns Stories validated by `NewsStorySchema.array().parse`.
 */
export const fetchNewsAfter = async (
  client: ClickHouseClient,
  afterTs: number,
  afterSeq: number,
  limit: number
): Promise<NewsStory[]> => {
  const pageSize = clampLimit(limit);
  const cursorTs = clampCursor(afterTs);
  const cursorSeq = clampCursor(afterSeq);

  // latestNewsSelect already ends in a WHERE clause, hence the leading AND.
  const sql = [
    latestNewsSelect,
    `AND (published_ts, seq) > (${cursorTs}, ${cursorSeq})`,
    "ORDER BY published_ts ASC, seq ASC",
    `LIMIT ${pageSize}`
  ].join(" ");

  const result = await client.query({ query: sql, format: "JSONEachRow" });
  const rows = await result.json<unknown[]>();

  const stories: NewsStory[] = [];
  for (const row of rows) {
    const record = normalizeNewsRow(row);
    if (record !== null) {
      stories.push(fromNewsRecord(record));
    }
  }

  return NewsStorySchema.array().parse(stories);
};
export const fetchOptionPrintsBefore = async (
client: ClickHouseClient,
beforeTs: number,
@ -1778,6 +1900,25 @@ export const fetchAlertsBefore = async (
return AlertEventSchema.array().parse(records.map(fromAlertRecord));
};
/**
 * Fetches latest-revision news stories positioned before a cursor.
 *
 * The cursor condition on `(published_ts, seq)` is produced by
 * `buildBeforeTupleCondition`; results are ordered descending by the same
 * tuple.
 *
 * @param client - ClickHouse client used to run the query.
 * @param beforeTs - `published_ts` component of the cursor.
 * @param beforeSeq - `seq` component of the cursor.
 * @param limit - Requested page size; passed through `clampLimit`.
 * @returns Stories validated by `NewsStorySchema.array().parse`.
 */
export const fetchNewsBefore = async (
  client: ClickHouseClient,
  beforeTs: number,
  beforeSeq: number,
  limit: number
): Promise<NewsStory[]> => {
  const pageSize = clampLimit(limit);
  const cursorFilter = buildBeforeTupleCondition("published_ts", "seq", beforeTs, beforeSeq);
  // latestNewsSelect already ends in a WHERE clause, hence the AND.
  const sql = `${latestNewsSelect} AND ${cursorFilter} ORDER BY published_ts DESC, seq DESC LIMIT ${pageSize}`;

  const result = await client.query({ query: sql, format: "JSONEachRow" });
  const rawRows = await result.json<unknown[]>();

  const normalized = rawRows.map((row) => normalizeNewsRow(row));
  const present = normalized.filter((entry): entry is NewsRecord => entry !== null);
  const stories = present.map((entry) => fromNewsRecord(entry));

  return NewsStorySchema.array().parse(stories);
};
export const fetchInferredDarkBefore = async (
client: ClickHouseClient,
beforeTs: number,

View file

@ -10,3 +10,4 @@ export * from "./equity-print-joins";
export * from "./inferred-dark";
export * from "./option-prints";
export * from "./option-nbbo";
export * from "./news";

View file

@ -0,0 +1,102 @@
import type { NewsStory, NewsSymbolResolution } from "@islandflow/types";
/** Name of the table that stores news story rows; interpolated into the DDL and queries. */
export const NEWS_TABLE = "news";
/**
 * Flat row shape of a news story as stored in the news table.
 *
 * Mirrors `NewsStory` except that the two symbol arrays are carried as
 * JSON-encoded strings (see `toNewsRecord` / `fromNewsRecord`).
 */
export type NewsRecord = {
source_ts: number;
ingest_ts: number;
seq: number;
trace_id: string;
story_id: number;
provider: string;
source: string;
headline: string;
summary: string;
content_html: string;
url: string;
published_ts: number;
updated_ts: number;
// JSON.stringify of NewsStory.provider_symbols.
provider_symbols_json: string;
// JSON.stringify of NewsStory.resolved_symbols.
resolved_symbols_json: string;
symbol_resolution: NewsSymbolResolution;
};
/**
 * Returns the `CREATE TABLE IF NOT EXISTS` statement for the news table.
 *
 * The column list matches the fields of `NewsRecord`; numeric fields are
 * UInt64 and everything else is String.
 *
 * NOTE(review): updated_ts and seq are part of the sorting key, so
 * ReplacingMergeTree can only collapse rows that agree on all four key
 * columns; distinct revisions of a story stay as separate rows. This is
 * presumably intentional, since readers pick the latest revision at query
 * time with row_number() — confirm.
 *
 * @returns The DDL text, with `NEWS_TABLE` interpolated as the table name.
 */
export const newsTableDDL = (): string => {
return `
CREATE TABLE IF NOT EXISTS ${NEWS_TABLE} (
source_ts UInt64,
ingest_ts UInt64,
seq UInt64,
trace_id String,
story_id UInt64,
provider String,
source String,
headline String,
summary String,
content_html String,
url String,
published_ts UInt64,
updated_ts UInt64,
provider_symbols_json String,
resolved_symbols_json String,
symbol_resolution String
)
ENGINE = ReplacingMergeTree(updated_ts)
ORDER BY (provider, story_id, updated_ts, seq)
`;
};
/**
 * Best-effort decoding of a JSON-encoded array into strings.
 *
 * @param value - JSON text expected to hold an array.
 * @returns Each array entry passed through `String(...)`; an empty array when
 *   the text is not valid JSON, is not an array, or conversion throws.
 */
const safeStringArray = (value: string): string[] => {
  let strings: string[] = [];
  try {
    const decoded: unknown = JSON.parse(value);
    if (Array.isArray(decoded)) {
      strings = decoded.map((item) => String(item));
    }
  } catch {
    // Deliberately swallowed: unreadable input is treated as "no entries".
  }
  return strings;
};
/**
 * Flattens a `NewsStory` into the row shape stored in the news table.
 *
 * Scalar fields are copied as-is; `provider_symbols` and `resolved_symbols`
 * are serialized with `JSON.stringify` into their `*_json` columns.
 *
 * @param story - Story to convert.
 * @returns The equivalent `NewsRecord`.
 */
export const toNewsRecord = (story: NewsStory): NewsRecord => {
  const providerSymbolsJson = JSON.stringify(story.provider_symbols);
  const resolvedSymbolsJson = JSON.stringify(story.resolved_symbols);
  const {
    source_ts,
    ingest_ts,
    seq,
    trace_id,
    story_id,
    provider,
    source,
    headline,
    summary,
    content_html,
    url,
    published_ts,
    updated_ts,
    symbol_resolution
  } = story;

  return {
    source_ts,
    ingest_ts,
    seq,
    trace_id,
    story_id,
    provider,
    source,
    headline,
    summary,
    content_html,
    url,
    published_ts,
    updated_ts,
    provider_symbols_json: providerSymbolsJson,
    resolved_symbols_json: resolvedSymbolsJson,
    symbol_resolution
  };
};
/**
 * Rebuilds a `NewsStory` from its stored row form.
 *
 * Scalar fields are copied as-is; the two `*_json` columns are decoded with
 * `safeStringArray`, which yields an empty array for unreadable input.
 *
 * @param record - Row as stored in the news table.
 * @returns The equivalent `NewsStory`.
 */
export const fromNewsRecord = (record: NewsRecord): NewsStory => {
  const {
    source_ts,
    ingest_ts,
    seq,
    trace_id,
    story_id,
    provider,
    source,
    headline,
    summary,
    content_html,
    url,
    published_ts,
    updated_ts,
    symbol_resolution
  } = record;
  const providerSymbols = safeStringArray(record.provider_symbols_json);
  const resolvedSymbols = safeStringArray(record.resolved_symbols_json);

  return {
    source_ts,
    ingest_ts,
    seq,
    trace_id,
    story_id,
    provider,
    source,
    headline,
    summary,
    content_html,
    url,
    published_ts,
    updated_ts,
    provider_symbols: providerSymbols,
    resolved_symbols: resolvedSymbols,
    symbol_resolution
  };
};

View file

@ -0,0 +1,78 @@
import { describe, expect, it } from "bun:test";
import type { ClickHouseClient } from "../src/clickhouse";
import {
NEWS_TABLE,
fromNewsRecord,
newsTableDDL,
toNewsRecord
} from "../src/news";
import {
fetchNewsAfter,
fetchNewsBefore,
fetchRecentNews
} from "../src/clickhouse";
/**
 * Builds a minimal in-memory stand-in for `ClickHouseClient`.
 *
 * `exec`, `insert` and `close` do nothing, `ping` reports success, and
 * `query` hands the SQL text to `resolver`, whose return value becomes the
 * result of `json()`.
 *
 * @param resolver - Receives each query string and returns the rows to serve.
 * @returns The stub, cast to `ClickHouseClient`.
 */
const makeClient = (resolver: (query: string) => unknown[]): ClickHouseClient => {
  const stub = {
    exec: async () => {},
    insert: async () => {},
    ping: async () => ({ success: true }),
    close: async () => {},
    query: async ({ query }: { query: string }) => ({
      json: async <T>(): Promise<T> => resolver(query) as T
    })
  };
  return stub as ClickHouseClient;
};
// Fixture story shared by the tests below: used for the record round-trip and
// as the single row served by the stub client in the query tests.
const story = {
source_ts: 100,
ingest_ts: 101,
seq: 3,
trace_id: "alpaca:77",
story_id: 77,
provider: "alpaca",
source: "Benzinga",
headline: "TSLA rises",
summary: "Summary",
content_html: "<p>TSLA rises</p>",
url: "https://example.com/story",
published_ts: 100,
updated_ts: 120,
provider_symbols: ["TSLA"],
resolved_symbols: ["TSLA", "AAPL"],
symbol_resolution: "mixed" as const
};
// Covers the news row helpers (DDL text, record round-trip) and the SQL that
// the news fetch helpers send to the client.
describe("news storage helpers", () => {
  it("includes the correct table name in the DDL", () => {
    const ddl = newsTableDDL();
    expect(ddl).toContain(NEWS_TABLE);
    expect(ddl).toContain("ReplacingMergeTree");
  });

  it("round-trips news records", () => {
    const record = toNewsRecord(story);
    const restored = fromNewsRecord(record);
    expect(restored).toEqual(story);
  });

  it("uses latest-revision selection for recent and cursor queries", async () => {
    // The stub records every SQL string and always serves the fixture row.
    const queries: string[] = [];
    const client = makeClient((query) => {
      queries.push(query);
      return [toNewsRecord(story)];
    });

    const recent = await fetchRecentNews(client, 10);
    const before = await fetchNewsBefore(client, 200, 10, 10);
    const after = await fetchNewsAfter(client, 50, 1, 10);

    expect(recent[0]?.trace_id).toBe("alpaca:77");
    expect(before[0]?.story_id).toBe(77);
    expect(after[0]?.updated_ts).toBe(120);

    // Every query, not just the first, must go through the latest-revision
    // filter.
    expect(queries).toHaveLength(3);
    for (const query of queries) {
      expect(query).toContain("row_number() OVER");
      expect(query).toContain("WHERE revision_rank = 1");
    }

    // Cursor queries page on (published_ts, seq) in opposite directions.
    expect(queries[1]).toContain("ORDER BY published_ts DESC, seq DESC");
    expect(queries[2]).toContain("(published_ts, seq) > (50, 1)");
    expect(queries[2]).toContain("ORDER BY published_ts ASC, seq ASC");
  });
});