From 906fe411c9daffde038285c3a29f02d07c351e6c Mon Sep 17 00:00:00 2001 From: dirtydishes Date: Mon, 18 May 2026 16:55:31 -0400 Subject: [PATCH] add alpaca news wire across ingest api and web --- .beads/issues.jsonl | 1 + apps/web/app/globals.css | 74 +++++ apps/web/app/news/page.tsx | 7 + apps/web/app/terminal.test.ts | 21 +- apps/web/app/terminal.tsx | 305 +++++++++++++++++++- bun.lock | 13 + deployment/docker/Dockerfile.ingest-options | 1 + deployment/docker/Dockerfile.service | 1 + deployment/docker/Dockerfile.web | 1 + deployment/docker/docker-compose.yml | 4 + deployment/docker/workspace-root/bun.lock | 13 + docs/turns/2026-05-18-news-wire-view.html | 152 ++++++++++ packages/bus/src/streams.ts | 5 +- packages/bus/src/subjects.ts | 2 + packages/storage/src/clickhouse.ts | 141 +++++++++ packages/storage/src/index.ts | 1 + packages/storage/src/news.ts | 102 +++++++ packages/storage/tests/news.test.ts | 78 +++++ packages/types/src/events.ts | 23 ++ packages/types/src/live.ts | 8 +- packages/types/tests/live.test.ts | 26 +- scripts/deploy.ts | 18 +- scripts/dev-services.ts | 1 + scripts/dev.ts | 1 + services/api/src/index.ts | 54 +++- services/api/src/live.ts | 65 +++-- services/ingest-news/package.json | 16 + services/ingest-news/src/index.ts | 216 ++++++++++++++ services/ingest-news/src/symbols.ts | 70 +++++ services/ingest-news/tests/symbols.test.ts | 30 ++ services/ingest-news/tsconfig.json | 7 + 31 files changed, 1407 insertions(+), 50 deletions(-) create mode 100644 apps/web/app/news/page.tsx create mode 100644 docs/turns/2026-05-18-news-wire-view.html create mode 100644 packages/storage/src/news.ts create mode 100644 packages/storage/tests/news.test.ts create mode 100644 services/ingest-news/package.json create mode 100644 services/ingest-news/src/index.ts create mode 100644 services/ingest-news/src/symbols.ts create mode 100644 services/ingest-news/tests/symbols.test.ts create mode 100644 services/ingest-news/tsconfig.json diff --git a/.beads/issues.jsonl 
b/.beads/issues.jsonl index 629eb06..9909cdd 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -13,6 +13,7 @@ {"_type":"issue","id":"islandflow-ayo","title":"Drop stale backlog events from live fanout","description":"Follow-up to live freshness rollout: /ws/live was still fanning out stale backlog events for freshness-gated channels, which kept tape panes in Live feed behind despite active synthetic ingest. Gate fanout and cache ingest by freshness for options/nbbo/equities/flow.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:26:39Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:26:44Z","started_at":"2026-04-28T21:26:44Z","closed_at":"2026-04-28T21:26:44Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-0v6","title":"Fix tape freshness, NBBO coverage, pause controls, and filter popup","description":"Implement the tape fixes requested for synthetic options notional sizing, strict live freshness, live-mode pause/resume behavior, stronger NBBO snapshot coverage, and moving flow filters behind a popup. 
Includes server-side live cache changes, web terminal state/UI changes, and tests for synthetic pricing, live snapshot freshness/NBBO retention, and live pause/filter interactions.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:02:52Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:13:38Z","started_at":"2026-04-28T21:02:57Z","closed_at":"2026-04-28T21:13:38Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-e4r","title":"Implement smart-money flow filtering and synthetic firehose modes","description":"Implement the approved multi-surface plan for named synthetic market profiles, options raw-vs-signal filtering, live/API filter contracts, Tape page client-side flow filters, firehose-readiness improvements, tests, and README updates.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:10:49Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:29:29Z","started_at":"2026-04-28T20:10:53Z","closed_at":"2026-04-28T20:29:29Z","close_reason":"Implemented synthetic market profiles, options signal-path filtering, signal-aware API/replay contracts, Tape page filters, tests, and README updates. 
Follow-up tracked in islandflow-biq.","dependency_count":0,"dependent_count":0,"comment_count":0} +{"_type":"issue","id":"islandflow-8fn","title":"implement alpaca-backed news wire view","description":"Why this issue exists and what needs to be done:\\nAdd an Alpaca-powered live news pipeline, API, storage, and web experience, including a dedicated /news route, Home preview, live fanout, history pagination, ticker resolution, and replay-mode live-only empty states.\\n\\nAcceptance criteria:\\n- normalized NewsStory contract and live channel exist\\n- ingest-news service backfills and streams Alpaca news\\n- API persists, serves, and fans out news\\n- web app exposes /news plus Home preview and drawer\\n- tests cover types, storage, API, and key UI behaviors\\n- turn documentation is added\\n\\nDesign:\\nReuse Islandflow drawer, chips, panes, and terminal styling; keep news live-only in v1 replay mode.\\n\\nNotes:\\nImplement client-side ticker filtering in v1 and expose latest revision only per provider+story_id.","status":"closed","priority":2,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T20:37:13Z","created_by":"dirtydishes","updated_at":"2026-05-18T20:55:11Z","started_at":"2026-05-18T20:37:20Z","closed_at":"2026-05-18T20:55:11Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-k8i","title":"Fix duplicate alert context import in API entrypoint","description":"Recent alert-context work introduced a duplicate fetchAlertContextByTraceId import in services/api/src/index.ts, which risks breaking TypeScript compilation and API startup. 
Remove the duplicate import and validate the affected API/web tests.","status":"closed","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T13:01:58Z","created_by":"dirtydishes","updated_at":"2026-05-18T13:03:40Z","started_at":"2026-05-18T13:02:02Z","closed_at":"2026-05-18T13:03:40Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-lk9","title":"Fix PR creation workflow after Forgejo migration","description":"## Why\\nCreating pull requests with fails after the repository moved primary collaboration from GitHub to Forgejo. The current workflow still assumes GitHub GraphQL PR creation semantics, which do not work against the Forgejo remote.\\n\\n## What\\nInvestigate the current PR creation path, identify remaining GitHub-specific assumptions, and update the repo workflow/scripts/docs so contributors can reliably publish branches and open PRs in the Forgejo-based setup.\\n\\n## Acceptance Criteria\\n- The repo no longer instructs contributors to use a broken GitHub-specific PR creation path for Forgejo branches\\n- There is a documented and preferably scripted way to create the equivalent review request against Forgejo\\n- Validation demonstrates the new workflow behaves correctly or clearly documents any remaining platform limitation","status":"in_progress","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T10:26:47Z","created_by":"dirtydishes","updated_at":"2026-05-18T10:26:53Z","started_at":"2026-05-18T10:26:53Z","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-1ei","title":"Make deploy helper remote-aware for Forgejo","description":"Why: scripts/deploy.ts hardcodes git remote name origin for fetch/pull/push and branch verification, but this repository now uses forgejo/github remotes and may not have an origin remote. 
What: update deploy.ts to resolve the deploy git remote robustly (Forgejo-aware), use it across local prechecks, branch publish, and remote rollout git operations, and keep behavior explicit in output.","status":"closed","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T03:20:12Z","created_by":"dirtydishes","updated_at":"2026-05-18T03:22:39Z","started_at":"2026-05-18T03:20:16Z","closed_at":"2026-05-18T03:22:39Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0} diff --git a/apps/web/app/globals.css b/apps/web/app/globals.css index 64b6f16..cf6746b 100644 --- a/apps/web/app/globals.css +++ b/apps/web/app/globals.css @@ -708,7 +708,12 @@ h3 { grid-template-columns: repeat(3, minmax(0, 1fr)); } +.page-grid-news { + grid-template-columns: minmax(0, 1fr); +} + .page-grid-home > :nth-child(3), +.page-grid-home > :nth-child(4), .page-grid-tape > :nth-child(1), .page-grid-replay > :nth-child(1) { grid-column: 1 / -1; @@ -933,6 +938,7 @@ h3 { } .page-grid-home > :nth-child(3), +.page-grid-home > :nth-child(4), .page-grid-replay > :not(:first-child) { height: clamp(430px, 58vh, 760px); } @@ -1747,6 +1753,72 @@ h3 { gap: 10px; } +.terminal-link-button { + text-decoration: none; +} + +.news-list { + display: flex; + flex-direction: column; + gap: 10px; +} + +.news-row { + width: 100%; + display: flex; + flex-direction: column; + gap: 8px; + padding: 14px 16px; + border: 1px solid var(--border); + border-radius: 12px; + background: oklch(0.18 0.012 250 / 0.6); + color: var(--text); + text-align: left; + transition: border-color 150ms ease, background 150ms ease; +} + +.news-row:hover { + border-color: var(--accent-soft); + background: oklch(0.2 0.015 250 / 0.75); +} + +.news-row-head, +.news-row-meta { + display: flex; + align-items: center; + justify-content: space-between; + gap: 10px; + flex-wrap: wrap; +} + +.news-row h3 { + margin: 0; + font-size: 0.96rem; + font-weight: 600; +} + 
+.news-row-time { + color: var(--text-dim); + font-family: var(--font-mono), monospace; + font-size: 0.78rem; +} + +.news-row-meta { + color: var(--text-dim); + font-size: 0.78rem; +} + +.news-drawer-body a { + color: var(--accent); +} + +.news-drawer-body p, +.news-drawer-body ul, +.news-drawer-body ol, +.news-drawer-body blockquote { + margin: 0 0 12px; +} + .synthetic-status-grid strong, .synthetic-hit-row strong { font-family: var(--font-mono), monospace; @@ -1964,6 +2036,7 @@ h3 { } .page-grid-home > :nth-child(3), + .page-grid-home > :nth-child(4), .page-grid-tape > :nth-child(1), .page-grid-replay > :nth-child(1) { grid-column: auto; @@ -1973,6 +2046,7 @@ h3 { .page-grid-home > :nth-child(1), .page-grid-home > :nth-child(2), .page-grid-home > :nth-child(3), + .page-grid-home > :nth-child(4), .page-grid-signals > .terminal-pane, .page-grid-replay > :not(:first-child), .page-grid-tape > :first-child, diff --git a/apps/web/app/news/page.tsx b/apps/web/app/news/page.tsx new file mode 100644 index 0000000..7e06aa8 --- /dev/null +++ b/apps/web/app/news/page.tsx @@ -0,0 +1,7 @@ +import { NewsRoute } from "../terminal"; + +export const dynamic = "force-dynamic"; + +export default function Page() { + return ; +} diff --git a/apps/web/app/terminal.test.ts b/apps/web/app/terminal.test.ts index 2be3da8..63918f2 100644 --- a/apps/web/app/terminal.test.ts +++ b/apps/web/app/terminal.test.ts @@ -247,6 +247,15 @@ describe("live manifest", () => { ]); }); + it("includes news subscriptions on home and /news", () => { + expect(getLiveManifest("/", "SPY", 60000, buildDefaultFlowFilters()).map((subscription) => subscription.channel)).toContain( + "news" + ); + expect(getLiveManifest("/news", "SPY", 60000, buildDefaultFlowFilters()).map((subscription) => subscription.channel)).toEqual([ + "news" + ]); + }); + it("scopes /charts subscriptions to chart channels only", () => { const channels = getLiveManifest("/charts", "SPY", 60000, buildDefaultFlowFilters()).map( (subscription) => 
subscription.channel @@ -431,6 +440,13 @@ describe("route feature map", () => { expect(features.equityOverlay).toBe(true); expect(features.alerts).toBe(false); }); + + it("maps /news to the dedicated news pane", () => { + const features = getRouteFeatures("/news"); + expect(features.news).toBe(true); + expect(features.showNewsPane).toBe(true); + expect(features.showAlertsPane).toBe(false); + }); }); describe("fixed tape virtualization config", () => { @@ -461,10 +477,11 @@ describe("dark underlying route dependency helper", () => { }); describe("terminal navigation", () => { - it("exposes only Home and Tape as top-level destinations", () => { + it("exposes Home, Tape, and News as top-level destinations", () => { expect(NAV_ITEMS).toEqual([ { href: "/", label: "Home" }, - { href: "/tape", label: "Tape" } + { href: "/tape", label: "Tape" }, + { href: "/news", label: "News" } ]); }); }); diff --git a/apps/web/app/terminal.tsx b/apps/web/app/terminal.tsx index e1ee74c..218e149 100644 --- a/apps/web/app/terminal.tsx +++ b/apps/web/app/terminal.tsx @@ -33,6 +33,7 @@ import type { LiveServerMessage, LiveHotChannelHealthMap, LiveSubscription, + NewsStory, OptionFlowFilters, OptionFlowView, OptionNbboSide, @@ -158,6 +159,7 @@ type RouteFeatures = { nbbo: boolean; equities: boolean; flow: boolean; + news: boolean; alerts: boolean; smartMoney: boolean; classifierHits: boolean; @@ -168,6 +170,7 @@ type RouteFeatures = { showOptionsPane: boolean; showEquitiesPane: boolean; showFlowPane: boolean; + showNewsPane: boolean; showAlertsPane: boolean; showClassifierPane: boolean; showDarkPane: boolean; @@ -187,6 +190,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { const includeEquitiesFallback = shouldIncludeEquitiesForDarkUnderlyingFallback(); const normalizedPath = pathname === "/tape" || + pathname === "/news" || pathname === "/signals" || pathname === "/charts" || pathname === "/replay" @@ -200,6 +204,7 @@ export const getRouteFeatures = (pathname: 
string): RouteFeatures => { nbbo: true, equities: true, flow: true, + news: false, alerts: false, smartMoney: false, classifierHits: false, @@ -210,6 +215,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { showOptionsPane: true, showEquitiesPane: true, showFlowPane: true, + showNewsPane: false, showAlertsPane: false, showClassifierPane: false, showDarkPane: false, @@ -220,12 +226,41 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { needsAlertEvidencePrefetch: false, needsDarkUnderlying: false }; + case "/news": + return { + options: false, + nbbo: false, + equities: false, + flow: false, + news: true, + alerts: false, + smartMoney: false, + classifierHits: false, + inferredDark: false, + equityJoins: false, + equityCandles: false, + equityOverlay: false, + showOptionsPane: false, + showEquitiesPane: false, + showFlowPane: false, + showNewsPane: true, + showAlertsPane: false, + showClassifierPane: false, + showDarkPane: false, + showChartPane: false, + showFocusPane: false, + showReplayConsole: false, + needsClassifierDecor: false, + needsAlertEvidencePrefetch: false, + needsDarkUnderlying: false + }; case "/signals": return { options: false, nbbo: false, equities: includeEquitiesFallback, flow: false, + news: false, alerts: true, smartMoney: true, classifierHits: true, @@ -236,6 +271,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { showOptionsPane: false, showEquitiesPane: false, showFlowPane: false, + showNewsPane: false, showAlertsPane: true, showClassifierPane: true, showDarkPane: true, @@ -252,6 +288,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { nbbo: false, equities: includeEquitiesFallback, flow: false, + news: false, alerts: false, smartMoney: true, classifierHits: false, @@ -262,6 +299,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { showOptionsPane: false, showEquitiesPane: false, showFlowPane: false, + showNewsPane: false, 
showAlertsPane: false, showClassifierPane: false, showDarkPane: false, @@ -278,6 +316,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { nbbo: false, equities: false, flow: false, + news: false, alerts: false, smartMoney: false, classifierHits: false, @@ -288,6 +327,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { showOptionsPane: true, showEquitiesPane: false, showFlowPane: true, + showNewsPane: false, showAlertsPane: true, showClassifierPane: false, showDarkPane: false, @@ -305,6 +345,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { nbbo: false, equities: true, flow: false, + news: true, alerts: true, smartMoney: true, classifierHits: false, @@ -315,6 +356,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => { showOptionsPane: false, showEquitiesPane: true, showFlowPane: false, + showNewsPane: true, showAlertsPane: true, showClassifierPane: false, showDarkPane: false, @@ -332,6 +374,7 @@ const EMPTY_ALERT_EVENTS: AlertEvent[] = []; const EMPTY_CLASSIFIER_HIT_EVENTS: ClassifierHitEvent[] = []; const EMPTY_SMART_MONEY_EVENTS: SmartMoneyEvent[] = []; const EMPTY_INFERRED_DARK_EVENTS: InferredDarkEvent[] = []; +const EMPTY_NEWS_STORIES: NewsStory[] = []; type CandlestickSeries = ReturnType; @@ -1194,6 +1237,44 @@ const formatDateTime = (ts: number): string => { return `${date.toLocaleDateString()} ${date.toLocaleTimeString()}`; }; +const isSameLocalDay = (left: number, right: number): boolean => { + const a = new Date(left); + const b = new Date(right); + return ( + a.getFullYear() === b.getFullYear() && + a.getMonth() === b.getMonth() && + a.getDate() === b.getDate() + ); +}; + +export const formatNewsTimestamp = (ts: number, now = Date.now()): string => { + const date = new Date(ts); + return isSameLocalDay(ts, now) + ? 
date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" }) + : date.toLocaleString([], { month: "short", day: "numeric", hour: "numeric", minute: "2-digit" }); +}; + +const sanitizeNewsHtml = (value: string): { html: string; fallbackText: string; sanitized: boolean } => { + const fallbackText = value + .replace(//gi, " ") + .replace(//gi, " ") + .replace(/<[^>]+>/g, " ") + .replace(/\s+/g, " ") + .trim(); + + try { + const sanitized = value + .replace(//gi, "") + .replace(//gi, "") + .replace(/\son\w+=(?:"[^"]*"|'[^']*'|[^\s>]+)/gi, "") + .replace(/\shref=(["'])javascript:[\s\S]*?\1/gi, ' href="#"') + .replace(/<(?!\/?(p|div|section|article|span|strong|em|b|i|ul|ol|li|br|a|h1|h2|h3|h4|blockquote)\b)[^>]*>/gi, ""); + return { html: sanitized, fallbackText, sanitized: true }; + } catch { + return { html: "", fallbackText, sanitized: false }; + } +}; + const humanizeClassifierId = (value: string): string => { if (!value) { return "Classifier"; @@ -2870,6 +2951,7 @@ type LiveSessionState = { smartMoneyHistory: SmartMoneyEvent[]; classifierHitsHistory: ClassifierHitEvent[]; alertsHistory: AlertEvent[]; + newsHistory: NewsStory[]; inferredDarkHistory: InferredDarkEvent[]; options: OptionPrint[]; nbbo: OptionNBBO[]; @@ -2880,6 +2962,7 @@ type LiveSessionState = { smartMoney: SmartMoneyEvent[]; classifierHits: ClassifierHitEvent[]; alerts: AlertEvent[]; + news: NewsStory[]; inferredDark: InferredDarkEvent[]; chartCandles: EquityCandle[]; chartOverlay: EquityPrint[]; @@ -2900,6 +2983,7 @@ const LIVE_HISTORY_ENDPOINTS: Partial([]); const [classifierHits, setClassifierHits] = useState([]); const [alerts, setAlerts] = useState([]); + const [news, setNews] = useState([]); const [inferredDark, setInferredDark] = useState([]); const [optionsHistory, setOptionsHistory] = useState([]); const [nbboHistory, setNbboHistory] = useState([]); @@ -3142,6 +3230,7 @@ const useLiveSession = ( const [smartMoneyHistory, setSmartMoneyHistory] = useState([]); const 
[classifierHitsHistory, setClassifierHitsHistory] = useState([]); const [alertsHistory, setAlertsHistory] = useState([]); + const [newsHistory, setNewsHistory] = useState([]); const [inferredDarkHistory, setInferredDarkHistory] = useState([]); const [chartCandles, setChartCandles] = useState([]); const [chartOverlay, setChartOverlay] = useState([]); @@ -3154,6 +3243,7 @@ const useLiveSession = ( const smartMoneyRef = useRef([]); const classifierHitsRef = useRef([]); const alertsRef = useRef([]); + const newsRef = useRef([]); const inferredDarkRef = useRef([]); const chartCandlesRef = useRef([]); const chartOverlayRef = useRef([]); @@ -3165,6 +3255,7 @@ const useLiveSession = ( const smartMoneyHistoryRef = useRef([]); const classifierHitsHistoryRef = useRef([]); const alertsHistoryRef = useRef([]); + const newsHistoryRef = useRef([]); const inferredDarkHistoryRef = useRef([]); const socketRef = useRef(null); const reconnectRef = useRef(null); @@ -3218,6 +3309,7 @@ const useLiveSession = ( setSmartMoney([]); setClassifierHits([]); setAlerts([]); + setNews([]); setInferredDark([]); setOptionsHistory([]); setNbboHistory([]); @@ -3227,6 +3319,7 @@ const useLiveSession = ( setSmartMoneyHistory([]); setClassifierHitsHistory([]); setAlertsHistory([]); + setNewsHistory([]); setInferredDarkHistory([]); setChartCandles([]); setChartOverlay([]); @@ -3239,6 +3332,7 @@ const useLiveSession = ( smartMoneyRef.current = []; classifierHitsRef.current = []; alertsRef.current = []; + newsRef.current = []; inferredDarkRef.current = []; chartCandlesRef.current = []; chartOverlayRef.current = []; @@ -3250,6 +3344,7 @@ const useLiveSession = ( smartMoneyHistoryRef.current = []; classifierHitsHistoryRef.current = []; alertsHistoryRef.current = []; + newsHistoryRef.current = []; inferredDarkHistoryRef.current = []; subscribedKeysRef.current = new Set(); subscribedMapRef.current = new Map(); @@ -3403,6 +3498,12 @@ const useLiveSession = ( ref: alertsHistoryRef }); break; + case "news": + 
mergeItems(setNews, newsRef, items as NewsStory[], LIVE_OPTIONS_HEAD_LIMIT, { + setter: setNewsHistory, + ref: newsHistoryRef + }); + break; case "inferred-dark": mergeItems(setInferredDark, inferredDarkRef, items as InferredDarkEvent[], LIVE_HOT_WINDOW, { setter: setInferredDarkHistory, @@ -3694,6 +3795,9 @@ const useLiveSession = ( case "alerts": mergeOlder(setAlertsHistory, alertsHistoryRef, alertsRef.current); break; + case "news": + mergeOlder(setNewsHistory, newsHistoryRef, newsRef.current); + break; case "inferred-dark": mergeOlder(setInferredDarkHistory, inferredDarkHistoryRef, inferredDarkRef.current); break; @@ -3735,6 +3839,7 @@ const useLiveSession = ( smartMoneyHistory, classifierHitsHistory, alertsHistory, + newsHistory, inferredDarkHistory, options, nbbo, @@ -3745,6 +3850,7 @@ const useLiveSession = ( smartMoney, classifierHits, alerts, + news, inferredDark, chartCandles, chartOverlay @@ -4822,6 +4928,69 @@ const AlertDrawer = ({ alert, flowPacket, evidence, contextStatus, onClose }: Al ); }; +type NewsDrawerProps = { + story: NewsStory; + onClose: () => void; +}; + +const NewsDrawer = ({ story, onClose }: NewsDrawerProps) => { + const body = sanitizeNewsHtml(story.content_html); + + return ( + + ); +}; + type ClassifierHitDrawerProps = { hit: ClassifierHitEvent; flowPacket: FlowPacket | null; @@ -5178,6 +5347,7 @@ const useTerminalState = () => { const [mode, setMode] = useState("live"); const [replaySource, setReplaySource] = useState(null); const [selectedAlert, setSelectedAlert] = useState(null); + const [selectedNewsStory, setSelectedNewsStory] = useState(null); const [selectedDarkEvent, setSelectedDarkEvent] = useState(null); const [selectedClassifierHit, setSelectedClassifierHit] = useState(null); const [selectedSmartMoneyEvent, setSelectedSmartMoneyEvent] = useState(null); @@ -5274,12 +5444,13 @@ const useTerminalState = () => { }, [mode]); useEffect(() => { - if (!selectedAlert && !selectedClassifierHit && !selectedDarkEvent && 
!selectedSmartMoneyEvent) { + if (!selectedAlert && !selectedNewsStory && !selectedClassifierHit && !selectedDarkEvent && !selectedSmartMoneyEvent) { return; } const dismissDrawers = () => { setSelectedAlert(null); + setSelectedNewsStory(null); setSelectedClassifierHit(null); setSelectedSmartMoneyEvent(null); setSelectedDarkEvent(null); @@ -5305,7 +5476,7 @@ const useTerminalState = () => { document.removeEventListener("mousedown", handlePointerDown); document.removeEventListener("keydown", handleKeyDown); }; - }, [selectedAlert, selectedClassifierHit, selectedDarkEvent, selectedSmartMoneyEvent]); + }, [selectedAlert, selectedNewsStory, selectedClassifierHit, selectedDarkEvent, selectedSmartMoneyEvent]); const optionsScroll = useListScroll(); const equitiesScroll = useListScroll(); @@ -5540,6 +5711,14 @@ const useTerminalState = () => { ) : equityJoins; const flowFeed = mode === "live" ? liveFlow : flow; + const newsFeed = + mode === "live" + ? toStaticTapeState( + liveSession.status, + composeTapeItems([], liveSession.news, liveSession.newsHistory), + liveSession.lastUpdate + ) + : toStaticTapeState("disconnected", [], null); const alertsFeed = mode === "live" ? 
toStaticTapeState( @@ -6490,6 +6669,16 @@ const useTerminalState = () => { routeFeatures.needsAlertEvidencePrefetch ]); + const filteredNews = useMemo(() => { + if (!routeFeatures.news && !routeFeatures.showNewsPane) { + return EMPTY_NEWS_STORIES; + } + if (tickerSet.size === 0) { + return newsFeed.items; + } + return newsFeed.items.filter((story) => story.resolved_symbols.some((symbol) => matchesTicker(symbol))); + }, [matchesTicker, newsFeed.items, routeFeatures.news, routeFeatures.showNewsPane, tickerSet]); + const visibleAlerts = useMemo(() => { if (routeFeatures.needsAlertEvidencePrefetch) { return filteredAlerts.slice(0, 12); @@ -6767,6 +6956,7 @@ const useTerminalState = () => { (hit: ClassifierHitEvent) => { const alert = findAlertForClassifierHit(hit); if (alert) { + setSelectedNewsStory(null); setSelectedClassifierHit(null); setSelectedDarkEvent(null); setSelectedSmartMoneyEvent(null); @@ -6774,6 +6964,7 @@ const useTerminalState = () => { return; } + setSelectedNewsStory(null); setSelectedAlert(null); setSelectedDarkEvent(null); setSelectedSmartMoneyEvent(null); @@ -6783,6 +6974,7 @@ const useTerminalState = () => { ); const openFromSmartMoneyEvent = useCallback((event: SmartMoneyEvent) => { + setSelectedNewsStory(null); setSelectedAlert(null); setSelectedClassifierHit(null); setSelectedDarkEvent(null); @@ -6797,6 +6989,7 @@ const useTerminalState = () => { ); const handleDarkMarkerClick = useCallback((event: InferredDarkEvent) => { + setSelectedNewsStory(null); setSelectedAlert(null); setSelectedClassifierHit(null); setSelectedSmartMoneyEvent(null); @@ -6817,6 +7010,9 @@ const useTerminalState = () => { if (routeFeatures.flow || routeFeatures.showFlowPane) { updates.push(flowFeed.lastUpdate); } + if (routeFeatures.news || routeFeatures.showNewsPane) { + updates.push(newsFeed.lastUpdate); + } if (routeFeatures.alerts || routeFeatures.showAlertsPane) { updates.push(alertsFeed.lastUpdate); } @@ -6839,6 +7035,8 @@ const useTerminalState = () => { 
routeFeatures.showFocusPane, routeFeatures.flow, routeFeatures.showFlowPane, + routeFeatures.news, + routeFeatures.showNewsPane, routeFeatures.alerts, routeFeatures.showAlertsPane, routeFeatures.smartMoney, @@ -6849,6 +7047,7 @@ const useTerminalState = () => { equitiesFeed.lastUpdate, inferredDarkFeed.lastUpdate, flowFeed.lastUpdate, + newsFeed.lastUpdate, alertsFeed.lastUpdate, smartMoneyFeed.lastUpdate, classifierHitsFeed.lastUpdate @@ -6861,6 +7060,8 @@ const useTerminalState = () => { setReplaySource, selectedAlert, setSelectedAlert, + selectedNewsStory, + setSelectedNewsStory, selectedDarkEvent, setSelectedDarkEvent, selectedClassifierHit, @@ -6887,6 +7088,7 @@ const useTerminalState = () => { equityJoins: equityJoinsFeed, nbbo: nbboFeed, inferredDark: inferredDarkFeed, + news: newsFeed, flow: flowFeed, alerts: alertsFeed, smartMoney: smartMoneyFeed, @@ -6920,6 +7122,7 @@ const useTerminalState = () => { equitiesScopedQuiet, equitiesSilentWarning, filteredInferredDark, + filteredNews, filteredFlow, filteredAlerts, filteredSmartMoneyEvents, @@ -6953,7 +7156,8 @@ const useTerminal = (): TerminalState => { export const NAV_ITEMS = [ { href: "/", label: "Home" }, - { href: "/tape", label: "Tape" } + { href: "/tape", label: "Tape" }, + { href: "/news", label: "News" } ] as const; type PageFrameProps = { @@ -7780,6 +7984,7 @@ const AlertsPane = memo(({ state, limit, withStrip = false, className }: AlertsP data-tape-key={key} style={{ transform: `translateY(${start}px)` }} onClick={() => { + state.setSelectedNewsStory(null); state.setSelectedDarkEvent(null); state.setSelectedClassifierHit(null); state.setSelectedSmartMoneyEvent(null); @@ -7806,6 +8011,83 @@ const AlertsPane = memo(({ state, limit, withStrip = false, className }: AlertsP ); }); +type NewsPaneProps = { + state: TerminalState; + limit?: number; + className?: string; +}; + +const NewsPane = memo(({ state, limit, className }: NewsPaneProps) => { + const items = limit ? 
state.filteredNews.slice(0, limit) : state.filteredNews; + const canLoadOlder = state.mode === "live" && !limit && items.length > 0; + + return ( + + View all + + ) : ( +
+ + {state.mode === "live" ? "Live wire" : "Live-only in v1"} +
+ ) + } + actions={ + canLoadOlder ? ( + + ) : null + } + > + {state.mode === "replay" ? ( +
News is live-only in v1.
+ ) : items.length === 0 ? ( +
+ {state.tickerSet.size > 0 ? "No news stories match the current filter." : "Waiting for live news stories."} +
+ ) : ( +
+ {items.map((story) => ( + + ))} +
+ )} +
+ ); +}); + type ClassifierPaneProps = { state: TerminalState; limit?: number; @@ -8016,6 +8298,7 @@ const DarkPane = memo(({ state, limit, className }: DarkPaneProps) => { data-tape-key={key} style={{ transform: `translateY(${start}px)` }} onClick={() => { + state.setSelectedNewsStory(null); state.setSelectedAlert(null); state.setSelectedClassifierHit(null); state.setSelectedSmartMoneyEvent(null); @@ -8624,6 +8907,10 @@ export function TerminalAppShell({ children }: { children: ReactNode }) { /> ) : null} + {state.selectedNewsStory ? ( + state.setSelectedNewsStory(null)} /> + ) : null} + {state.selectedClassifierHit ? ( + ); } +export function NewsRoute() { + const state = useTerminal(); + return ( + +
+ +
+
+ ); +} + export function TapeRoute() { const state = useTerminal(); return ( diff --git a/bun.lock b/bun.lock index 46160a7..35e00d7 100644 --- a/bun.lock +++ b/bun.lock @@ -121,6 +121,17 @@ "zod": "^3.23.8", }, }, + "services/ingest-news": { + "name": "@islandflow/ingest-news", + "dependencies": { + "@islandflow/bus": "workspace:*", + "@islandflow/config": "workspace:*", + "@islandflow/observability": "workspace:*", + "@islandflow/types": "workspace:*", + "ws": "^8.18.3", + "zod": "^3.23.8", + }, + }, "services/ingest-options": { "name": "@islandflow/ingest-options", "dependencies": { @@ -250,6 +261,8 @@ "@islandflow/ingest-equities": ["@islandflow/ingest-equities@workspace:services/ingest-equities"], + "@islandflow/ingest-news": ["@islandflow/ingest-news@workspace:services/ingest-news"], + "@islandflow/ingest-options": ["@islandflow/ingest-options@workspace:services/ingest-options"], "@islandflow/observability": ["@islandflow/observability@workspace:packages/observability"], diff --git a/deployment/docker/Dockerfile.ingest-options b/deployment/docker/Dockerfile.ingest-options index 52cba59..212b96b 100644 --- a/deployment/docker/Dockerfile.ingest-options +++ b/deployment/docker/Dockerfile.ingest-options @@ -31,6 +31,7 @@ COPY --from=services candles/package.json ./services/candles/package.json COPY --from=services compute/package.json ./services/compute/package.json COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json +COPY --from=services ingest-news/package.json ./services/ingest-news/package.json COPY --from=services ingest-options/package.json ./services/ingest-options/package.json COPY --from=services ingest-options/py/requirements.txt ./services/ingest-options/py/requirements.txt COPY --from=services refdata/package.json ./services/refdata/package.json diff --git a/deployment/docker/Dockerfile.service 
b/deployment/docker/Dockerfile.service index e0fcf72..4a7d9f1 100644 --- a/deployment/docker/Dockerfile.service +++ b/deployment/docker/Dockerfile.service @@ -24,6 +24,7 @@ COPY --from=services candles/package.json ./services/candles/package.json COPY --from=services compute/package.json ./services/compute/package.json COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json +COPY --from=services ingest-news/package.json ./services/ingest-news/package.json COPY --from=services ingest-options/package.json ./services/ingest-options/package.json COPY --from=services refdata/package.json ./services/refdata/package.json COPY --from=services replay/package.json ./services/replay/package.json diff --git a/deployment/docker/Dockerfile.web b/deployment/docker/Dockerfile.web index 33723ae..37443d9 100644 --- a/deployment/docker/Dockerfile.web +++ b/deployment/docker/Dockerfile.web @@ -30,6 +30,7 @@ COPY --from=services candles/package.json ./services/candles/package.json COPY --from=services compute/package.json ./services/compute/package.json COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json +COPY --from=services ingest-news/package.json ./services/ingest-news/package.json COPY --from=services ingest-options/package.json ./services/ingest-options/package.json COPY --from=services refdata/package.json ./services/refdata/package.json COPY --from=services replay/package.json ./services/replay/package.json diff --git a/deployment/docker/docker-compose.yml b/deployment/docker/docker-compose.yml index 96598ba..37682f6 100644 --- a/deployment/docker/docker-compose.yml +++ b/deployment/docker/docker-compose.yml @@ -115,6 +115,10 @@ services: <<: *service-common command: ["services/ingest-equities/src/index.ts"] + ingest-news: + <<: 
*service-common + command: ["services/ingest-news/src/index.ts"] + replay: <<: *service-common profiles: ["replay"] diff --git a/deployment/docker/workspace-root/bun.lock b/deployment/docker/workspace-root/bun.lock index 46160a7..35e00d7 100644 --- a/deployment/docker/workspace-root/bun.lock +++ b/deployment/docker/workspace-root/bun.lock @@ -121,6 +121,17 @@ "zod": "^3.23.8", }, }, + "services/ingest-news": { + "name": "@islandflow/ingest-news", + "dependencies": { + "@islandflow/bus": "workspace:*", + "@islandflow/config": "workspace:*", + "@islandflow/observability": "workspace:*", + "@islandflow/types": "workspace:*", + "ws": "^8.18.3", + "zod": "^3.23.8", + }, + }, "services/ingest-options": { "name": "@islandflow/ingest-options", "dependencies": { @@ -250,6 +261,8 @@ "@islandflow/ingest-equities": ["@islandflow/ingest-equities@workspace:services/ingest-equities"], + "@islandflow/ingest-news": ["@islandflow/ingest-news@workspace:services/ingest-news"], + "@islandflow/ingest-options": ["@islandflow/ingest-options@workspace:services/ingest-options"], "@islandflow/observability": ["@islandflow/observability@workspace:packages/observability"], diff --git a/docs/turns/2026-05-18-news-wire-view.html b/docs/turns/2026-05-18-news-wire-view.html new file mode 100644 index 0000000..be02f26 --- /dev/null +++ b/docs/turns/2026-05-18-news-wire-view.html @@ -0,0 +1,152 @@ + + + + + + Turn Report: News Wire View via Alpaca Feed + + + +
+

Created 2026-05-18 · Task: News Wire View via Alpaca Feed

+

News Wire View via Alpaca Feed

+
+ Summary +

+ Added an Alpaca-backed live news pipeline end to end: normalized NewsStory types, + a dedicated JetStream subject/stream, ClickHouse storage helpers with latest-revision semantics, + a new services/ingest-news service, API endpoints and live fanout, and a web + /news route plus Home preview with a right-side story drawer. +

+
+ +
+

Changes Made

+
    +
  • Added NewsStorySchema, the news live channel, and subscription parsing support in packages/types.
  • +
  • Added bus constants for the flow.news subject and NEWS stream.
  • +
  • Added ClickHouse news storage helpers, including recent, before-cursor, and after-cursor queries that collapse provider revisions to the latest row per provider + story_id.
  • +
  • Created services/ingest-news with Alpaca REST backfill, Alpaca websocket streaming, normalization, and deterministic ticker resolution.
  • +
  • Extended the API service to persist live news in the shared cache, expose GET /news and GET /history/news, and fan out news events on /ws/live.
  • +
  • Added a top-level /news route, primary nav entry, Home preview pane, replay-mode live-only empty states, and a sanitized full-story drawer.
  • +
  • Updated dev and deployment wiring so the new service is included in local runners and the Docker workspace snapshot.
  • +
+
+ +
+

Context

+

+ The plan called for a free-provider v1 news surface that behaves like the rest of Islandflow: + compact, evidence-first, and live-native. The implementation keeps replay intentionally out of scope + for news while still integrating news into the same live manifest, history pagination, rail navigation, + and drawer language used elsewhere in the terminal. +

+
+ +
+

Important Implementation Details

+
    +
  • Ticker resolution prefers provider symbols first, then falls back only to structured patterns in provider HTML: ticker anchors, EXCHANGE:SYM, and $SYM.
  • +
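  • News history paginates on published_ts as the visible cursor (with seq as the tie-breaker), while revisions are collapsed with a window function partitioned by provider and story_id and ordered by updated_ts, ingest_ts, and seq, newest first, so only the latest revision of each story is returned.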
  • +
  • The web drawer sanitizes provider HTML by removing scripts, inline event handlers, and unsupported tags; if sanitization yields nothing useful, the drawer falls back to stripped plain text.
  • +
  • Replay mode intentionally renders a clear empty state for news on both Home and /news instead of pretending news is replay-synced.
  • +
+
resolved_symbols = provider_symbols
+  or ticker anchors in content_html
+  or EXCHANGE:SYM matches
+  or $SYM matches
+
+ +
+

Expected Impact for End-Users

+

+ Traders can now monitor a dedicated live news wire inside Islandflow, spot symbol-linked headlines from + the Home view, and open full stories in-context without leaving the app. The displayed ticker chips are + grounded in stored provider and derived symbol metadata, which makes the feed safer to filter and trust. +

+
+ +
+

Validation

+
    +
  • Ran targeted Bun tests covering types, storage, API live-state behavior, ingest-news symbol resolution, route wiring, and terminal helpers.
  • +
  • Built the Next.js web app with bun --cwd=apps/web run build.
  • +
  • Ran bun run check:docker-workspace after syncing the deployment workspace snapshot.
  • +
+
+ +
+

Issues, Limitations, and Mitigations

+
    +
  • Replay support remains intentionally absent in v1; the UI now states that explicitly instead of showing a misleadingly empty history.
  • +
  • The sanitizer is intentionally conservative and custom, which keeps dependencies light but may strip some harmless provider formatting.
  • +
  • The ingest service assumes Alpaca’s current REST and websocket news contracts; if Alpaca changes those payload shapes, the normalization layer will need adjustment.
  • +
+
+ +
+

Follow-up Work

+
    +
  • No additional follow-up issue was required during this turn.
  • +
  • Future extensions are still available behind the same contract: multi-provider aggregation, server-side symbol filtering, and replay-aware news history.
  • +
+
+
+ + diff --git a/packages/bus/src/streams.ts b/packages/bus/src/streams.ts index eeb8116..b23c125 100644 --- a/packages/bus/src/streams.ts +++ b/packages/bus/src/streams.ts @@ -7,6 +7,7 @@ import { STREAM_EQUITY_QUOTES, STREAM_FLOW_PACKETS, STREAM_INFERRED_DARK, + STREAM_NEWS, STREAM_OPTION_NBBO, STREAM_OPTION_PRINTS, STREAM_OPTION_SIGNAL_PRINTS, @@ -19,6 +20,7 @@ import { SUBJECT_EQUITY_QUOTES, SUBJECT_FLOW_PACKETS, SUBJECT_INFERRED_DARK, + SUBJECT_NEWS, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_PRINTS, SUBJECT_OPTION_SIGNAL_PRINTS, @@ -53,7 +55,8 @@ export const STREAM_CATALOG: readonly KnownStreamDefinition[] = [ retentionClass: "derived" }, { name: STREAM_CLASSIFIER_HITS, subject: SUBJECT_CLASSIFIER_HITS, retentionClass: "derived" }, - { name: STREAM_ALERTS, subject: SUBJECT_ALERTS, retentionClass: "derived" } + { name: STREAM_ALERTS, subject: SUBJECT_ALERTS, retentionClass: "derived" }, + { name: STREAM_NEWS, subject: SUBJECT_NEWS, retentionClass: "derived" } ]; const STREAM_CATALOG_BY_NAME = new Map(STREAM_CATALOG.map((definition) => [definition.name, definition])); diff --git a/packages/bus/src/subjects.ts b/packages/bus/src/subjects.ts index 6b21afd..956d357 100644 --- a/packages/bus/src/subjects.ts +++ b/packages/bus/src/subjects.ts @@ -22,3 +22,5 @@ export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS"; export const SUBJECT_CLASSIFIER_HITS = "flow.classifier_hits"; export const STREAM_ALERTS = "ALERTS"; export const SUBJECT_ALERTS = "flow.alerts"; +export const STREAM_NEWS = "NEWS"; +export const SUBJECT_NEWS = "flow.news"; diff --git a/packages/storage/src/clickhouse.ts b/packages/storage/src/clickhouse.ts index bc0061e..af469d7 100644 --- a/packages/storage/src/clickhouse.ts +++ b/packages/storage/src/clickhouse.ts @@ -7,6 +7,7 @@ import { EquityPrintJoinSchema, InferredDarkEventSchema, FlowPacketSchema, + NewsStorySchema, OptionNBBOSchema, OptionPrintSchema, SmartMoneyEventSchema @@ -20,6 +21,7 @@ import type { EquityPrintJoin, InferredDarkEvent, 
FlowPacket, + NewsStory, SmartMoneyEvent, OptionNBBO, OptionPrint, @@ -91,6 +93,13 @@ import { toSmartMoneyEventRecord, type SmartMoneyEventRecord } from "./smart-money-events"; +import { + NEWS_TABLE, + newsTableDDL, + fromNewsRecord, + toNewsRecord, + type NewsRecord +} from "./news"; export type ClickHouseOptions = { url: string; @@ -320,6 +329,12 @@ export const ensureAlertsTable = async (client: ClickHouseClient): Promise } }; +export const ensureNewsTable = async (client: ClickHouseClient): Promise => { + await client.exec({ + query: newsTableDDL() + }); +}; + export const insertOptionPrint = async ( client: ClickHouseClient, print: OptionPrint @@ -449,6 +464,15 @@ export const insertAlert = async (client: ClickHouseClient, alert: AlertEvent): }); }; +export const insertNewsStory = async (client: ClickHouseClient, story: NewsStory): Promise => { + const record = toNewsRecord(story); + await client.insert({ + table: NEWS_TABLE, + values: [record], + format: "JSONEachRow" + }); +}; + export type ClickHouseBatchWriterOptions = { flushIntervalMs?: number; maxRows?: number; @@ -600,6 +624,13 @@ export const enqueueAlertInsert = ( writer.enqueue(ALERTS_TABLE, toAlertRecord(alert)); }; +export const enqueueNewsStoryInsert = ( + writer: ClickHouseBatchWriter, + story: NewsStory +): void => { + writer.enqueue(NEWS_TABLE, toNewsRecord(story)); +}; + const clampLimit = (limit: number): number => { if (!Number.isFinite(limit)) { return 100; @@ -1016,6 +1047,32 @@ const normalizeAlertRow = (row: unknown): AlertRecord | null => { }; }; +const normalizeNewsRow = (row: unknown): NewsRecord | null => { + if (!row || typeof row !== "object") { + return null; + } + + const record = row as Record; + return { + source_ts: coerceNumber(record.source_ts) as number, + ingest_ts: coerceNumber(record.ingest_ts) as number, + seq: coerceNumber(record.seq) as number, + trace_id: String(record.trace_id ?? 
""), + story_id: coerceNumber(record.story_id) as number, + provider: String(record.provider ?? ""), + source: String(record.source ?? ""), + headline: String(record.headline ?? ""), + summary: String(record.summary ?? ""), + content_html: String(record.content_html ?? ""), + url: String(record.url ?? ""), + published_ts: coerceNumber(record.published_ts) as number, + updated_ts: coerceNumber(record.updated_ts) as number, + provider_symbols_json: String(record.provider_symbols_json ?? "[]"), + resolved_symbols_json: String(record.resolved_symbols_json ?? "[]"), + symbol_resolution: String(record.symbol_resolution ?? "none") as NewsRecord["symbol_resolution"] + }; +}; + export const fetchRecentOptionPrints = async ( client: ClickHouseClient, limit: number, @@ -1207,6 +1264,50 @@ export const fetchRecentAlerts = async ( return AlertEventSchema.array().parse(alerts); }; +const latestNewsSelect = ` +SELECT + source_ts, + ingest_ts, + seq, + trace_id, + story_id, + provider, + source, + headline, + summary, + content_html, + url, + published_ts, + updated_ts, + provider_symbols_json, + resolved_symbols_json, + symbol_resolution +FROM ( + SELECT + *, + row_number() OVER (PARTITION BY provider, story_id ORDER BY updated_ts DESC, ingest_ts DESC, seq DESC) AS revision_rank + FROM ${NEWS_TABLE} +) +WHERE revision_rank = 1 +`; + +export const fetchRecentNews = async ( + client: ClickHouseClient, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `${latestNewsSelect} ORDER BY published_ts DESC, story_id DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeNewsRow) + .filter((record): record is NewsRecord => record !== null); + return NewsStorySchema.array().parse(records.map(fromNewsRecord)); +}; + const normalizeAlertEvidenceRefs = (refs: string[]): string[] => { return Array.from(new Set(refs.map((ref) => 
ref.trim()).filter(Boolean))); }; @@ -1600,6 +1701,27 @@ export const fetchAlertsAfter = async ( return AlertEventSchema.array().parse(alerts); }; +export const fetchNewsAfter = async ( + client: ClickHouseClient, + afterTs: number, + afterSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const safeAfterTs = clampCursor(afterTs); + const safeAfterSeq = clampCursor(afterSeq); + const result = await client.query({ + query: `${latestNewsSelect} AND (published_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY published_ts ASC, seq ASC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeNewsRow) + .filter((record): record is NewsRecord => record !== null); + return NewsStorySchema.array().parse(records.map(fromNewsRecord)); +}; + export const fetchOptionPrintsBefore = async ( client: ClickHouseClient, beforeTs: number, @@ -1778,6 +1900,25 @@ export const fetchAlertsBefore = async ( return AlertEventSchema.array().parse(records.map(fromAlertRecord)); }; +export const fetchNewsBefore = async ( + client: ClickHouseClient, + beforeTs: number, + beforeSeq: number, + limit: number +): Promise => { + const safeLimit = clampLimit(limit); + const result = await client.query({ + query: `${latestNewsSelect} AND ${buildBeforeTupleCondition("published_ts", "seq", beforeTs, beforeSeq)} ORDER BY published_ts DESC, seq DESC LIMIT ${safeLimit}`, + format: "JSONEachRow" + }); + + const rows = await result.json(); + const records = rows + .map(normalizeNewsRow) + .filter((record): record is NewsRecord => record !== null); + return NewsStorySchema.array().parse(records.map(fromNewsRecord)); +}; + export const fetchInferredDarkBefore = async ( client: ClickHouseClient, beforeTs: number, diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 4fefabc..810d67c 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -10,3 +10,4 
@@ export * from "./equity-print-joins"; export * from "./inferred-dark"; export * from "./option-prints"; export * from "./option-nbbo"; +export * from "./news"; diff --git a/packages/storage/src/news.ts b/packages/storage/src/news.ts new file mode 100644 index 0000000..cf92f40 --- /dev/null +++ b/packages/storage/src/news.ts @@ -0,0 +1,102 @@ +import type { NewsStory, NewsSymbolResolution } from "@islandflow/types"; + +export const NEWS_TABLE = "news"; + +export type NewsRecord = { + source_ts: number; + ingest_ts: number; + seq: number; + trace_id: string; + story_id: number; + provider: string; + source: string; + headline: string; + summary: string; + content_html: string; + url: string; + published_ts: number; + updated_ts: number; + provider_symbols_json: string; + resolved_symbols_json: string; + symbol_resolution: NewsSymbolResolution; +}; + +export const newsTableDDL = (): string => { + return ` +CREATE TABLE IF NOT EXISTS ${NEWS_TABLE} ( + source_ts UInt64, + ingest_ts UInt64, + seq UInt64, + trace_id String, + story_id UInt64, + provider String, + source String, + headline String, + summary String, + content_html String, + url String, + published_ts UInt64, + updated_ts UInt64, + provider_symbols_json String, + resolved_symbols_json String, + symbol_resolution String +) +ENGINE = ReplacingMergeTree(updated_ts) +ORDER BY (provider, story_id, updated_ts, seq) +`; +}; + +const safeStringArray = (value: string): string[] => { + try { + const parsed = JSON.parse(value); + if (Array.isArray(parsed)) { + return parsed.map((entry) => String(entry)); + } + } catch { + // ignore + } + + return []; +}; + +export const toNewsRecord = (story: NewsStory): NewsRecord => { + return { + source_ts: story.source_ts, + ingest_ts: story.ingest_ts, + seq: story.seq, + trace_id: story.trace_id, + story_id: story.story_id, + provider: story.provider, + source: story.source, + headline: story.headline, + summary: story.summary, + content_html: story.content_html, + url: 
story.url, + published_ts: story.published_ts, + updated_ts: story.updated_ts, + provider_symbols_json: JSON.stringify(story.provider_symbols), + resolved_symbols_json: JSON.stringify(story.resolved_symbols), + symbol_resolution: story.symbol_resolution + }; +}; + +export const fromNewsRecord = (record: NewsRecord): NewsStory => { + return { + source_ts: record.source_ts, + ingest_ts: record.ingest_ts, + seq: record.seq, + trace_id: record.trace_id, + story_id: record.story_id, + provider: record.provider, + source: record.source, + headline: record.headline, + summary: record.summary, + content_html: record.content_html, + url: record.url, + published_ts: record.published_ts, + updated_ts: record.updated_ts, + provider_symbols: safeStringArray(record.provider_symbols_json), + resolved_symbols: safeStringArray(record.resolved_symbols_json), + symbol_resolution: record.symbol_resolution + }; +}; diff --git a/packages/storage/tests/news.test.ts b/packages/storage/tests/news.test.ts new file mode 100644 index 0000000..c5b71c8 --- /dev/null +++ b/packages/storage/tests/news.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, it } from "bun:test"; +import type { ClickHouseClient } from "../src/clickhouse"; +import { + NEWS_TABLE, + fromNewsRecord, + newsTableDDL, + toNewsRecord +} from "../src/news"; +import { + fetchNewsAfter, + fetchNewsBefore, + fetchRecentNews +} from "../src/clickhouse"; + +const makeClient = (resolver: (query: string) => unknown[]): ClickHouseClient => + ({ + exec: async () => {}, + insert: async () => {}, + ping: async () => ({ success: true }), + close: async () => {}, + query: async ({ query }: { query: string }) => ({ + async json() { + return resolver(query) as T; + } + }) + }) as ClickHouseClient; + +const story = { + source_ts: 100, + ingest_ts: 101, + seq: 3, + trace_id: "alpaca:77", + story_id: 77, + provider: "alpaca", + source: "Benzinga", + headline: "TSLA rises", + summary: "Summary", + content_html: "

TSLA rises

", + url: "https://example.com/story", + published_ts: 100, + updated_ts: 120, + provider_symbols: ["TSLA"], + resolved_symbols: ["TSLA", "AAPL"], + symbol_resolution: "mixed" as const +}; + +describe("news storage helpers", () => { + it("includes the correct table name in the DDL", () => { + const ddl = newsTableDDL(); + expect(ddl).toContain(NEWS_TABLE); + expect(ddl).toContain("ReplacingMergeTree"); + }); + + it("round-trips news records", () => { + const record = toNewsRecord(story); + const restored = fromNewsRecord(record); + expect(restored).toEqual(story); + }); + + it("uses latest-revision selection for recent and cursor queries", async () => { + const queries: string[] = []; + const client = makeClient((query) => { + queries.push(query); + return [toNewsRecord(story)]; + }); + + const recent = await fetchRecentNews(client, 10); + const before = await fetchNewsBefore(client, 200, 10, 10); + const after = await fetchNewsAfter(client, 50, 1, 10); + + expect(recent[0]?.trace_id).toBe("alpaca:77"); + expect(before[0]?.story_id).toBe(77); + expect(after[0]?.updated_ts).toBe(120); + expect(queries[0]).toContain("row_number() OVER"); + expect(queries[1]).toContain("published_ts"); + expect(queries[2]).toContain("(published_ts, seq) > (50, 1)"); + }); +}); diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index c15dc7b..0556bd8 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -262,3 +262,26 @@ export const InferredDarkEventSchema = EventMetaSchema.merge( ); export type InferredDarkEvent = z.infer; + +export const NewsSymbolResolutionSchema = z.enum(["provider", "derived", "mixed", "none"]); + +export type NewsSymbolResolution = z.infer; + +export const NewsStorySchema = EventMetaSchema.merge( + z.object({ + story_id: z.number().int().nonnegative(), + provider: z.string().min(1), + source: z.string().min(1), + headline: z.string().min(1), + summary: z.string(), + content_html: z.string(), + url: 
z.string().url().or(z.literal("")), + published_ts: z.number().int().nonnegative(), + updated_ts: z.number().int().nonnegative(), + provider_symbols: z.array(z.string().min(1)), + resolved_symbols: z.array(z.string().min(1)), + symbol_resolution: NewsSymbolResolutionSchema + }) +); + +export type NewsStory = z.infer; diff --git a/packages/types/src/live.ts b/packages/types/src/live.ts index 0787c84..10ac486 100644 --- a/packages/types/src/live.ts +++ b/packages/types/src/live.ts @@ -8,6 +8,7 @@ import { EquityQuoteSchema, FlowPacketSchema, InferredDarkEventSchema, + NewsStorySchema, OptionNBBOSchema, OptionPrintSchema, SmartMoneyEventSchema @@ -34,7 +35,8 @@ export const LiveGenericChannelSchema = z.enum([ "smart-money", "classifier-hits", "alerts", - "inferred-dark" + "inferred-dark", + "news" ]); export const LiveChannelSchema = z.enum([ @@ -48,6 +50,7 @@ export const LiveChannelSchema = z.enum([ "classifier-hits", "alerts", "inferred-dark", + "news", "equity-candles", "equity-overlay" ]); @@ -91,7 +94,7 @@ export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [ snapshot_limit: z.number().int().positive().optional() }), z.object({ - channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]), + channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark", "news"]), snapshot_limit: z.number().int().positive().optional() }), z.object({ @@ -123,6 +126,7 @@ const livePayloadSchemas = { "classifier-hits": ClassifierHitEventSchema, alerts: AlertEventSchema, "inferred-dark": InferredDarkEventSchema, + news: NewsStorySchema, "equity-candles": EquityCandleSchema, "equity-overlay": EquityPrintSchema } as const; diff --git a/packages/types/tests/live.test.ts b/packages/types/tests/live.test.ts index 075eab1..ef254b4 100644 --- a/packages/types/tests/live.test.ts +++ b/packages/types/tests/live.test.ts @@ -9,6 +9,7 @@ import { describe("live protocol types", () => { 
it("builds stable keys for generic and parameterized subscriptions", () => { expect(getSubscriptionKey({ channel: "flow" })).toBe("flow|{}"); + expect(getSubscriptionKey({ channel: "news" })).toBe("news"); expect( getSubscriptionKey({ channel: "options", @@ -53,12 +54,13 @@ describe("live protocol types", () => { op: "subscribe", subscriptions: [ { channel: "flow", filters: { nbboSides: ["AA", "A"], minNotional: 50000 } }, + { channel: "news", snapshot_limit: 100 }, { channel: "equity-candles", underlying_id: "SPY", interval_ms: 60000 } ] }); expect(parsed.op).toBe("subscribe"); - expect(parsed.subscriptions).toHaveLength(2); + expect(parsed.subscriptions).toHaveLength(3); }); it("validates snapshot and event server messages", () => { @@ -74,18 +76,24 @@ describe("live protocol types", () => { }); const event = LiveServerMessageSchema.parse({ op: "event", - subscription: { channel: "equity-overlay", underlying_id: "SPY" }, + subscription: { channel: "news" }, item: { source_ts: 100, ingest_ts: 101, seq: 1, - trace_id: "eq-1", - ts: 100, - underlying_id: "SPY", - price: 500, - size: 10, - exchange: "X", - offExchangeFlag: true + trace_id: "alpaca:1", + story_id: 1, + provider: "alpaca", + source: "Benzinga", + headline: "TSLA rises", + summary: "", + content_html: "

TSLA rises

", + url: "https://example.com/story", + published_ts: 100, + updated_ts: 100, + provider_symbols: ["TSLA"], + resolved_symbols: ["TSLA"], + symbol_resolution: "provider" }, watermark: cursor }); diff --git a/scripts/deploy.ts b/scripts/deploy.ts index 68d260a..5b94d95 100644 --- a/scripts/deploy.ts +++ b/scripts/deploy.ts @@ -48,7 +48,8 @@ const NATIVE_UNITS = { ingestOptions: process.env.DEPLOY_NATIVE_INGEST_OPTIONS_UNIT?.trim() || "islandflow-ingest-options", ingestEquities: - process.env.DEPLOY_NATIVE_INGEST_EQUITIES_UNIT?.trim() || "islandflow-ingest-equities" + process.env.DEPLOY_NATIVE_INGEST_EQUITIES_UNIT?.trim() || "islandflow-ingest-equities", + ingestNews: process.env.DEPLOY_NATIVE_INGEST_NEWS_UNIT?.trim() || "islandflow-ingest-news" } as const; const DOCKER_CORE_SERVICES = [ "api", @@ -56,14 +57,16 @@ const DOCKER_CORE_SERVICES = [ "compute", "candles", "ingest-options", - "ingest-equities" + "ingest-equities", + "ingest-news" ] as const; const DOCKER_BACKEND_SERVICES = [ "api", "compute", "candles", "ingest-options", - "ingest-equities" + "ingest-equities", + "ingest-news" ] as const; const scriptPath = fileURLToPath(import.meta.url); @@ -106,7 +109,8 @@ Environment: DEPLOY_NATIVE_COMPUTE_UNIT Override native compute systemd unit name. DEPLOY_NATIVE_CANDLES_UNIT Override native candles systemd unit name. DEPLOY_NATIVE_INGEST_OPTIONS_UNIT Override native ingest-options systemd unit name. - DEPLOY_NATIVE_INGEST_EQUITIES_UNIT Override native ingest-equities systemd unit name.`); + DEPLOY_NATIVE_INGEST_EQUITIES_UNIT Override native ingest-equities systemd unit name. 
+ DEPLOY_NATIVE_INGEST_NEWS_UNIT Override native ingest-news systemd unit name.`); process.exit(exitCode); } @@ -465,7 +469,8 @@ function nativeUnitsForScope(scope: DeployScope): string[] { NATIVE_UNITS.compute, NATIVE_UNITS.candles, NATIVE_UNITS.ingestOptions, - NATIVE_UNITS.ingestEquities + NATIVE_UNITS.ingestEquities, + NATIVE_UNITS.ingestNews ]; default: return [ @@ -474,7 +479,8 @@ function nativeUnitsForScope(scope: DeployScope): string[] { NATIVE_UNITS.compute, NATIVE_UNITS.candles, NATIVE_UNITS.ingestOptions, - NATIVE_UNITS.ingestEquities + NATIVE_UNITS.ingestEquities, + NATIVE_UNITS.ingestNews ]; } } diff --git a/scripts/dev-services.ts b/scripts/dev-services.ts index 09cd381..2bcb641 100644 --- a/scripts/dev-services.ts +++ b/scripts/dev-services.ts @@ -222,6 +222,7 @@ process.on("SIGHUP", () => handleSignal("SIGHUP")); const tasks: ChildSpec[] = [ { name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" }, { name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" }, + { name: "ingest-news", cmd: ["bun", "run", "dev"], cwd: "services/ingest-news" }, { name: "compute", cmd: ["bun", "run", "dev"], cwd: "services/compute" }, { name: "candles", cmd: ["bun", "run", "dev"], cwd: "services/candles" }, { name: "refdata", cmd: ["bun", "run", "dev"], cwd: "services/refdata" }, diff --git a/scripts/dev.ts b/scripts/dev.ts index 64406d6..1d031a7 100644 --- a/scripts/dev.ts +++ b/scripts/dev.ts @@ -325,6 +325,7 @@ const serviceTasks: ChildSpec[] = [ { name: "web", cmd: ["bun", "run", "dev"], cwd: "apps/web" }, { name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" }, { name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" }, + { name: "ingest-news", cmd: ["bun", "run", "dev"], cwd: "services/ingest-news" }, { name: "compute", cmd: ["bun", "run", "dev"], cwd: "services/compute" }, { name: "candles", cmd: ["bun", "run", "dev"], cwd: "services/candles" }, 
{ name: "refdata", cmd: ["bun", "run", "dev"], cwd: "services/refdata" }, diff --git a/services/api/src/index.ts b/services/api/src/index.ts index 433222a..c0a9c79 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -9,6 +9,7 @@ import { SUBJECT_EQUITY_QUOTES, SUBJECT_INFERRED_DARK, SUBJECT_FLOW_PACKETS, + SUBJECT_NEWS, SUBJECT_SMART_MONEY_EVENTS, SUBJECT_OPTION_NBBO, SUBJECT_OPTION_SIGNAL_PRINTS, @@ -20,6 +21,7 @@ import { STREAM_EQUITY_QUOTES, STREAM_INFERRED_DARK, STREAM_FLOW_PACKETS, + STREAM_NEWS, STREAM_SMART_MONEY_EVENTS, STREAM_OPTION_NBBO, STREAM_OPTION_SIGNAL_PRINTS, @@ -35,6 +37,7 @@ import { import { createClickHouseClient, ensureAlertsTable, + ensureNewsTable, ensureClassifierHitsTable, ensureEquityCandlesTable, ensureEquityPrintJoinsTable, @@ -48,6 +51,8 @@ import { fetchAlertsAfter, fetchAlertsBefore, fetchAlertContextByTraceId, + fetchNewsAfter, + fetchNewsBefore, fetchClassifierHitsAfter, fetchClassifierHitsBefore, fetchSmartMoneyEventsAfter, @@ -58,6 +63,7 @@ import { fetchFlowPacketsByMemberTraceIds, fetchFlowPacketsBefore, fetchRecentAlerts, + fetchRecentNews, fetchRecentClassifierHits, fetchRecentSmartMoneyEvents, fetchRecentEquityPrintJoins, @@ -99,6 +105,7 @@ import { EquityQuoteSchema, FeedSnapshot, InferredDarkEventSchema, + NewsStorySchema, LiveClientMessageSchema, LiveServerMessage, LiveSubscription, @@ -676,7 +683,8 @@ const run = async () => { STREAM_FLOW_PACKETS, STREAM_SMART_MONEY_EVENTS, STREAM_CLASSIFIER_HITS, - STREAM_ALERTS + STREAM_ALERTS, + STREAM_NEWS ], { logger } ); @@ -719,6 +727,7 @@ const run = async () => { await ensureSmartMoneyEventsTable(clickhouse); await ensureClassifierHitsTable(clickhouse); await ensureAlertsTable(clickhouse); + await ensureNewsTable(clickhouse); }); let redis: ReturnType | null = null; @@ -843,6 +852,11 @@ const run = async () => { subject: SUBJECT_ALERTS, stream: STREAM_ALERTS, durableName: "api-alerts" + }, + { + subject: SUBJECT_NEWS, + stream: STREAM_NEWS, + durableName: 
"api-news" } ] as const; @@ -991,10 +1005,16 @@ const run = async () => { consumerBindings[10].durableName ); + const newsSubscription = await subscribeWithReset( + consumerBindings[11].subject, + consumerBindings[11].stream, + consumerBindings[11].durableName + ); + const fanoutLive = async ( subscription: LiveSubscription, item: unknown, - ingestChannel: "options" | "nbbo" | "equities" | "equity-quotes" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" + ingestChannel: "options" | "nbbo" | "equities" | "equity-quotes" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" | "news" ) => { const watermark = await liveState.ingest(ingestChannel, item); @@ -1252,6 +1272,21 @@ const run = async () => { } }; + const pumpNews = async () => { + for await (const msg of newsSubscription.messages) { + try { + const payload = NewsStorySchema.parse(newsSubscription.decode(msg)); + await fanoutLive({ channel: "news" }, payload, "news"); + msg.ack(); + } catch (error) { + logger.error("failed to process news story", { + error: error instanceof Error ? error.message : String(error) + }); + msg.term(); + } + } + }; + void pumpOptions(); void pumpOptionNbbo(); void pumpEquities(); @@ -1263,6 +1298,7 @@ const run = async () => { void pumpSmartMoney(); void pumpClassifierHits(); void pumpAlerts(); + void pumpNews(); const buildSyntheticStatusBody = () => { const derived = @@ -1490,6 +1526,12 @@ const run = async () => { return jsonResponse({ data }); } + if (req.method === "GET" && url.pathname === "/news") { + const limit = parseLimit(url.searchParams.get("limit") ?? 
"100"); + const data = await fetchRecentNews(clickhouse, limit); + return jsonResponse({ data }); + } + if (req.method === "GET" && isAlertContextPath(url.pathname)) { try { const traceId = parseAlertContextTraceIdPath(url.pathname); @@ -1607,6 +1649,14 @@ const run = async () => { ); } + if (req.method === "GET" && url.pathname === "/history/news") { + const { beforeTs, beforeSeq, limit } = parseBeforeParams(url); + const data = await fetchNewsBefore(clickhouse, beforeTs, beforeSeq, limit); + return jsonResponse( + buildHistoryResponse(data, (item) => ({ ts: item.published_ts, seq: item.seq })) + ); + } + if (req.method === "GET" && /^\/flow\/packets\/[^/]+$/.test(url.pathname)) { const id = decodeURIComponent(url.pathname.slice("/flow/packets/".length)); const data = await fetchFlowPacketById(clickhouse, id); diff --git a/services/api/src/live.ts b/services/api/src/live.ts index 024935e..c8d2886 100644 --- a/services/api/src/live.ts +++ b/services/api/src/live.ts @@ -8,6 +8,7 @@ import { fetchRecentEquityQuotes, fetchRecentFlowPackets, fetchRecentInferredDark, + fetchRecentNews, fetchRecentOptionNBBO, fetchRecentSmartMoneyEvents, type ClickHouseClient @@ -25,6 +26,7 @@ import { FeedSnapshot, FlowPacketSchema, InferredDarkEventSchema, + NewsStorySchema, LiveChannelHealth, LiveGenericChannel, LiveHotChannel, @@ -40,6 +42,7 @@ import { type EquityCandle, type EquityPrint, type LiveChannel, + type NewsStory, type OptionPrint } from "@islandflow/types"; import { createMetrics } from "@islandflow/observability"; @@ -63,7 +66,8 @@ const GENERIC_LIMIT_ENV_KEYS: Record = { "smart-money": "LIVE_LIMIT_SMART_MONEY", "classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS", alerts: "LIVE_LIMIT_ALERTS", - "inferred-dark": "LIVE_LIMIT_INFERRED_DARK" + "inferred-dark": "LIVE_LIMIT_INFERRED_DARK", + news: "LIVE_LIMIT_NEWS" }; const CHART_LIMITS = { @@ -81,7 +85,8 @@ const DEFAULT_LIVE_LIMITS: GenericLiveLimits = { "smart-money": 300, "classifier-hits": 300, alerts: 300, - "inferred-dark": 
300 + "inferred-dark": 300, + news: 100 }; const DEFAULT_SCOPED_CACHE_MAX_KEYS = 32; @@ -196,16 +201,28 @@ export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env): env, "inferred-dark", env.LIVE_LIMIT_DEFAULT ? liveLimitDefault : DEFAULT_LIVE_LIMITS["inferred-dark"] - ) + ), + news: parseGenericLimit(env, "news", env.LIVE_LIMIT_DEFAULT ? liveLimitDefault : DEFAULT_LIVE_LIMITS.news) }; }; -const parsePositiveInt = (value: string | undefined, fallback: number): number => { - const parsed = Number(value); - if (!Number.isFinite(parsed)) { - return fallback; +const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | null => { + switch (channel) { + case "options": + case "nbbo": + case "equities": + case "equity-quotes": + return typeof item.ts === "number" ? item.ts : null; + case "flow": + case "classifier-hits": + case "alerts": + case "inferred-dark": + return typeof item.source_ts === "number" ? item.source_ts : null; + case "news": + return typeof item.published_ts === "number" ? 
item.published_ts : null; + default: + return null; } - return Math.max(1, Math.floor(parsed)); }; export const resolveLiveStateConfig = (env: NodeJS.ProcessEnv = process.env): LiveStateConfig => ({ @@ -217,6 +234,13 @@ export const resolveLiveStateConfig = (env: NodeJS.ProcessEnv = process.env): Li ), redisFlushMaxItems: parsePositiveInt(env.LIVE_REDIS_FLUSH_MAX_ITEMS, DEFAULT_REDIS_FLUSH_MAX_ITEMS) }); +const parsePositiveInt = (value: string | undefined, fallback: number): number => { + const parsed = Number(value); + if (!Number.isFinite(parsed)) { + return fallback; + } + return Math.max(1, Math.floor(parsed)); +}; type RedisLike = Pick< RedisClientType, @@ -318,6 +342,14 @@ const getGenericConfig = (limits: GenericLiveLimits): { parse: (value) => InferredDarkEventSchema.parse(value), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), fetchRecent: fetchRecentInferredDark + }, + news: { + redisKey: "live:news", + cursorField: "news", + limit: limits.news, + parse: (value) => NewsStorySchema.parse(value), + cursor: (item) => ({ ts: item.published_ts, seq: item.seq }), + fetchRecent: fetchRecentNews } }); @@ -371,23 +403,6 @@ const normalizeGenericItems = ( return sortGenericItems(items, config.cursor).slice(0, config.limit); }; -const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | null => { - switch (channel) { - case "options": - case "nbbo": - case "equities": - case "equity-quotes": - return typeof item.ts === "number" ? item.ts : null; - case "flow": - case "classifier-hits": - case "alerts": - case "inferred-dark": - return typeof item.source_ts === "number" ? 
item.source_ts : null; - default: - return null; - } -}; - const isWithinLiveFeedLookback = ( channel: LiveGenericChannel, item: unknown, diff --git a/services/ingest-news/package.json b/services/ingest-news/package.json new file mode 100644 index 0000000..050f40b --- /dev/null +++ b/services/ingest-news/package.json @@ -0,0 +1,16 @@ +{ + "name": "@islandflow/ingest-news", + "private": true, + "type": "module", + "scripts": { + "dev": "bun run src/index.ts" + }, + "dependencies": { + "@islandflow/bus": "workspace:*", + "@islandflow/config": "workspace:*", + "@islandflow/observability": "workspace:*", + "@islandflow/types": "workspace:*", + "ws": "^8.18.3", + "zod": "^3.23.8" + } +} diff --git a/services/ingest-news/src/index.ts b/services/ingest-news/src/index.ts new file mode 100644 index 0000000..3f91ee2 --- /dev/null +++ b/services/ingest-news/src/index.ts @@ -0,0 +1,216 @@ +import { readEnv } from "@islandflow/config"; +import { createLogger } from "@islandflow/observability"; +import { + SUBJECT_NEWS, + STREAM_NEWS, + connectJetStreamWithRetry, + ensureKnownStreams, + publishJson +} from "@islandflow/bus"; +import { NewsStorySchema, type NewsStory } from "@islandflow/types"; +import WebSocket from "ws"; +import { z } from "zod"; +import { resolveNewsSymbols } from "./symbols"; + +const service = "ingest-news"; +const logger = createLogger({ service }); + +const envSchema = z.object({ + NATS_URL: z.string().default("nats://127.0.0.1:4222"), + ALPACA_API_KEY: z.string().default(""), + ALPACA_REST_URL: z.string().default("https://data.alpaca.markets"), + ALPACA_WS_BASE_URL: z.string().default("wss://stream.data.alpaca.markets"), + ALPACA_NEWS_BACKFILL_LIMIT: z.coerce.number().int().positive().max(200).default(100), + ALPACA_NEWS_WEBSOCKET_PATH: z.string().default("/v1beta1/news") +}); + +const env = readEnv(envSchema); + +type AlpacaNewsItem = { + id?: number; + headline?: string; + summary?: string; + content?: string; + author?: string; + created_at?: string; + 
updated_at?: string; + url?: string; + symbols?: string[]; + source?: string; +}; + +type AlpacaNewsResponse = { + news?: AlpacaNewsItem[]; +}; + +const buildHeaders = (): Record => ({ + Authorization: `Bearer ${env.ALPACA_API_KEY}` +}); + +const parseTimestamp = (value: string | undefined): number => { + const parsed = value ? Date.parse(value) : Number.NaN; + return Number.isFinite(parsed) ? parsed : Date.now(); +}; + +const toStory = (item: AlpacaNewsItem, seq: number): NewsStory | null => { + const storyId = Number(item.id); + if (!Number.isFinite(storyId) || storyId < 0) { + return null; + } + + const provider = "alpaca"; + const contentHtml = item.content ?? ""; + const symbols = resolveNewsSymbols(item.symbols ?? [], contentHtml); + const publishedTs = parseTimestamp(item.created_at); + const updatedTs = parseTimestamp(item.updated_at ?? item.created_at); + + return NewsStorySchema.parse({ + source_ts: publishedTs, + ingest_ts: Date.now(), + seq, + trace_id: `${provider}:${storyId}`, + story_id: storyId, + provider, + source: item.source?.trim() || item.author?.trim() || "Alpaca News", + headline: item.headline?.trim() || `Story ${storyId}`, + summary: item.summary?.trim() || "", + content_html: contentHtml, + url: item.url?.trim() || "", + published_ts: publishedTs, + updated_ts: updatedTs, + provider_symbols: symbols.provider_symbols, + resolved_symbols: symbols.resolved_symbols, + symbol_resolution: symbols.symbol_resolution + }); +}; + +const fetchBackfill = async (): Promise => { + const url = new URL("/v1beta1/news", env.ALPACA_REST_URL); + url.searchParams.set("sort", "desc"); + url.searchParams.set("limit", env.ALPACA_NEWS_BACKFILL_LIMIT.toString()); + + const response = await fetch(url.toString(), { + headers: buildHeaders() + }); + + if (!response.ok) { + throw new Error(`alpaca news backfill failed (${response.status})`); + } + + const payload = (await response.json()) as AlpacaNewsResponse; + return Array.isArray(payload.news) ? 
payload.news : []; +}; + +const decodePayload = (data: WebSocket.RawData): unknown => { + if (typeof data === "string") { + return JSON.parse(data) as unknown; + } + if (data instanceof ArrayBuffer) { + return JSON.parse(new TextDecoder().decode(new Uint8Array(data))) as unknown; + } + if (ArrayBuffer.isView(data)) { + return JSON.parse(new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength))) as unknown; + } + return JSON.parse(new TextDecoder().decode(new Uint8Array(data as ArrayBuffer))) as unknown; +}; + +const run = async () => { + if (!env.ALPACA_API_KEY) { + throw new Error("ALPACA_API_KEY is required for ingest-news."); + } + + const { nc, js, jsm } = await connectJetStreamWithRetry( + { + servers: env.NATS_URL, + name: service + }, + { attempts: 120, delayMs: 500 } + ); + + await ensureKnownStreams(jsm, [STREAM_NEWS], { logger }); + + let seq = 0; + const publishStory = async (item: AlpacaNewsItem) => { + seq += 1; + const story = toStory(item, seq); + if (!story) { + return; + } + await publishJson(js, SUBJECT_NEWS, story); + }; + + const backfill = await fetchBackfill(); + for (const item of backfill.reverse()) { + await publishStory(item); + } + + const wsUrl = new URL(env.ALPACA_NEWS_WEBSOCKET_PATH, env.ALPACA_WS_BASE_URL).toString(); + const ws = new WebSocket(wsUrl, { + headers: buildHeaders() + }); + + ws.on("open", () => { + ws.send( + JSON.stringify({ + action: "auth", + key: env.ALPACA_API_KEY, + secret: "" + }) + ); + }); + + ws.on("message", (raw) => { + let payload: unknown; + try { + payload = decodePayload(raw); + } catch (error) { + logger.warn("failed to decode alpaca news message", { + error: error instanceof Error ? 
error.message : String(error) + }); + return; + } + + if (!Array.isArray(payload)) { + return; + } + + for (const entry of payload) { + if (!entry || typeof entry !== "object") { + continue; + } + const message = entry as Record; + if (message.T === "success") { + const msg = typeof message.msg === "string" ? message.msg : ""; + if (msg === "authenticated") { + ws.send(JSON.stringify({ action: "subscribe", news: ["*"] })); + } + continue; + } + if (message.T === "subscription" || message.T === "error") { + continue; + } + void publishStory(message as AlpacaNewsItem).catch((error) => { + logger.error("failed to publish alpaca news story", { + error: error instanceof Error ? error.message : String(error) + }); + }); + } + }); + + const shutdown = async (signal: string) => { + logger.info("shutting down", { signal }); + ws.close(); + await nc.drain(); + process.exit(0); + }; + + process.on("SIGINT", () => void shutdown("SIGINT")); + process.on("SIGTERM", () => void shutdown("SIGTERM")); +}; + +void run().catch((error) => { + logger.error("service crashed", { + error: error instanceof Error ? 
error.message : String(error) + }); + process.exit(1); +}); diff --git a/services/ingest-news/src/symbols.ts b/services/ingest-news/src/symbols.ts new file mode 100644 index 0000000..e1537fd --- /dev/null +++ b/services/ingest-news/src/symbols.ts @@ -0,0 +1,70 @@ +import type { NewsSymbolResolution } from "@islandflow/types"; + +const TICKER_ANCHOR_RE = />\s*([A-Z]{1,5})\s*<\/a>/g; +const EXCHANGE_TICKER_RE = /\b(?:NASDAQ|NYSE|NYSEAMERICAN|AMEX|OTC|CBOE):([A-Z]{1,5})\b/g; +const DOLLAR_TICKER_RE = /\$([A-Z]{1,5})\b/g; + +const normalizeSymbols = (symbols: string[]): string[] => { + const seen = new Set(); + const normalized: string[] = []; + + for (const entry of symbols) { + const symbol = entry.trim().toUpperCase(); + if (!symbol || !/^[A-Z]{1,5}$/.test(symbol) || seen.has(symbol)) { + continue; + } + seen.add(symbol); + normalized.push(symbol); + } + + return normalized; +}; + +const collectMatches = (value: string, regex: RegExp): string[] => { + regex.lastIndex = 0; + const matches: string[] = []; + let match: RegExpExecArray | null = null; + while ((match = regex.exec(value)) !== null) { + matches.push(match[1] ?? ""); + } + return matches; +}; + +export const resolveNewsSymbols = ( + providerSymbols: string[], + contentHtml: string +): { + provider_symbols: string[]; + resolved_symbols: string[]; + symbol_resolution: NewsSymbolResolution; +} => { + const normalizedProvider = normalizeSymbols(providerSymbols); + const derived = normalizeSymbols([ + ...collectMatches(contentHtml, TICKER_ANCHOR_RE), + ...collectMatches(contentHtml, EXCHANGE_TICKER_RE), + ...collectMatches(contentHtml, DOLLAR_TICKER_RE) + ]); + + if (normalizedProvider.length > 0) { + const merged = normalizeSymbols([...normalizedProvider, ...derived]); + return { + provider_symbols: normalizedProvider, + resolved_symbols: merged, + symbol_resolution: derived.length > 0 ? 
"mixed" : "provider" + }; + } + + if (derived.length > 0) { + return { + provider_symbols: [], + resolved_symbols: derived, + symbol_resolution: "derived" + }; + } + + return { + provider_symbols: [], + resolved_symbols: [], + symbol_resolution: "none" + }; +}; diff --git a/services/ingest-news/tests/symbols.test.ts b/services/ingest-news/tests/symbols.test.ts new file mode 100644 index 0000000..4f3994e --- /dev/null +++ b/services/ingest-news/tests/symbols.test.ts @@ -0,0 +1,30 @@ +import { describe, expect, it } from "bun:test"; +import { resolveNewsSymbols } from "../src/symbols"; + +describe("resolveNewsSymbols", () => { + it("prefers provider symbols when present", () => { + const result = resolveNewsSymbols(["tsla", "aapl"], "

No extra tickers here.

"); + expect(result.provider_symbols).toEqual(["TSLA", "AAPL"]); + expect(result.resolved_symbols).toEqual(["TSLA", "AAPL"]); + expect(result.symbol_resolution).toBe("provider"); + }); + + it("falls back to ticker anchors", () => { + const result = resolveNewsSymbols([], 'TSLA'); + expect(result.resolved_symbols).toEqual(["TSLA"]); + expect(result.symbol_resolution).toBe("derived"); + }); + + it("falls back to exchange and dollar patterns", () => { + const result = resolveNewsSymbols([], "

NASDAQ:TSLA met with $IBM executives.

"); + expect(result.resolved_symbols).toEqual(["TSLA", "IBM"]); + expect(result.symbol_resolution).toBe("derived"); + }); + + it("dedupes and uppercases merged symbols", () => { + const result = resolveNewsSymbols(["tsla"], "

$TSLA and NASDAQ:TSLA

"); + expect(result.provider_symbols).toEqual(["TSLA"]); + expect(result.resolved_symbols).toEqual(["TSLA"]); + expect(result.symbol_resolution).toBe("mixed"); + }); +}); diff --git a/services/ingest-news/tsconfig.json b/services/ingest-news/tsconfig.json new file mode 100644 index 0000000..43ef119 --- /dev/null +++ b/services/ingest-news/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "types": [] + }, + "include": ["src/**/*.ts", "tests/**/*.ts"] +}