Compare commits

...

2 commits

32 changed files with 1410 additions and 52 deletions

View file

@ -13,6 +13,7 @@
{"_type":"issue","id":"islandflow-ayo","title":"Drop stale backlog events from live fanout","description":"Follow-up to live freshness rollout: /ws/live was still fanning out stale backlog events for freshness-gated channels, which kept tape panes in Live feed behind despite active synthetic ingest. Gate fanout and cache ingest by freshness for options/nbbo/equities/flow.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:26:39Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:26:44Z","started_at":"2026-04-28T21:26:44Z","closed_at":"2026-04-28T21:26:44Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-ayo","title":"Drop stale backlog events from live fanout","description":"Follow-up to live freshness rollout: /ws/live was still fanning out stale backlog events for freshness-gated channels, which kept tape panes in Live feed behind despite active synthetic ingest. Gate fanout and cache ingest by freshness for options/nbbo/equities/flow.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:26:39Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:26:44Z","started_at":"2026-04-28T21:26:44Z","closed_at":"2026-04-28T21:26:44Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-0v6","title":"Fix tape freshness, NBBO coverage, pause controls, and filter popup","description":"Implement the tape fixes requested for synthetic options notional sizing, strict live freshness, live-mode pause/resume behavior, stronger NBBO snapshot coverage, and moving flow filters behind a popup. Includes server-side live cache changes, web terminal state/UI changes, and tests for synthetic pricing, live snapshot freshness/NBBO retention, and live pause/filter interactions.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:02:52Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:13:38Z","started_at":"2026-04-28T21:02:57Z","closed_at":"2026-04-28T21:13:38Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-0v6","title":"Fix tape freshness, NBBO coverage, pause controls, and filter popup","description":"Implement the tape fixes requested for synthetic options notional sizing, strict live freshness, live-mode pause/resume behavior, stronger NBBO snapshot coverage, and moving flow filters behind a popup. Includes server-side live cache changes, web terminal state/UI changes, and tests for synthetic pricing, live snapshot freshness/NBBO retention, and live pause/filter interactions.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:02:52Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:13:38Z","started_at":"2026-04-28T21:02:57Z","closed_at":"2026-04-28T21:13:38Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-e4r","title":"Implement smart-money flow filtering and synthetic firehose modes","description":"Implement the approved multi-surface plan for named synthetic market profiles, options raw-vs-signal filtering, live/API filter contracts, Tape page client-side flow filters, firehose-readiness improvements, tests, and README updates.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:10:49Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:29:29Z","started_at":"2026-04-28T20:10:53Z","closed_at":"2026-04-28T20:29:29Z","close_reason":"Implemented synthetic market profiles, options signal-path filtering, signal-aware API/replay contracts, Tape page filters, tests, and README updates. Follow-up tracked in islandflow-biq.","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-e4r","title":"Implement smart-money flow filtering and synthetic firehose modes","description":"Implement the approved multi-surface plan for named synthetic market profiles, options raw-vs-signal filtering, live/API filter contracts, Tape page client-side flow filters, firehose-readiness improvements, tests, and README updates.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:10:49Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:29:29Z","started_at":"2026-04-28T20:10:53Z","closed_at":"2026-04-28T20:29:29Z","close_reason":"Implemented synthetic market profiles, options signal-path filtering, signal-aware API/replay contracts, Tape page filters, tests, and README updates. Follow-up tracked in islandflow-biq.","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-8fn","title":"implement alpaca-backed news wire view","description":"Why this issue exists and what needs to be done:\\nAdd an Alpaca-powered live news pipeline, API, storage, and web experience, including a dedicated /news route, Home preview, live fanout, history pagination, ticker resolution, and replay-mode live-only empty states.\\n\\nAcceptance criteria:\\n- normalized NewsStory contract and live channel exist\\n- ingest-news service backfills and streams Alpaca news\\n- API persists, serves, and fans out news\\n- web app exposes /news plus Home preview and drawer\\n- tests cover types, storage, API, and key UI behaviors\\n- turn documentation is added\\n\\nDesign:\\nReuse Islandflow drawer, chips, panes, and terminal styling; keep news live-only in v1 replay mode.\\n\\nNotes:\\nImplement client-side ticker filtering in v1 and expose latest revision only per provider+story_id.","status":"closed","priority":2,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T20:37:13Z","created_by":"dirtydishes","updated_at":"2026-05-18T20:55:11Z","started_at":"2026-05-18T20:37:20Z","closed_at":"2026-05-18T20:55:11Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-k8i","title":"Fix duplicate alert context import in API entrypoint","description":"Recent alert-context work introduced a duplicate fetchAlertContextByTraceId import in services/api/src/index.ts, which risks breaking TypeScript compilation and API startup. Remove the duplicate import and validate the affected API/web tests.","status":"closed","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T13:01:58Z","created_by":"dirtydishes","updated_at":"2026-05-18T13:03:40Z","started_at":"2026-05-18T13:02:02Z","closed_at":"2026-05-18T13:03:40Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-k8i","title":"Fix duplicate alert context import in API entrypoint","description":"Recent alert-context work introduced a duplicate fetchAlertContextByTraceId import in services/api/src/index.ts, which risks breaking TypeScript compilation and API startup. Remove the duplicate import and validate the affected API/web tests.","status":"closed","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T13:01:58Z","created_by":"dirtydishes","updated_at":"2026-05-18T13:03:40Z","started_at":"2026-05-18T13:02:02Z","closed_at":"2026-05-18T13:03:40Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-lk9","title":"Fix PR creation workflow after Forgejo migration","description":"## Why\\nCreating pull requests with fails after the repository moved primary collaboration from GitHub to Forgejo. The current workflow still assumes GitHub GraphQL PR creation semantics, which do not work against the Forgejo remote.\\n\\n## What\\nInvestigate the current PR creation path, identify remaining GitHub-specific assumptions, and update the repo workflow/scripts/docs so contributors can reliably publish branches and open PRs in the Forgejo-based setup.\\n\\n## Acceptance Criteria\\n- The repo no longer instructs contributors to use a broken GitHub-specific PR creation path for Forgejo branches\\n- There is a documented and preferably scripted way to create the equivalent review request against Forgejo\\n- Validation demonstrates the new workflow behaves correctly or clearly documents any remaining platform limitation","status":"in_progress","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T10:26:47Z","created_by":"dirtydishes","updated_at":"2026-05-18T10:26:53Z","started_at":"2026-05-18T10:26:53Z","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-lk9","title":"Fix PR creation workflow after Forgejo migration","description":"## Why\\nCreating pull requests with fails after the repository moved primary collaboration from GitHub to Forgejo. The current workflow still assumes GitHub GraphQL PR creation semantics, which do not work against the Forgejo remote.\\n\\n## What\\nInvestigate the current PR creation path, identify remaining GitHub-specific assumptions, and update the repo workflow/scripts/docs so contributors can reliably publish branches and open PRs in the Forgejo-based setup.\\n\\n## Acceptance Criteria\\n- The repo no longer instructs contributors to use a broken GitHub-specific PR creation path for Forgejo branches\\n- There is a documented and preferably scripted way to create the equivalent review request against Forgejo\\n- Validation demonstrates the new workflow behaves correctly or clearly documents any remaining platform limitation","status":"in_progress","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T10:26:47Z","created_by":"dirtydishes","updated_at":"2026-05-18T10:26:53Z","started_at":"2026-05-18T10:26:53Z","dependency_count":0,"dependent_count":0,"comment_count":0}
{"_type":"issue","id":"islandflow-1ei","title":"Make deploy helper remote-aware for Forgejo","description":"Why: scripts/deploy.ts hardcodes git remote name origin for fetch/pull/push and branch verification, but this repository now uses forgejo/github remotes and may not have an origin remote. What: update deploy.ts to resolve the deploy git remote robustly (Forgejo-aware), use it across local prechecks, branch publish, and remote rollout git operations, and keep behavior explicit in output.","status":"closed","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T03:20:12Z","created_by":"dirtydishes","updated_at":"2026-05-18T03:22:39Z","started_at":"2026-05-18T03:20:16Z","closed_at":"2026-05-18T03:22:39Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0} {"_type":"issue","id":"islandflow-1ei","title":"Make deploy helper remote-aware for Forgejo","description":"Why: scripts/deploy.ts hardcodes git remote name origin for fetch/pull/push and branch verification, but this repository now uses forgejo/github remotes and may not have an origin remote. What: update deploy.ts to resolve the deploy git remote robustly (Forgejo-aware), use it across local prechecks, branch publish, and remote rollout git operations, and keep behavior explicit in output.","status":"closed","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T03:20:12Z","created_by":"dirtydishes","updated_at":"2026-05-18T03:22:39Z","started_at":"2026-05-18T03:20:16Z","closed_at":"2026-05-18T03:22:39Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0}

View file

@ -117,8 +117,9 @@ Each turn document must include these sections:
2. **Changes Made** 2. **Changes Made**
3. **Context** 3. **Context**
4. **Important Implementation Details** 4. **Important Implementation Details**
5. **Expected Impact for End-Users** 5. **Relevant Diff Snippets**
5. **Validation** 6. **Expected Impact for End-Users**
7. **Validation**
6. **Issues, Limitations, and Mitigations** 6. **Issues, Limitations, and Mitigations**
7. **Follow-up Work** 7. **Follow-up Work**

View file

@ -708,7 +708,12 @@ h3 {
grid-template-columns: repeat(3, minmax(0, 1fr)); grid-template-columns: repeat(3, minmax(0, 1fr));
} }
.page-grid-news {
grid-template-columns: minmax(0, 1fr);
}
.page-grid-home > :nth-child(3), .page-grid-home > :nth-child(3),
.page-grid-home > :nth-child(4),
.page-grid-tape > :nth-child(1), .page-grid-tape > :nth-child(1),
.page-grid-replay > :nth-child(1) { .page-grid-replay > :nth-child(1) {
grid-column: 1 / -1; grid-column: 1 / -1;
@ -933,6 +938,7 @@ h3 {
} }
.page-grid-home > :nth-child(3), .page-grid-home > :nth-child(3),
.page-grid-home > :nth-child(4),
.page-grid-replay > :not(:first-child) { .page-grid-replay > :not(:first-child) {
height: clamp(430px, 58vh, 760px); height: clamp(430px, 58vh, 760px);
} }
@ -1747,6 +1753,72 @@ h3 {
gap: 10px; gap: 10px;
} }
.terminal-link-button {
text-decoration: none;
}
.news-list {
display: flex;
flex-direction: column;
gap: 10px;
}
.news-row {
width: 100%;
display: flex;
flex-direction: column;
gap: 8px;
padding: 14px 16px;
border: 1px solid var(--border);
border-radius: 12px;
background: oklch(0.18 0.012 250 / 0.6);
color: var(--text);
text-align: left;
transition: border-color 150ms ease, background 150ms ease;
}
.news-row:hover {
border-color: var(--accent-soft);
background: oklch(0.2 0.015 250 / 0.75);
}
.news-row-head,
.news-row-meta {
display: flex;
align-items: center;
justify-content: space-between;
gap: 10px;
flex-wrap: wrap;
}
.news-row h3 {
margin: 0;
font-size: 0.96rem;
font-weight: 600;
}
.news-row-time {
color: var(--text-dim);
font-family: var(--font-mono), monospace;
font-size: 0.78rem;
}
.news-row-meta {
color: var(--text-dim);
font-size: 0.78rem;
}
.news-drawer-body a {
color: var(--accent);
}
.news-drawer-body p,
.news-drawer-body ul,
.news-drawer-body ol,
.news-drawer-body blockquote {
margin: 0 0 12px;
}
.synthetic-status-grid strong, .synthetic-status-grid strong,
.synthetic-hit-row strong { .synthetic-hit-row strong {
font-family: var(--font-mono), monospace; font-family: var(--font-mono), monospace;
@ -1964,6 +2036,7 @@ h3 {
} }
.page-grid-home > :nth-child(3), .page-grid-home > :nth-child(3),
.page-grid-home > :nth-child(4),
.page-grid-tape > :nth-child(1), .page-grid-tape > :nth-child(1),
.page-grid-replay > :nth-child(1) { .page-grid-replay > :nth-child(1) {
grid-column: auto; grid-column: auto;
@ -1973,6 +2046,7 @@ h3 {
.page-grid-home > :nth-child(1), .page-grid-home > :nth-child(1),
.page-grid-home > :nth-child(2), .page-grid-home > :nth-child(2),
.page-grid-home > :nth-child(3), .page-grid-home > :nth-child(3),
.page-grid-home > :nth-child(4),
.page-grid-signals > .terminal-pane, .page-grid-signals > .terminal-pane,
.page-grid-replay > :not(:first-child), .page-grid-replay > :not(:first-child),
.page-grid-tape > :first-child, .page-grid-tape > :first-child,

View file

@ -0,0 +1,7 @@
import { NewsRoute } from "../terminal";
export const dynamic = "force-dynamic";
export default function Page() {
return <NewsRoute />;
}

View file

@ -247,6 +247,15 @@ describe("live manifest", () => {
]); ]);
}); });
it("includes news subscriptions on home and /news", () => {
expect(getLiveManifest("/", "SPY", 60000, buildDefaultFlowFilters()).map((subscription) => subscription.channel)).toContain(
"news"
);
expect(getLiveManifest("/news", "SPY", 60000, buildDefaultFlowFilters()).map((subscription) => subscription.channel)).toEqual([
"news"
]);
});
it("scopes /charts subscriptions to chart channels only", () => { it("scopes /charts subscriptions to chart channels only", () => {
const channels = getLiveManifest("/charts", "SPY", 60000, buildDefaultFlowFilters()).map( const channels = getLiveManifest("/charts", "SPY", 60000, buildDefaultFlowFilters()).map(
(subscription) => subscription.channel (subscription) => subscription.channel
@ -431,6 +440,13 @@ describe("route feature map", () => {
expect(features.equityOverlay).toBe(true); expect(features.equityOverlay).toBe(true);
expect(features.alerts).toBe(false); expect(features.alerts).toBe(false);
}); });
it("maps /news to the dedicated news pane", () => {
const features = getRouteFeatures("/news");
expect(features.news).toBe(true);
expect(features.showNewsPane).toBe(true);
expect(features.showAlertsPane).toBe(false);
});
}); });
describe("fixed tape virtualization config", () => { describe("fixed tape virtualization config", () => {
@ -461,10 +477,11 @@ describe("dark underlying route dependency helper", () => {
}); });
describe("terminal navigation", () => { describe("terminal navigation", () => {
it("exposes only Home and Tape as top-level destinations", () => { it("exposes Home, Tape, and News as top-level destinations", () => {
expect(NAV_ITEMS).toEqual([ expect(NAV_ITEMS).toEqual([
{ href: "/", label: "Home" }, { href: "/", label: "Home" },
{ href: "/tape", label: "Tape" } { href: "/tape", label: "Tape" },
{ href: "/news", label: "News" }
]); ]);
}); });
}); });

View file

@ -33,6 +33,7 @@ import type {
LiveServerMessage, LiveServerMessage,
LiveHotChannelHealthMap, LiveHotChannelHealthMap,
LiveSubscription, LiveSubscription,
NewsStory,
OptionFlowFilters, OptionFlowFilters,
OptionFlowView, OptionFlowView,
OptionNbboSide, OptionNbboSide,
@ -158,6 +159,7 @@ type RouteFeatures = {
nbbo: boolean; nbbo: boolean;
equities: boolean; equities: boolean;
flow: boolean; flow: boolean;
news: boolean;
alerts: boolean; alerts: boolean;
smartMoney: boolean; smartMoney: boolean;
classifierHits: boolean; classifierHits: boolean;
@ -168,6 +170,7 @@ type RouteFeatures = {
showOptionsPane: boolean; showOptionsPane: boolean;
showEquitiesPane: boolean; showEquitiesPane: boolean;
showFlowPane: boolean; showFlowPane: boolean;
showNewsPane: boolean;
showAlertsPane: boolean; showAlertsPane: boolean;
showClassifierPane: boolean; showClassifierPane: boolean;
showDarkPane: boolean; showDarkPane: boolean;
@ -187,6 +190,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
const includeEquitiesFallback = shouldIncludeEquitiesForDarkUnderlyingFallback(); const includeEquitiesFallback = shouldIncludeEquitiesForDarkUnderlyingFallback();
const normalizedPath = const normalizedPath =
pathname === "/tape" || pathname === "/tape" ||
pathname === "/news" ||
pathname === "/signals" || pathname === "/signals" ||
pathname === "/charts" || pathname === "/charts" ||
pathname === "/replay" pathname === "/replay"
@ -200,6 +204,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
nbbo: true, nbbo: true,
equities: true, equities: true,
flow: true, flow: true,
news: false,
alerts: false, alerts: false,
smartMoney: false, smartMoney: false,
classifierHits: false, classifierHits: false,
@ -210,6 +215,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
showOptionsPane: true, showOptionsPane: true,
showEquitiesPane: true, showEquitiesPane: true,
showFlowPane: true, showFlowPane: true,
showNewsPane: false,
showAlertsPane: false, showAlertsPane: false,
showClassifierPane: false, showClassifierPane: false,
showDarkPane: false, showDarkPane: false,
@ -220,12 +226,41 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
needsAlertEvidencePrefetch: false, needsAlertEvidencePrefetch: false,
needsDarkUnderlying: false needsDarkUnderlying: false
}; };
case "/news":
return {
options: false,
nbbo: false,
equities: false,
flow: false,
news: true,
alerts: false,
smartMoney: false,
classifierHits: false,
inferredDark: false,
equityJoins: false,
equityCandles: false,
equityOverlay: false,
showOptionsPane: false,
showEquitiesPane: false,
showFlowPane: false,
showNewsPane: true,
showAlertsPane: false,
showClassifierPane: false,
showDarkPane: false,
showChartPane: false,
showFocusPane: false,
showReplayConsole: false,
needsClassifierDecor: false,
needsAlertEvidencePrefetch: false,
needsDarkUnderlying: false
};
case "/signals": case "/signals":
return { return {
options: false, options: false,
nbbo: false, nbbo: false,
equities: includeEquitiesFallback, equities: includeEquitiesFallback,
flow: false, flow: false,
news: false,
alerts: true, alerts: true,
smartMoney: true, smartMoney: true,
classifierHits: true, classifierHits: true,
@ -236,6 +271,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
showOptionsPane: false, showOptionsPane: false,
showEquitiesPane: false, showEquitiesPane: false,
showFlowPane: false, showFlowPane: false,
showNewsPane: false,
showAlertsPane: true, showAlertsPane: true,
showClassifierPane: true, showClassifierPane: true,
showDarkPane: true, showDarkPane: true,
@ -252,6 +288,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
nbbo: false, nbbo: false,
equities: includeEquitiesFallback, equities: includeEquitiesFallback,
flow: false, flow: false,
news: false,
alerts: false, alerts: false,
smartMoney: true, smartMoney: true,
classifierHits: false, classifierHits: false,
@ -262,6 +299,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
showOptionsPane: false, showOptionsPane: false,
showEquitiesPane: false, showEquitiesPane: false,
showFlowPane: false, showFlowPane: false,
showNewsPane: false,
showAlertsPane: false, showAlertsPane: false,
showClassifierPane: false, showClassifierPane: false,
showDarkPane: false, showDarkPane: false,
@ -278,6 +316,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
nbbo: false, nbbo: false,
equities: false, equities: false,
flow: false, flow: false,
news: false,
alerts: false, alerts: false,
smartMoney: false, smartMoney: false,
classifierHits: false, classifierHits: false,
@ -288,6 +327,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
showOptionsPane: true, showOptionsPane: true,
showEquitiesPane: false, showEquitiesPane: false,
showFlowPane: true, showFlowPane: true,
showNewsPane: false,
showAlertsPane: true, showAlertsPane: true,
showClassifierPane: false, showClassifierPane: false,
showDarkPane: false, showDarkPane: false,
@ -305,6 +345,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
nbbo: false, nbbo: false,
equities: true, equities: true,
flow: false, flow: false,
news: true,
alerts: true, alerts: true,
smartMoney: true, smartMoney: true,
classifierHits: false, classifierHits: false,
@ -315,6 +356,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
showOptionsPane: false, showOptionsPane: false,
showEquitiesPane: true, showEquitiesPane: true,
showFlowPane: false, showFlowPane: false,
showNewsPane: true,
showAlertsPane: true, showAlertsPane: true,
showClassifierPane: false, showClassifierPane: false,
showDarkPane: false, showDarkPane: false,
@ -332,6 +374,7 @@ const EMPTY_ALERT_EVENTS: AlertEvent[] = [];
const EMPTY_CLASSIFIER_HIT_EVENTS: ClassifierHitEvent[] = []; const EMPTY_CLASSIFIER_HIT_EVENTS: ClassifierHitEvent[] = [];
const EMPTY_SMART_MONEY_EVENTS: SmartMoneyEvent[] = []; const EMPTY_SMART_MONEY_EVENTS: SmartMoneyEvent[] = [];
const EMPTY_INFERRED_DARK_EVENTS: InferredDarkEvent[] = []; const EMPTY_INFERRED_DARK_EVENTS: InferredDarkEvent[] = [];
const EMPTY_NEWS_STORIES: NewsStory[] = [];
type CandlestickSeries = ReturnType<IChartApi["addCandlestickSeries"]>; type CandlestickSeries = ReturnType<IChartApi["addCandlestickSeries"]>;
@ -1194,6 +1237,44 @@ const formatDateTime = (ts: number): string => {
return `${date.toLocaleDateString()} ${date.toLocaleTimeString()}`; return `${date.toLocaleDateString()} ${date.toLocaleTimeString()}`;
}; };
const isSameLocalDay = (left: number, right: number): boolean => {
const a = new Date(left);
const b = new Date(right);
return (
a.getFullYear() === b.getFullYear() &&
a.getMonth() === b.getMonth() &&
a.getDate() === b.getDate()
);
};
export const formatNewsTimestamp = (ts: number, now = Date.now()): string => {
const date = new Date(ts);
return isSameLocalDay(ts, now)
? date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" })
: date.toLocaleString([], { month: "short", day: "numeric", hour: "numeric", minute: "2-digit" });
};
const sanitizeNewsHtml = (value: string): { html: string; fallbackText: string; sanitized: boolean } => {
const fallbackText = value
.replace(/<script[\s\S]*?<\/script>/gi, " ")
.replace(/<style[\s\S]*?<\/style>/gi, " ")
.replace(/<[^>]+>/g, " ")
.replace(/\s+/g, " ")
.trim();
try {
const sanitized = value
.replace(/<script[\s\S]*?<\/script>/gi, "")
.replace(/<style[\s\S]*?<\/style>/gi, "")
.replace(/\son\w+=(?:"[^"]*"|'[^']*'|[^\s>]+)/gi, "")
.replace(/\shref=(["'])javascript:[\s\S]*?\1/gi, ' href="#"')
.replace(/<(?!\/?(p|div|section|article|span|strong|em|b|i|ul|ol|li|br|a|h1|h2|h3|h4|blockquote)\b)[^>]*>/gi, "");
return { html: sanitized, fallbackText, sanitized: true };
} catch {
return { html: "", fallbackText, sanitized: false };
}
};
const humanizeClassifierId = (value: string): string => { const humanizeClassifierId = (value: string): string => {
if (!value) { if (!value) {
return "Classifier"; return "Classifier";
@ -2870,6 +2951,7 @@ type LiveSessionState = {
smartMoneyHistory: SmartMoneyEvent[]; smartMoneyHistory: SmartMoneyEvent[];
classifierHitsHistory: ClassifierHitEvent[]; classifierHitsHistory: ClassifierHitEvent[];
alertsHistory: AlertEvent[]; alertsHistory: AlertEvent[];
newsHistory: NewsStory[];
inferredDarkHistory: InferredDarkEvent[]; inferredDarkHistory: InferredDarkEvent[];
options: OptionPrint[]; options: OptionPrint[];
nbbo: OptionNBBO[]; nbbo: OptionNBBO[];
@ -2880,6 +2962,7 @@ type LiveSessionState = {
smartMoney: SmartMoneyEvent[]; smartMoney: SmartMoneyEvent[];
classifierHits: ClassifierHitEvent[]; classifierHits: ClassifierHitEvent[];
alerts: AlertEvent[]; alerts: AlertEvent[];
news: NewsStory[];
inferredDark: InferredDarkEvent[]; inferredDark: InferredDarkEvent[];
chartCandles: EquityCandle[]; chartCandles: EquityCandle[];
chartOverlay: EquityPrint[]; chartOverlay: EquityPrint[];
@ -2900,6 +2983,7 @@ const LIVE_HISTORY_ENDPOINTS: Partial<Record<LiveSubscription["channel"], string
"smart-money": "/history/smart-money", "smart-money": "/history/smart-money",
"classifier-hits": "/history/classifier-hits", "classifier-hits": "/history/classifier-hits",
alerts: "/history/alerts", alerts: "/history/alerts",
news: "/history/news",
"inferred-dark": "/history/inferred-dark" "inferred-dark": "/history/inferred-dark"
}; };
@ -3072,6 +3156,9 @@ export const getLiveManifest = (
if (features.flow) { if (features.flow) {
subscriptions.push({ channel: "flow", filters: flowFilters, snapshot_limit: LIVE_HOT_WINDOW }); subscriptions.push({ channel: "flow", filters: flowFilters, snapshot_limit: LIVE_HOT_WINDOW });
} }
if (features.news) {
subscriptions.push({ channel: "news", snapshot_limit: LIVE_OPTIONS_HEAD_LIMIT });
}
if (features.alerts) { if (features.alerts) {
subscriptions.push({ channel: "alerts", snapshot_limit: LIVE_HOT_WINDOW }); subscriptions.push({ channel: "alerts", snapshot_limit: LIVE_HOT_WINDOW });
} }
@ -3133,6 +3220,7 @@ const useLiveSession = (
const [smartMoney, setSmartMoney] = useState<SmartMoneyEvent[]>([]); const [smartMoney, setSmartMoney] = useState<SmartMoneyEvent[]>([]);
const [classifierHits, setClassifierHits] = useState<ClassifierHitEvent[]>([]); const [classifierHits, setClassifierHits] = useState<ClassifierHitEvent[]>([]);
const [alerts, setAlerts] = useState<AlertEvent[]>([]); const [alerts, setAlerts] = useState<AlertEvent[]>([]);
const [news, setNews] = useState<NewsStory[]>([]);
const [inferredDark, setInferredDark] = useState<InferredDarkEvent[]>([]); const [inferredDark, setInferredDark] = useState<InferredDarkEvent[]>([]);
const [optionsHistory, setOptionsHistory] = useState<OptionPrint[]>([]); const [optionsHistory, setOptionsHistory] = useState<OptionPrint[]>([]);
const [nbboHistory, setNbboHistory] = useState<OptionNBBO[]>([]); const [nbboHistory, setNbboHistory] = useState<OptionNBBO[]>([]);
@ -3142,6 +3230,7 @@ const useLiveSession = (
const [smartMoneyHistory, setSmartMoneyHistory] = useState<SmartMoneyEvent[]>([]); const [smartMoneyHistory, setSmartMoneyHistory] = useState<SmartMoneyEvent[]>([]);
const [classifierHitsHistory, setClassifierHitsHistory] = useState<ClassifierHitEvent[]>([]); const [classifierHitsHistory, setClassifierHitsHistory] = useState<ClassifierHitEvent[]>([]);
const [alertsHistory, setAlertsHistory] = useState<AlertEvent[]>([]); const [alertsHistory, setAlertsHistory] = useState<AlertEvent[]>([]);
const [newsHistory, setNewsHistory] = useState<NewsStory[]>([]);
const [inferredDarkHistory, setInferredDarkHistory] = useState<InferredDarkEvent[]>([]); const [inferredDarkHistory, setInferredDarkHistory] = useState<InferredDarkEvent[]>([]);
const [chartCandles, setChartCandles] = useState<EquityCandle[]>([]); const [chartCandles, setChartCandles] = useState<EquityCandle[]>([]);
const [chartOverlay, setChartOverlay] = useState<EquityPrint[]>([]); const [chartOverlay, setChartOverlay] = useState<EquityPrint[]>([]);
@ -3154,6 +3243,7 @@ const useLiveSession = (
const smartMoneyRef = useRef<SmartMoneyEvent[]>([]); const smartMoneyRef = useRef<SmartMoneyEvent[]>([]);
const classifierHitsRef = useRef<ClassifierHitEvent[]>([]); const classifierHitsRef = useRef<ClassifierHitEvent[]>([]);
const alertsRef = useRef<AlertEvent[]>([]); const alertsRef = useRef<AlertEvent[]>([]);
const newsRef = useRef<NewsStory[]>([]);
const inferredDarkRef = useRef<InferredDarkEvent[]>([]); const inferredDarkRef = useRef<InferredDarkEvent[]>([]);
const chartCandlesRef = useRef<EquityCandle[]>([]); const chartCandlesRef = useRef<EquityCandle[]>([]);
const chartOverlayRef = useRef<EquityPrint[]>([]); const chartOverlayRef = useRef<EquityPrint[]>([]);
@ -3165,6 +3255,7 @@ const useLiveSession = (
const smartMoneyHistoryRef = useRef<SmartMoneyEvent[]>([]); const smartMoneyHistoryRef = useRef<SmartMoneyEvent[]>([]);
const classifierHitsHistoryRef = useRef<ClassifierHitEvent[]>([]); const classifierHitsHistoryRef = useRef<ClassifierHitEvent[]>([]);
const alertsHistoryRef = useRef<AlertEvent[]>([]); const alertsHistoryRef = useRef<AlertEvent[]>([]);
const newsHistoryRef = useRef<NewsStory[]>([]);
const inferredDarkHistoryRef = useRef<InferredDarkEvent[]>([]); const inferredDarkHistoryRef = useRef<InferredDarkEvent[]>([]);
const socketRef = useRef<WebSocket | null>(null); const socketRef = useRef<WebSocket | null>(null);
const reconnectRef = useRef<number | null>(null); const reconnectRef = useRef<number | null>(null);
@ -3218,6 +3309,7 @@ const useLiveSession = (
setSmartMoney([]); setSmartMoney([]);
setClassifierHits([]); setClassifierHits([]);
setAlerts([]); setAlerts([]);
setNews([]);
setInferredDark([]); setInferredDark([]);
setOptionsHistory([]); setOptionsHistory([]);
setNbboHistory([]); setNbboHistory([]);
@ -3227,6 +3319,7 @@ const useLiveSession = (
setSmartMoneyHistory([]); setSmartMoneyHistory([]);
setClassifierHitsHistory([]); setClassifierHitsHistory([]);
setAlertsHistory([]); setAlertsHistory([]);
setNewsHistory([]);
setInferredDarkHistory([]); setInferredDarkHistory([]);
setChartCandles([]); setChartCandles([]);
setChartOverlay([]); setChartOverlay([]);
@ -3239,6 +3332,7 @@ const useLiveSession = (
smartMoneyRef.current = []; smartMoneyRef.current = [];
classifierHitsRef.current = []; classifierHitsRef.current = [];
alertsRef.current = []; alertsRef.current = [];
newsRef.current = [];
inferredDarkRef.current = []; inferredDarkRef.current = [];
chartCandlesRef.current = []; chartCandlesRef.current = [];
chartOverlayRef.current = []; chartOverlayRef.current = [];
@ -3250,6 +3344,7 @@ const useLiveSession = (
smartMoneyHistoryRef.current = []; smartMoneyHistoryRef.current = [];
classifierHitsHistoryRef.current = []; classifierHitsHistoryRef.current = [];
alertsHistoryRef.current = []; alertsHistoryRef.current = [];
newsHistoryRef.current = [];
inferredDarkHistoryRef.current = []; inferredDarkHistoryRef.current = [];
subscribedKeysRef.current = new Set(); subscribedKeysRef.current = new Set();
subscribedMapRef.current = new Map(); subscribedMapRef.current = new Map();
@ -3403,6 +3498,12 @@ const useLiveSession = (
ref: alertsHistoryRef ref: alertsHistoryRef
}); });
break; break;
case "news":
mergeItems(setNews, newsRef, items as NewsStory[], LIVE_OPTIONS_HEAD_LIMIT, {
setter: setNewsHistory,
ref: newsHistoryRef
});
break;
case "inferred-dark": case "inferred-dark":
mergeItems(setInferredDark, inferredDarkRef, items as InferredDarkEvent[], LIVE_HOT_WINDOW, { mergeItems(setInferredDark, inferredDarkRef, items as InferredDarkEvent[], LIVE_HOT_WINDOW, {
setter: setInferredDarkHistory, setter: setInferredDarkHistory,
@ -3694,6 +3795,9 @@ const useLiveSession = (
case "alerts": case "alerts":
mergeOlder(setAlertsHistory, alertsHistoryRef, alertsRef.current); mergeOlder(setAlertsHistory, alertsHistoryRef, alertsRef.current);
break; break;
case "news":
mergeOlder(setNewsHistory, newsHistoryRef, newsRef.current);
break;
case "inferred-dark": case "inferred-dark":
mergeOlder(setInferredDarkHistory, inferredDarkHistoryRef, inferredDarkRef.current); mergeOlder(setInferredDarkHistory, inferredDarkHistoryRef, inferredDarkRef.current);
break; break;
@ -3735,6 +3839,7 @@ const useLiveSession = (
smartMoneyHistory, smartMoneyHistory,
classifierHitsHistory, classifierHitsHistory,
alertsHistory, alertsHistory,
newsHistory,
inferredDarkHistory, inferredDarkHistory,
options, options,
nbbo, nbbo,
@ -3745,6 +3850,7 @@ const useLiveSession = (
smartMoney, smartMoney,
classifierHits, classifierHits,
alerts, alerts,
news,
inferredDark, inferredDark,
chartCandles, chartCandles,
chartOverlay chartOverlay
@ -4822,6 +4928,69 @@ const AlertDrawer = ({ alert, flowPacket, evidence, contextStatus, onClose }: Al
); );
}; };
type NewsDrawerProps = {
story: NewsStory;
onClose: () => void;
};
const NewsDrawer = ({ story, onClose }: NewsDrawerProps) => {
const body = sanitizeNewsHtml(story.content_html);
return (
<aside className="drawer">
<div className="drawer-header">
<div>
<p className="drawer-eyebrow">News wire</p>
<h3>{story.headline}</h3>
<p className="drawer-subtitle">
{story.source} · Published {formatDateTime(story.published_ts)}
{story.updated_ts !== story.published_ts ? ` · Updated ${formatDateTime(story.updated_ts)}` : ""}
</p>
</div>
<button className="drawer-close" type="button" onClick={onClose}>
Close
</button>
</div>
<div className="drawer-meta">
{story.resolved_symbols.map((symbol) => (
<span className="drawer-chip" key={`${story.trace_id}-${symbol}`}>
{symbol}
</span>
))}
<span className="drawer-chip">{story.symbol_resolution}</span>
</div>
{story.summary ? (
<div className="drawer-section">
<h4>Summary</h4>
<p className="drawer-note">{story.summary}</p>
</div>
) : null}
<div className="drawer-section">
<h4>Story</h4>
{body.sanitized && body.html ? (
<div className="drawer-note news-drawer-body" dangerouslySetInnerHTML={{ __html: body.html }} />
) : body.fallbackText ? (
<p className="drawer-note">{body.fallbackText}</p>
) : (
<p className="drawer-empty">Story body unavailable.</p>
)}
</div>
{story.url ? (
<div className="drawer-section">
<h4>Source link</h4>
<a className="terminal-button terminal-link-button" href={story.url} rel="noreferrer" target="_blank">
Open original article
</a>
</div>
) : null}
</aside>
);
};
type ClassifierHitDrawerProps = { type ClassifierHitDrawerProps = {
hit: ClassifierHitEvent; hit: ClassifierHitEvent;
flowPacket: FlowPacket | null; flowPacket: FlowPacket | null;
@ -5178,6 +5347,7 @@ const useTerminalState = () => {
const [mode, setMode] = useState<TapeMode>("live"); const [mode, setMode] = useState<TapeMode>("live");
const [replaySource, setReplaySource] = useState<string | null>(null); const [replaySource, setReplaySource] = useState<string | null>(null);
const [selectedAlert, setSelectedAlert] = useState<AlertEvent | null>(null); const [selectedAlert, setSelectedAlert] = useState<AlertEvent | null>(null);
const [selectedNewsStory, setSelectedNewsStory] = useState<NewsStory | null>(null);
const [selectedDarkEvent, setSelectedDarkEvent] = useState<InferredDarkEvent | null>(null); const [selectedDarkEvent, setSelectedDarkEvent] = useState<InferredDarkEvent | null>(null);
const [selectedClassifierHit, setSelectedClassifierHit] = useState<ClassifierHitEvent | null>(null); const [selectedClassifierHit, setSelectedClassifierHit] = useState<ClassifierHitEvent | null>(null);
const [selectedSmartMoneyEvent, setSelectedSmartMoneyEvent] = useState<SmartMoneyEvent | null>(null); const [selectedSmartMoneyEvent, setSelectedSmartMoneyEvent] = useState<SmartMoneyEvent | null>(null);
@ -5274,12 +5444,13 @@ const useTerminalState = () => {
}, [mode]); }, [mode]);
useEffect(() => { useEffect(() => {
if (!selectedAlert && !selectedClassifierHit && !selectedDarkEvent && !selectedSmartMoneyEvent) { if (!selectedAlert && !selectedNewsStory && !selectedClassifierHit && !selectedDarkEvent && !selectedSmartMoneyEvent) {
return; return;
} }
const dismissDrawers = () => { const dismissDrawers = () => {
setSelectedAlert(null); setSelectedAlert(null);
setSelectedNewsStory(null);
setSelectedClassifierHit(null); setSelectedClassifierHit(null);
setSelectedSmartMoneyEvent(null); setSelectedSmartMoneyEvent(null);
setSelectedDarkEvent(null); setSelectedDarkEvent(null);
@ -5305,7 +5476,7 @@ const useTerminalState = () => {
document.removeEventListener("mousedown", handlePointerDown); document.removeEventListener("mousedown", handlePointerDown);
document.removeEventListener("keydown", handleKeyDown); document.removeEventListener("keydown", handleKeyDown);
}; };
}, [selectedAlert, selectedClassifierHit, selectedDarkEvent, selectedSmartMoneyEvent]); }, [selectedAlert, selectedNewsStory, selectedClassifierHit, selectedDarkEvent, selectedSmartMoneyEvent]);
const optionsScroll = useListScroll(); const optionsScroll = useListScroll();
const equitiesScroll = useListScroll(); const equitiesScroll = useListScroll();
@ -5540,6 +5711,14 @@ const useTerminalState = () => {
) )
: equityJoins; : equityJoins;
const flowFeed = mode === "live" ? liveFlow : flow; const flowFeed = mode === "live" ? liveFlow : flow;
const newsFeed =
mode === "live"
? toStaticTapeState(
liveSession.status,
composeTapeItems([], liveSession.news, liveSession.newsHistory),
liveSession.lastUpdate
)
: toStaticTapeState("disconnected", [], null);
const alertsFeed = const alertsFeed =
mode === "live" mode === "live"
? toStaticTapeState( ? toStaticTapeState(
@ -6490,6 +6669,16 @@ const useTerminalState = () => {
routeFeatures.needsAlertEvidencePrefetch routeFeatures.needsAlertEvidencePrefetch
]); ]);
const filteredNews = useMemo(() => {
if (!routeFeatures.news && !routeFeatures.showNewsPane) {
return EMPTY_NEWS_STORIES;
}
if (tickerSet.size === 0) {
return newsFeed.items;
}
return newsFeed.items.filter((story) => story.resolved_symbols.some((symbol) => matchesTicker(symbol)));
}, [matchesTicker, newsFeed.items, routeFeatures.news, routeFeatures.showNewsPane, tickerSet]);
const visibleAlerts = useMemo(() => { const visibleAlerts = useMemo(() => {
if (routeFeatures.needsAlertEvidencePrefetch) { if (routeFeatures.needsAlertEvidencePrefetch) {
return filteredAlerts.slice(0, 12); return filteredAlerts.slice(0, 12);
@ -6767,6 +6956,7 @@ const useTerminalState = () => {
(hit: ClassifierHitEvent) => { (hit: ClassifierHitEvent) => {
const alert = findAlertForClassifierHit(hit); const alert = findAlertForClassifierHit(hit);
if (alert) { if (alert) {
setSelectedNewsStory(null);
setSelectedClassifierHit(null); setSelectedClassifierHit(null);
setSelectedDarkEvent(null); setSelectedDarkEvent(null);
setSelectedSmartMoneyEvent(null); setSelectedSmartMoneyEvent(null);
@ -6774,6 +6964,7 @@ const useTerminalState = () => {
return; return;
} }
setSelectedNewsStory(null);
setSelectedAlert(null); setSelectedAlert(null);
setSelectedDarkEvent(null); setSelectedDarkEvent(null);
setSelectedSmartMoneyEvent(null); setSelectedSmartMoneyEvent(null);
@ -6783,6 +6974,7 @@ const useTerminalState = () => {
); );
const openFromSmartMoneyEvent = useCallback((event: SmartMoneyEvent) => { const openFromSmartMoneyEvent = useCallback((event: SmartMoneyEvent) => {
setSelectedNewsStory(null);
setSelectedAlert(null); setSelectedAlert(null);
setSelectedClassifierHit(null); setSelectedClassifierHit(null);
setSelectedDarkEvent(null); setSelectedDarkEvent(null);
@ -6797,6 +6989,7 @@ const useTerminalState = () => {
); );
const handleDarkMarkerClick = useCallback((event: InferredDarkEvent) => { const handleDarkMarkerClick = useCallback((event: InferredDarkEvent) => {
setSelectedNewsStory(null);
setSelectedAlert(null); setSelectedAlert(null);
setSelectedClassifierHit(null); setSelectedClassifierHit(null);
setSelectedSmartMoneyEvent(null); setSelectedSmartMoneyEvent(null);
@ -6817,6 +7010,9 @@ const useTerminalState = () => {
if (routeFeatures.flow || routeFeatures.showFlowPane) { if (routeFeatures.flow || routeFeatures.showFlowPane) {
updates.push(flowFeed.lastUpdate); updates.push(flowFeed.lastUpdate);
} }
if (routeFeatures.news || routeFeatures.showNewsPane) {
updates.push(newsFeed.lastUpdate);
}
if (routeFeatures.alerts || routeFeatures.showAlertsPane) { if (routeFeatures.alerts || routeFeatures.showAlertsPane) {
updates.push(alertsFeed.lastUpdate); updates.push(alertsFeed.lastUpdate);
} }
@ -6839,6 +7035,8 @@ const useTerminalState = () => {
routeFeatures.showFocusPane, routeFeatures.showFocusPane,
routeFeatures.flow, routeFeatures.flow,
routeFeatures.showFlowPane, routeFeatures.showFlowPane,
routeFeatures.news,
routeFeatures.showNewsPane,
routeFeatures.alerts, routeFeatures.alerts,
routeFeatures.showAlertsPane, routeFeatures.showAlertsPane,
routeFeatures.smartMoney, routeFeatures.smartMoney,
@ -6849,6 +7047,7 @@ const useTerminalState = () => {
equitiesFeed.lastUpdate, equitiesFeed.lastUpdate,
inferredDarkFeed.lastUpdate, inferredDarkFeed.lastUpdate,
flowFeed.lastUpdate, flowFeed.lastUpdate,
newsFeed.lastUpdate,
alertsFeed.lastUpdate, alertsFeed.lastUpdate,
smartMoneyFeed.lastUpdate, smartMoneyFeed.lastUpdate,
classifierHitsFeed.lastUpdate classifierHitsFeed.lastUpdate
@ -6861,6 +7060,8 @@ const useTerminalState = () => {
setReplaySource, setReplaySource,
selectedAlert, selectedAlert,
setSelectedAlert, setSelectedAlert,
selectedNewsStory,
setSelectedNewsStory,
selectedDarkEvent, selectedDarkEvent,
setSelectedDarkEvent, setSelectedDarkEvent,
selectedClassifierHit, selectedClassifierHit,
@ -6887,6 +7088,7 @@ const useTerminalState = () => {
equityJoins: equityJoinsFeed, equityJoins: equityJoinsFeed,
nbbo: nbboFeed, nbbo: nbboFeed,
inferredDark: inferredDarkFeed, inferredDark: inferredDarkFeed,
news: newsFeed,
flow: flowFeed, flow: flowFeed,
alerts: alertsFeed, alerts: alertsFeed,
smartMoney: smartMoneyFeed, smartMoney: smartMoneyFeed,
@ -6920,6 +7122,7 @@ const useTerminalState = () => {
equitiesScopedQuiet, equitiesScopedQuiet,
equitiesSilentWarning, equitiesSilentWarning,
filteredInferredDark, filteredInferredDark,
filteredNews,
filteredFlow, filteredFlow,
filteredAlerts, filteredAlerts,
filteredSmartMoneyEvents, filteredSmartMoneyEvents,
@ -6953,7 +7156,8 @@ const useTerminal = (): TerminalState => {
export const NAV_ITEMS = [ export const NAV_ITEMS = [
{ href: "/", label: "Home" }, { href: "/", label: "Home" },
{ href: "/tape", label: "Tape" } { href: "/tape", label: "Tape" },
{ href: "/news", label: "News" }
] as const; ] as const;
type PageFrameProps = { type PageFrameProps = {
@ -7780,6 +7984,7 @@ const AlertsPane = memo(({ state, limit, withStrip = false, className }: AlertsP
data-tape-key={key} data-tape-key={key}
style={{ transform: `translateY(${start}px)` }} style={{ transform: `translateY(${start}px)` }}
onClick={() => { onClick={() => {
state.setSelectedNewsStory(null);
state.setSelectedDarkEvent(null); state.setSelectedDarkEvent(null);
state.setSelectedClassifierHit(null); state.setSelectedClassifierHit(null);
state.setSelectedSmartMoneyEvent(null); state.setSelectedSmartMoneyEvent(null);
@ -7806,6 +8011,83 @@ const AlertsPane = memo(({ state, limit, withStrip = false, className }: AlertsP
); );
}); });
type NewsPaneProps = {
state: TerminalState;
limit?: number;
className?: string;
};
const NewsPane = memo(({ state, limit, className }: NewsPaneProps) => {
const items = limit ? state.filteredNews.slice(0, limit) : state.filteredNews;
const canLoadOlder = state.mode === "live" && !limit && items.length > 0;
return (
<Pane
className={className}
title="News Wire"
status={
limit ? (
<Link className="terminal-button terminal-link-button" href="/news">
View all
</Link>
) : (
<div className="status-inline status-connected">
<span className="status-dot" />
<span>{state.mode === "live" ? "Live wire" : "Live-only in v1"}</span>
</div>
)
}
actions={
canLoadOlder ? (
<button className="terminal-button" type="button" onClick={() => void state.liveSession.loadOlder("news")}>
Older
</button>
) : null
}
>
{state.mode === "replay" ? (
<div className="empty">News is live-only in v1.</div>
) : items.length === 0 ? (
<div className="empty">
{state.tickerSet.size > 0 ? "No news stories match the current filter." : "Waiting for live news stories."}
</div>
) : (
<div className="news-list" role="list" aria-label="News stories">
{items.map((story) => (
<button
className="news-row"
key={`${story.trace_id}:${story.updated_ts}:${story.seq}`}
type="button"
onClick={() => {
state.setSelectedNewsStory(null);
state.setSelectedAlert(null);
state.setSelectedClassifierHit(null);
state.setSelectedSmartMoneyEvent(null);
state.setSelectedDarkEvent(null);
state.setSelectedNewsStory(story);
}}
>
<div className="news-row-head">
<h3>{story.headline}</h3>
<span className="news-row-time">{formatNewsTimestamp(story.published_ts)}</span>
</div>
<div className="news-row-meta">
<span>{story.source}</span>
{story.resolved_symbols.map((symbol) => (
<span className="drawer-chip" key={`${story.trace_id}-${symbol}`}>
{symbol}
</span>
))}
</div>
{!limit && story.summary ? <p className="drawer-note">{story.summary}</p> : null}
</button>
))}
</div>
)}
</Pane>
);
});
type ClassifierPaneProps = { type ClassifierPaneProps = {
state: TerminalState; state: TerminalState;
limit?: number; limit?: number;
@ -8016,6 +8298,7 @@ const DarkPane = memo(({ state, limit, className }: DarkPaneProps) => {
data-tape-key={key} data-tape-key={key}
style={{ transform: `translateY(${start}px)` }} style={{ transform: `translateY(${start}px)` }}
onClick={() => { onClick={() => {
state.setSelectedNewsStory(null);
state.setSelectedAlert(null); state.setSelectedAlert(null);
state.setSelectedClassifierHit(null); state.setSelectedClassifierHit(null);
state.setSelectedSmartMoneyEvent(null); state.setSelectedSmartMoneyEvent(null);
@ -8624,6 +8907,10 @@ export function TerminalAppShell({ children }: { children: ReactNode }) {
/> />
) : null} ) : null}
{state.selectedNewsStory ? (
<NewsDrawer story={state.selectedNewsStory} onClose={() => state.setSelectedNewsStory(null)} />
) : null}
{state.selectedClassifierHit ? ( {state.selectedClassifierHit ? (
<ClassifierHitDrawer <ClassifierHitDrawer
hit={state.selectedClassifierHit} hit={state.selectedClassifierHit}
@ -8662,12 +8949,24 @@ export function OverviewRoute() {
<div className="page-grid page-grid-home"> <div className="page-grid page-grid-home">
<ChartPane state={state} /> <ChartPane state={state} />
<EquitiesPane state={state} /> <EquitiesPane state={state} />
<NewsPane state={state} limit={6} />
<AlertsPane state={state} withStrip /> <AlertsPane state={state} withStrip />
</div> </div>
</PageFrame> </PageFrame>
); );
} }
export function NewsRoute() {
const state = useTerminal();
return (
<PageFrame title="News">
<div className="page-grid page-grid-news">
<NewsPane state={state} className="news-pane-full" />
</div>
</PageFrame>
);
}
export function TapeRoute() { export function TapeRoute() {
const state = useTerminal(); const state = useTerminal();
return ( return (

View file

@ -121,6 +121,17 @@
"zod": "^3.23.8", "zod": "^3.23.8",
}, },
}, },
"services/ingest-news": {
"name": "@islandflow/ingest-news",
"dependencies": {
"@islandflow/bus": "workspace:*",
"@islandflow/config": "workspace:*",
"@islandflow/observability": "workspace:*",
"@islandflow/types": "workspace:*",
"ws": "^8.18.3",
"zod": "^3.23.8",
},
},
"services/ingest-options": { "services/ingest-options": {
"name": "@islandflow/ingest-options", "name": "@islandflow/ingest-options",
"dependencies": { "dependencies": {
@ -250,6 +261,8 @@
"@islandflow/ingest-equities": ["@islandflow/ingest-equities@workspace:services/ingest-equities"], "@islandflow/ingest-equities": ["@islandflow/ingest-equities@workspace:services/ingest-equities"],
"@islandflow/ingest-news": ["@islandflow/ingest-news@workspace:services/ingest-news"],
"@islandflow/ingest-options": ["@islandflow/ingest-options@workspace:services/ingest-options"], "@islandflow/ingest-options": ["@islandflow/ingest-options@workspace:services/ingest-options"],
"@islandflow/observability": ["@islandflow/observability@workspace:packages/observability"], "@islandflow/observability": ["@islandflow/observability@workspace:packages/observability"],

View file

@ -31,6 +31,7 @@ COPY --from=services candles/package.json ./services/candles/package.json
COPY --from=services compute/package.json ./services/compute/package.json COPY --from=services compute/package.json ./services/compute/package.json
COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json
COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json
COPY --from=services ingest-news/package.json ./services/ingest-news/package.json
COPY --from=services ingest-options/package.json ./services/ingest-options/package.json COPY --from=services ingest-options/package.json ./services/ingest-options/package.json
COPY --from=services ingest-options/py/requirements.txt ./services/ingest-options/py/requirements.txt COPY --from=services ingest-options/py/requirements.txt ./services/ingest-options/py/requirements.txt
COPY --from=services refdata/package.json ./services/refdata/package.json COPY --from=services refdata/package.json ./services/refdata/package.json

View file

@ -24,6 +24,7 @@ COPY --from=services candles/package.json ./services/candles/package.json
COPY --from=services compute/package.json ./services/compute/package.json COPY --from=services compute/package.json ./services/compute/package.json
COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json
COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json
COPY --from=services ingest-news/package.json ./services/ingest-news/package.json
COPY --from=services ingest-options/package.json ./services/ingest-options/package.json COPY --from=services ingest-options/package.json ./services/ingest-options/package.json
COPY --from=services refdata/package.json ./services/refdata/package.json COPY --from=services refdata/package.json ./services/refdata/package.json
COPY --from=services replay/package.json ./services/replay/package.json COPY --from=services replay/package.json ./services/replay/package.json

View file

@ -30,6 +30,7 @@ COPY --from=services candles/package.json ./services/candles/package.json
COPY --from=services compute/package.json ./services/compute/package.json COPY --from=services compute/package.json ./services/compute/package.json
COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json
COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json
COPY --from=services ingest-news/package.json ./services/ingest-news/package.json
COPY --from=services ingest-options/package.json ./services/ingest-options/package.json COPY --from=services ingest-options/package.json ./services/ingest-options/package.json
COPY --from=services refdata/package.json ./services/refdata/package.json COPY --from=services refdata/package.json ./services/refdata/package.json
COPY --from=services replay/package.json ./services/replay/package.json COPY --from=services replay/package.json ./services/replay/package.json

View file

@ -115,6 +115,10 @@ services:
<<: *service-common <<: *service-common
command: ["services/ingest-equities/src/index.ts"] command: ["services/ingest-equities/src/index.ts"]
ingest-news:
<<: *service-common
command: ["services/ingest-news/src/index.ts"]
replay: replay:
<<: *service-common <<: *service-common
profiles: ["replay"] profiles: ["replay"]

View file

@ -121,6 +121,17 @@
"zod": "^3.23.8", "zod": "^3.23.8",
}, },
}, },
"services/ingest-news": {
"name": "@islandflow/ingest-news",
"dependencies": {
"@islandflow/bus": "workspace:*",
"@islandflow/config": "workspace:*",
"@islandflow/observability": "workspace:*",
"@islandflow/types": "workspace:*",
"ws": "^8.18.3",
"zod": "^3.23.8",
},
},
"services/ingest-options": { "services/ingest-options": {
"name": "@islandflow/ingest-options", "name": "@islandflow/ingest-options",
"dependencies": { "dependencies": {
@ -250,6 +261,8 @@
"@islandflow/ingest-equities": ["@islandflow/ingest-equities@workspace:services/ingest-equities"], "@islandflow/ingest-equities": ["@islandflow/ingest-equities@workspace:services/ingest-equities"],
"@islandflow/ingest-news": ["@islandflow/ingest-news@workspace:services/ingest-news"],
"@islandflow/ingest-options": ["@islandflow/ingest-options@workspace:services/ingest-options"], "@islandflow/ingest-options": ["@islandflow/ingest-options@workspace:services/ingest-options"],
"@islandflow/observability": ["@islandflow/observability@workspace:packages/observability"], "@islandflow/observability": ["@islandflow/observability@workspace:packages/observability"],

View file

@ -0,0 +1,152 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Turn Report: News Wire View via Alpaca Feed</title>
<style>
:root {
color-scheme: dark;
--bg: #0b1016;
--panel: #111820;
--panel-2: #0d141b;
--border: rgba(255, 255, 255, 0.08);
--text: #e6edf4;
--dim: #90a0b2;
--accent: #f5a623;
}
body {
margin: 0;
padding: 32px;
background: linear-gradient(180deg, #06080b 0%, #0b1016 100%);
color: var(--text);
font: 15px/1.6 "IBM Plex Sans", sans-serif;
}
main {
max-width: 980px;
margin: 0 auto;
background: var(--panel);
border: 1px solid var(--border);
border-radius: 16px;
padding: 28px;
}
h1, h2 {
margin: 0 0 12px;
font-family: "Quantico", sans-serif;
letter-spacing: 0.06em;
}
h1 { font-size: 1.8rem; }
h2 { font-size: 1rem; margin-top: 28px; }
p, li { color: var(--text); }
.summary {
padding: 16px 18px;
border: 1px solid rgba(245, 166, 35, 0.28);
border-radius: 12px;
background: rgba(245, 166, 35, 0.08);
}
.meta, code, pre { font-family: "IBM Plex Mono", monospace; }
.meta { color: var(--dim); font-size: 0.85rem; }
section {
padding-top: 4px;
border-top: 1px solid var(--border);
}
section:first-of-type { border-top: 0; }
ul { padding-left: 18px; }
pre {
overflow: auto;
padding: 14px;
border-radius: 12px;
background: var(--panel-2);
border: 1px solid var(--border);
}
a { color: var(--accent); }
</style>
</head>
<body>
<main>
<p class="meta">Created 2026-05-18 · Task: News Wire View via Alpaca Feed</p>
<h1>News Wire View via Alpaca Feed</h1>
<div class="summary">
<strong>Summary</strong>
<p>
Added an Alpaca-backed live news pipeline end to end: normalized <code>NewsStory</code> types,
a dedicated JetStream subject/stream, ClickHouse storage helpers with latest-revision semantics,
a new <code>services/ingest-news</code> service, API endpoints and live fanout, and a web
<code>/news</code> route plus Home preview with a right-side story drawer.
</p>
</div>
<section>
<h2>Changes Made</h2>
<ul>
<li>Added <code>NewsStorySchema</code>, the <code>news</code> live channel, and subscription parsing support in <code>packages/types</code>.</li>
<li>Added bus constants for the <code>flow.news</code> subject and <code>NEWS</code> stream.</li>
<li>Added ClickHouse news storage helpers, including recent, before-cursor, and after-cursor queries that collapse provider revisions to the latest row per <code>provider + story_id</code>.</li>
<li>Created <code>services/ingest-news</code> with Alpaca REST backfill, Alpaca websocket streaming, normalization, and deterministic ticker resolution.</li>
<li>Extended the API service to persist live news in the shared cache, expose <code>GET /news</code> and <code>GET /history/news</code>, and fan out <code>news</code> events on <code>/ws/live</code>.</li>
<li>Added a top-level <code>/news</code> route, primary nav entry, Home preview pane, replay-mode live-only empty states, and a sanitized full-story drawer.</li>
<li>Updated dev and deployment wiring so the new service is included in local runners and the Docker workspace snapshot.</li>
</ul>
</section>
<section>
<h2>Context</h2>
<p>
The plan called for a free-provider v1 news surface that behaves like the rest of Islandflow:
compact, evidence-first, and live-native. The implementation keeps replay intentionally out of scope
for news while still integrating news into the same live manifest, history pagination, rail navigation,
and drawer language used elsewhere in the terminal.
</p>
</section>
<section>
<h2>Important Implementation Details</h2>
<ul>
<li>Ticker resolution prefers provider symbols first, then falls back only to structured patterns in provider HTML: ticker anchors, <code>EXCHANGE:SYM</code>, and <code>$SYM</code>.</li>
<li>News history uses <code>published_ts</code> as the visible cursor while revisions are collapsed with a window function over <code>provider, story_id</code> ordered by <code>updated_ts</code>, <code>ingest_ts</code>, and <code>seq</code>.</li>
<li>The web drawer sanitizes provider HTML by removing scripts, inline event handlers, and unsupported tags; if sanitization yields nothing useful, the drawer falls back to stripped plain text.</li>
<li>Replay mode intentionally renders a clear empty state for news on both Home and <code>/news</code> instead of pretending news is replay-synced.</li>
</ul>
<pre><code>resolved_symbols = provider_symbols
or ticker anchors in content_html
or EXCHANGE:SYM matches
or $SYM matches</code></pre>
</section>
<section>
<h2>Expected Impact for End-Users</h2>
<p>
Traders can now monitor a dedicated live news wire inside Islandflow, spot symbol-linked headlines from
the Home view, and open full stories in-context without leaving the app. The displayed ticker chips are
grounded in stored provider and derived symbol metadata, which makes the feed safer to filter and trust.
</p>
</section>
<section>
<h2>Validation</h2>
<ul>
<li>Ran targeted Bun tests covering types, storage, API live-state behavior, ingest-news symbol resolution, route wiring, and terminal helpers.</li>
<li>Built the Next.js web app with <code>bun --cwd=apps/web run build</code>.</li>
<li>Ran <code>bun run check:docker-workspace</code> after syncing the deployment workspace snapshot.</li>
</ul>
</section>
<section>
<h2>Issues, Limitations, and Mitigations</h2>
<ul>
<li>Replay support remains intentionally absent in v1; the UI now states that explicitly instead of showing misleading empty historical behavior.</li>
<li>The sanitizer is intentionally conservative and custom, which keeps dependencies light but may strip some harmless provider formatting.</li>
<li>The ingest service assumes Alpacas current REST and websocket news contracts; if Alpaca changes those payload shapes, the normalization layer will need adjustment.</li>
</ul>
</section>
<section>
<h2>Follow-up Work</h2>
<ul>
<li>No additional follow-up issue was required during this turn.</li>
<li>Future extensions are still available behind the same contract: multi-provider aggregation, server-side symbol filtering, and replay-aware news history.</li>
</ul>
</section>
</main>
</body>
</html>

View file

@ -7,6 +7,7 @@ import {
STREAM_EQUITY_QUOTES, STREAM_EQUITY_QUOTES,
STREAM_FLOW_PACKETS, STREAM_FLOW_PACKETS,
STREAM_INFERRED_DARK, STREAM_INFERRED_DARK,
STREAM_NEWS,
STREAM_OPTION_NBBO, STREAM_OPTION_NBBO,
STREAM_OPTION_PRINTS, STREAM_OPTION_PRINTS,
STREAM_OPTION_SIGNAL_PRINTS, STREAM_OPTION_SIGNAL_PRINTS,
@ -19,6 +20,7 @@ import {
SUBJECT_EQUITY_QUOTES, SUBJECT_EQUITY_QUOTES,
SUBJECT_FLOW_PACKETS, SUBJECT_FLOW_PACKETS,
SUBJECT_INFERRED_DARK, SUBJECT_INFERRED_DARK,
SUBJECT_NEWS,
SUBJECT_OPTION_NBBO, SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_PRINTS, SUBJECT_OPTION_PRINTS,
SUBJECT_OPTION_SIGNAL_PRINTS, SUBJECT_OPTION_SIGNAL_PRINTS,
@ -53,7 +55,8 @@ export const STREAM_CATALOG: readonly KnownStreamDefinition[] = [
retentionClass: "derived" retentionClass: "derived"
}, },
{ name: STREAM_CLASSIFIER_HITS, subject: SUBJECT_CLASSIFIER_HITS, retentionClass: "derived" }, { name: STREAM_CLASSIFIER_HITS, subject: SUBJECT_CLASSIFIER_HITS, retentionClass: "derived" },
{ name: STREAM_ALERTS, subject: SUBJECT_ALERTS, retentionClass: "derived" } { name: STREAM_ALERTS, subject: SUBJECT_ALERTS, retentionClass: "derived" },
{ name: STREAM_NEWS, subject: SUBJECT_NEWS, retentionClass: "derived" }
]; ];
const STREAM_CATALOG_BY_NAME = new Map(STREAM_CATALOG.map((definition) => [definition.name, definition])); const STREAM_CATALOG_BY_NAME = new Map(STREAM_CATALOG.map((definition) => [definition.name, definition]));

View file

@ -22,3 +22,5 @@ export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS";
export const SUBJECT_CLASSIFIER_HITS = "flow.classifier_hits"; export const SUBJECT_CLASSIFIER_HITS = "flow.classifier_hits";
export const STREAM_ALERTS = "ALERTS"; export const STREAM_ALERTS = "ALERTS";
export const SUBJECT_ALERTS = "flow.alerts"; export const SUBJECT_ALERTS = "flow.alerts";
export const STREAM_NEWS = "NEWS";
export const SUBJECT_NEWS = "flow.news";

View file

@ -7,6 +7,7 @@ import {
EquityPrintJoinSchema, EquityPrintJoinSchema,
InferredDarkEventSchema, InferredDarkEventSchema,
FlowPacketSchema, FlowPacketSchema,
NewsStorySchema,
OptionNBBOSchema, OptionNBBOSchema,
OptionPrintSchema, OptionPrintSchema,
SmartMoneyEventSchema SmartMoneyEventSchema
@ -20,6 +21,7 @@ import type {
EquityPrintJoin, EquityPrintJoin,
InferredDarkEvent, InferredDarkEvent,
FlowPacket, FlowPacket,
NewsStory,
SmartMoneyEvent, SmartMoneyEvent,
OptionNBBO, OptionNBBO,
OptionPrint, OptionPrint,
@ -91,6 +93,13 @@ import {
toSmartMoneyEventRecord, toSmartMoneyEventRecord,
type SmartMoneyEventRecord type SmartMoneyEventRecord
} from "./smart-money-events"; } from "./smart-money-events";
import {
NEWS_TABLE,
newsTableDDL,
fromNewsRecord,
toNewsRecord,
type NewsRecord
} from "./news";
export type ClickHouseOptions = { export type ClickHouseOptions = {
url: string; url: string;
@ -320,6 +329,12 @@ export const ensureAlertsTable = async (client: ClickHouseClient): Promise<void>
} }
}; };
export const ensureNewsTable = async (client: ClickHouseClient): Promise<void> => {
await client.exec({
query: newsTableDDL()
});
};
export const insertOptionPrint = async ( export const insertOptionPrint = async (
client: ClickHouseClient, client: ClickHouseClient,
print: OptionPrint print: OptionPrint
@ -449,6 +464,15 @@ export const insertAlert = async (client: ClickHouseClient, alert: AlertEvent):
}); });
}; };
export const insertNewsStory = async (client: ClickHouseClient, story: NewsStory): Promise<void> => {
const record = toNewsRecord(story);
await client.insert({
table: NEWS_TABLE,
values: [record],
format: "JSONEachRow"
});
};
export type ClickHouseBatchWriterOptions = { export type ClickHouseBatchWriterOptions = {
flushIntervalMs?: number; flushIntervalMs?: number;
maxRows?: number; maxRows?: number;
@ -600,6 +624,13 @@ export const enqueueAlertInsert = (
writer.enqueue(ALERTS_TABLE, toAlertRecord(alert)); writer.enqueue(ALERTS_TABLE, toAlertRecord(alert));
}; };
export const enqueueNewsStoryInsert = (
writer: ClickHouseBatchWriter,
story: NewsStory
): void => {
writer.enqueue(NEWS_TABLE, toNewsRecord(story));
};
const clampLimit = (limit: number): number => { const clampLimit = (limit: number): number => {
if (!Number.isFinite(limit)) { if (!Number.isFinite(limit)) {
return 100; return 100;
@ -1016,6 +1047,32 @@ const normalizeAlertRow = (row: unknown): AlertRecord | null => {
}; };
}; };
const normalizeNewsRow = (row: unknown): NewsRecord | null => {
if (!row || typeof row !== "object") {
return null;
}
const record = row as Record<string, unknown>;
return {
source_ts: coerceNumber(record.source_ts) as number,
ingest_ts: coerceNumber(record.ingest_ts) as number,
seq: coerceNumber(record.seq) as number,
trace_id: String(record.trace_id ?? ""),
story_id: coerceNumber(record.story_id) as number,
provider: String(record.provider ?? ""),
source: String(record.source ?? ""),
headline: String(record.headline ?? ""),
summary: String(record.summary ?? ""),
content_html: String(record.content_html ?? ""),
url: String(record.url ?? ""),
published_ts: coerceNumber(record.published_ts) as number,
updated_ts: coerceNumber(record.updated_ts) as number,
provider_symbols_json: String(record.provider_symbols_json ?? "[]"),
resolved_symbols_json: String(record.resolved_symbols_json ?? "[]"),
symbol_resolution: String(record.symbol_resolution ?? "none") as NewsRecord["symbol_resolution"]
};
};
export const fetchRecentOptionPrints = async ( export const fetchRecentOptionPrints = async (
client: ClickHouseClient, client: ClickHouseClient,
limit: number, limit: number,
@ -1207,6 +1264,50 @@ export const fetchRecentAlerts = async (
return AlertEventSchema.array().parse(alerts); return AlertEventSchema.array().parse(alerts);
}; };
const latestNewsSelect = `
SELECT
source_ts,
ingest_ts,
seq,
trace_id,
story_id,
provider,
source,
headline,
summary,
content_html,
url,
published_ts,
updated_ts,
provider_symbols_json,
resolved_symbols_json,
symbol_resolution
FROM (
SELECT
*,
row_number() OVER (PARTITION BY provider, story_id ORDER BY updated_ts DESC, ingest_ts DESC, seq DESC) AS revision_rank
FROM ${NEWS_TABLE}
)
WHERE revision_rank = 1
`;
export const fetchRecentNews = async (
client: ClickHouseClient,
limit: number
): Promise<NewsStory[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `${latestNewsSelect} ORDER BY published_ts DESC, story_id DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeNewsRow)
.filter((record): record is NewsRecord => record !== null);
return NewsStorySchema.array().parse(records.map(fromNewsRecord));
};
const normalizeAlertEvidenceRefs = (refs: string[]): string[] => { const normalizeAlertEvidenceRefs = (refs: string[]): string[] => {
return Array.from(new Set(refs.map((ref) => ref.trim()).filter(Boolean))); return Array.from(new Set(refs.map((ref) => ref.trim()).filter(Boolean)));
}; };
@ -1600,6 +1701,27 @@ export const fetchAlertsAfter = async (
return AlertEventSchema.array().parse(alerts); return AlertEventSchema.array().parse(alerts);
}; };
export const fetchNewsAfter = async (
client: ClickHouseClient,
afterTs: number,
afterSeq: number,
limit: number
): Promise<NewsStory[]> => {
const safeLimit = clampLimit(limit);
const safeAfterTs = clampCursor(afterTs);
const safeAfterSeq = clampCursor(afterSeq);
const result = await client.query({
query: `${latestNewsSelect} AND (published_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY published_ts ASC, seq ASC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeNewsRow)
.filter((record): record is NewsRecord => record !== null);
return NewsStorySchema.array().parse(records.map(fromNewsRecord));
};
export const fetchOptionPrintsBefore = async ( export const fetchOptionPrintsBefore = async (
client: ClickHouseClient, client: ClickHouseClient,
beforeTs: number, beforeTs: number,
@ -1778,6 +1900,25 @@ export const fetchAlertsBefore = async (
return AlertEventSchema.array().parse(records.map(fromAlertRecord)); return AlertEventSchema.array().parse(records.map(fromAlertRecord));
}; };
export const fetchNewsBefore = async (
client: ClickHouseClient,
beforeTs: number,
beforeSeq: number,
limit: number
): Promise<NewsStory[]> => {
const safeLimit = clampLimit(limit);
const result = await client.query({
query: `${latestNewsSelect} AND ${buildBeforeTupleCondition("published_ts", "seq", beforeTs, beforeSeq)} ORDER BY published_ts DESC, seq DESC LIMIT ${safeLimit}`,
format: "JSONEachRow"
});
const rows = await result.json<unknown[]>();
const records = rows
.map(normalizeNewsRow)
.filter((record): record is NewsRecord => record !== null);
return NewsStorySchema.array().parse(records.map(fromNewsRecord));
};
export const fetchInferredDarkBefore = async ( export const fetchInferredDarkBefore = async (
client: ClickHouseClient, client: ClickHouseClient,
beforeTs: number, beforeTs: number,

View file

@ -10,3 +10,4 @@ export * from "./equity-print-joins";
export * from "./inferred-dark"; export * from "./inferred-dark";
export * from "./option-prints"; export * from "./option-prints";
export * from "./option-nbbo"; export * from "./option-nbbo";
export * from "./news";

View file

@ -0,0 +1,102 @@
import type { NewsStory, NewsSymbolResolution } from "@islandflow/types";
export const NEWS_TABLE = "news";
export type NewsRecord = {
source_ts: number;
ingest_ts: number;
seq: number;
trace_id: string;
story_id: number;
provider: string;
source: string;
headline: string;
summary: string;
content_html: string;
url: string;
published_ts: number;
updated_ts: number;
provider_symbols_json: string;
resolved_symbols_json: string;
symbol_resolution: NewsSymbolResolution;
};
export const newsTableDDL = (): string => {
return `
CREATE TABLE IF NOT EXISTS ${NEWS_TABLE} (
source_ts UInt64,
ingest_ts UInt64,
seq UInt64,
trace_id String,
story_id UInt64,
provider String,
source String,
headline String,
summary String,
content_html String,
url String,
published_ts UInt64,
updated_ts UInt64,
provider_symbols_json String,
resolved_symbols_json String,
symbol_resolution String
)
ENGINE = ReplacingMergeTree(updated_ts)
ORDER BY (provider, story_id, updated_ts, seq)
`;
};
const safeStringArray = (value: string): string[] => {
try {
const parsed = JSON.parse(value);
if (Array.isArray(parsed)) {
return parsed.map((entry) => String(entry));
}
} catch {
// ignore
}
return [];
};
export const toNewsRecord = (story: NewsStory): NewsRecord => {
return {
source_ts: story.source_ts,
ingest_ts: story.ingest_ts,
seq: story.seq,
trace_id: story.trace_id,
story_id: story.story_id,
provider: story.provider,
source: story.source,
headline: story.headline,
summary: story.summary,
content_html: story.content_html,
url: story.url,
published_ts: story.published_ts,
updated_ts: story.updated_ts,
provider_symbols_json: JSON.stringify(story.provider_symbols),
resolved_symbols_json: JSON.stringify(story.resolved_symbols),
symbol_resolution: story.symbol_resolution
};
};
export const fromNewsRecord = (record: NewsRecord): NewsStory => {
return {
source_ts: record.source_ts,
ingest_ts: record.ingest_ts,
seq: record.seq,
trace_id: record.trace_id,
story_id: record.story_id,
provider: record.provider,
source: record.source,
headline: record.headline,
summary: record.summary,
content_html: record.content_html,
url: record.url,
published_ts: record.published_ts,
updated_ts: record.updated_ts,
provider_symbols: safeStringArray(record.provider_symbols_json),
resolved_symbols: safeStringArray(record.resolved_symbols_json),
symbol_resolution: record.symbol_resolution
};
};

View file

@ -0,0 +1,78 @@
import { describe, expect, it } from "bun:test";
import type { ClickHouseClient } from "../src/clickhouse";
import {
NEWS_TABLE,
fromNewsRecord,
newsTableDDL,
toNewsRecord
} from "../src/news";
import {
fetchNewsAfter,
fetchNewsBefore,
fetchRecentNews
} from "../src/clickhouse";
const makeClient = (resolver: (query: string) => unknown[]): ClickHouseClient =>
({
exec: async () => {},
insert: async () => {},
ping: async () => ({ success: true }),
close: async () => {},
query: async ({ query }: { query: string }) => ({
async json<T>() {
return resolver(query) as T;
}
})
}) as ClickHouseClient;
const story = {
source_ts: 100,
ingest_ts: 101,
seq: 3,
trace_id: "alpaca:77",
story_id: 77,
provider: "alpaca",
source: "Benzinga",
headline: "TSLA rises",
summary: "Summary",
content_html: "<p>TSLA rises</p>",
url: "https://example.com/story",
published_ts: 100,
updated_ts: 120,
provider_symbols: ["TSLA"],
resolved_symbols: ["TSLA", "AAPL"],
symbol_resolution: "mixed" as const
};
describe("news storage helpers", () => {
it("includes the correct table name in the DDL", () => {
const ddl = newsTableDDL();
expect(ddl).toContain(NEWS_TABLE);
expect(ddl).toContain("ReplacingMergeTree");
});
it("round-trips news records", () => {
const record = toNewsRecord(story);
const restored = fromNewsRecord(record);
expect(restored).toEqual(story);
});
it("uses latest-revision selection for recent and cursor queries", async () => {
const queries: string[] = [];
const client = makeClient((query) => {
queries.push(query);
return [toNewsRecord(story)];
});
const recent = await fetchRecentNews(client, 10);
const before = await fetchNewsBefore(client, 200, 10, 10);
const after = await fetchNewsAfter(client, 50, 1, 10);
expect(recent[0]?.trace_id).toBe("alpaca:77");
expect(before[0]?.story_id).toBe(77);
expect(after[0]?.updated_ts).toBe(120);
expect(queries[0]).toContain("row_number() OVER");
expect(queries[1]).toContain("published_ts");
expect(queries[2]).toContain("(published_ts, seq) > (50, 1)");
});
});

View file

@ -262,3 +262,26 @@ export const InferredDarkEventSchema = EventMetaSchema.merge(
); );
export type InferredDarkEvent = z.infer<typeof InferredDarkEventSchema>; export type InferredDarkEvent = z.infer<typeof InferredDarkEventSchema>;
export const NewsSymbolResolutionSchema = z.enum(["provider", "derived", "mixed", "none"]);
export type NewsSymbolResolution = z.infer<typeof NewsSymbolResolutionSchema>;
export const NewsStorySchema = EventMetaSchema.merge(
z.object({
story_id: z.number().int().nonnegative(),
provider: z.string().min(1),
source: z.string().min(1),
headline: z.string().min(1),
summary: z.string(),
content_html: z.string(),
url: z.string().url().or(z.literal("")),
published_ts: z.number().int().nonnegative(),
updated_ts: z.number().int().nonnegative(),
provider_symbols: z.array(z.string().min(1)),
resolved_symbols: z.array(z.string().min(1)),
symbol_resolution: NewsSymbolResolutionSchema
})
);
export type NewsStory = z.infer<typeof NewsStorySchema>;

View file

@ -8,6 +8,7 @@ import {
EquityQuoteSchema, EquityQuoteSchema,
FlowPacketSchema, FlowPacketSchema,
InferredDarkEventSchema, InferredDarkEventSchema,
NewsStorySchema,
OptionNBBOSchema, OptionNBBOSchema,
OptionPrintSchema, OptionPrintSchema,
SmartMoneyEventSchema SmartMoneyEventSchema
@ -34,7 +35,8 @@ export const LiveGenericChannelSchema = z.enum([
"smart-money", "smart-money",
"classifier-hits", "classifier-hits",
"alerts", "alerts",
"inferred-dark" "inferred-dark",
"news"
]); ]);
export const LiveChannelSchema = z.enum([ export const LiveChannelSchema = z.enum([
@ -48,6 +50,7 @@ export const LiveChannelSchema = z.enum([
"classifier-hits", "classifier-hits",
"alerts", "alerts",
"inferred-dark", "inferred-dark",
"news",
"equity-candles", "equity-candles",
"equity-overlay" "equity-overlay"
]); ]);
@ -91,7 +94,7 @@ export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [
snapshot_limit: z.number().int().positive().optional() snapshot_limit: z.number().int().positive().optional()
}), }),
z.object({ z.object({
channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]), channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark", "news"]),
snapshot_limit: z.number().int().positive().optional() snapshot_limit: z.number().int().positive().optional()
}), }),
z.object({ z.object({
@ -123,6 +126,7 @@ const livePayloadSchemas = {
"classifier-hits": ClassifierHitEventSchema, "classifier-hits": ClassifierHitEventSchema,
alerts: AlertEventSchema, alerts: AlertEventSchema,
"inferred-dark": InferredDarkEventSchema, "inferred-dark": InferredDarkEventSchema,
news: NewsStorySchema,
"equity-candles": EquityCandleSchema, "equity-candles": EquityCandleSchema,
"equity-overlay": EquityPrintSchema "equity-overlay": EquityPrintSchema
} as const; } as const;

View file

@ -9,6 +9,7 @@ import {
describe("live protocol types", () => { describe("live protocol types", () => {
it("builds stable keys for generic and parameterized subscriptions", () => { it("builds stable keys for generic and parameterized subscriptions", () => {
expect(getSubscriptionKey({ channel: "flow" })).toBe("flow|{}"); expect(getSubscriptionKey({ channel: "flow" })).toBe("flow|{}");
expect(getSubscriptionKey({ channel: "news" })).toBe("news");
expect( expect(
getSubscriptionKey({ getSubscriptionKey({
channel: "options", channel: "options",
@ -53,12 +54,13 @@ describe("live protocol types", () => {
op: "subscribe", op: "subscribe",
subscriptions: [ subscriptions: [
{ channel: "flow", filters: { nbboSides: ["AA", "A"], minNotional: 50000 } }, { channel: "flow", filters: { nbboSides: ["AA", "A"], minNotional: 50000 } },
{ channel: "news", snapshot_limit: 100 },
{ channel: "equity-candles", underlying_id: "SPY", interval_ms: 60000 } { channel: "equity-candles", underlying_id: "SPY", interval_ms: 60000 }
] ]
}); });
expect(parsed.op).toBe("subscribe"); expect(parsed.op).toBe("subscribe");
expect(parsed.subscriptions).toHaveLength(2); expect(parsed.subscriptions).toHaveLength(3);
}); });
it("validates snapshot and event server messages", () => { it("validates snapshot and event server messages", () => {
@ -74,18 +76,24 @@ describe("live protocol types", () => {
}); });
const event = LiveServerMessageSchema.parse({ const event = LiveServerMessageSchema.parse({
op: "event", op: "event",
subscription: { channel: "equity-overlay", underlying_id: "SPY" }, subscription: { channel: "news" },
item: { item: {
source_ts: 100, source_ts: 100,
ingest_ts: 101, ingest_ts: 101,
seq: 1, seq: 1,
trace_id: "eq-1", trace_id: "alpaca:1",
ts: 100, story_id: 1,
underlying_id: "SPY", provider: "alpaca",
price: 500, source: "Benzinga",
size: 10, headline: "TSLA rises",
exchange: "X", summary: "",
offExchangeFlag: true content_html: "<p>TSLA rises</p>",
url: "https://example.com/story",
published_ts: 100,
updated_ts: 100,
provider_symbols: ["TSLA"],
resolved_symbols: ["TSLA"],
symbol_resolution: "provider"
}, },
watermark: cursor watermark: cursor
}); });

View file

@ -48,7 +48,8 @@ const NATIVE_UNITS = {
ingestOptions: ingestOptions:
process.env.DEPLOY_NATIVE_INGEST_OPTIONS_UNIT?.trim() || "islandflow-ingest-options", process.env.DEPLOY_NATIVE_INGEST_OPTIONS_UNIT?.trim() || "islandflow-ingest-options",
ingestEquities: ingestEquities:
process.env.DEPLOY_NATIVE_INGEST_EQUITIES_UNIT?.trim() || "islandflow-ingest-equities" process.env.DEPLOY_NATIVE_INGEST_EQUITIES_UNIT?.trim() || "islandflow-ingest-equities",
ingestNews: process.env.DEPLOY_NATIVE_INGEST_NEWS_UNIT?.trim() || "islandflow-ingest-news"
} as const; } as const;
const DOCKER_CORE_SERVICES = [ const DOCKER_CORE_SERVICES = [
"api", "api",
@ -56,14 +57,16 @@ const DOCKER_CORE_SERVICES = [
"compute", "compute",
"candles", "candles",
"ingest-options", "ingest-options",
"ingest-equities" "ingest-equities",
"ingest-news"
] as const; ] as const;
const DOCKER_BACKEND_SERVICES = [ const DOCKER_BACKEND_SERVICES = [
"api", "api",
"compute", "compute",
"candles", "candles",
"ingest-options", "ingest-options",
"ingest-equities" "ingest-equities",
"ingest-news"
] as const; ] as const;
const scriptPath = fileURLToPath(import.meta.url); const scriptPath = fileURLToPath(import.meta.url);
@ -106,7 +109,8 @@ Environment:
DEPLOY_NATIVE_COMPUTE_UNIT Override native compute systemd unit name. DEPLOY_NATIVE_COMPUTE_UNIT Override native compute systemd unit name.
DEPLOY_NATIVE_CANDLES_UNIT Override native candles systemd unit name. DEPLOY_NATIVE_CANDLES_UNIT Override native candles systemd unit name.
DEPLOY_NATIVE_INGEST_OPTIONS_UNIT Override native ingest-options systemd unit name. DEPLOY_NATIVE_INGEST_OPTIONS_UNIT Override native ingest-options systemd unit name.
DEPLOY_NATIVE_INGEST_EQUITIES_UNIT Override native ingest-equities systemd unit name.`); DEPLOY_NATIVE_INGEST_EQUITIES_UNIT Override native ingest-equities systemd unit name.
DEPLOY_NATIVE_INGEST_NEWS_UNIT Override native ingest-news systemd unit name.`);
process.exit(exitCode); process.exit(exitCode);
} }
@ -465,7 +469,8 @@ function nativeUnitsForScope(scope: DeployScope): string[] {
NATIVE_UNITS.compute, NATIVE_UNITS.compute,
NATIVE_UNITS.candles, NATIVE_UNITS.candles,
NATIVE_UNITS.ingestOptions, NATIVE_UNITS.ingestOptions,
NATIVE_UNITS.ingestEquities NATIVE_UNITS.ingestEquities,
NATIVE_UNITS.ingestNews
]; ];
default: default:
return [ return [
@ -474,7 +479,8 @@ function nativeUnitsForScope(scope: DeployScope): string[] {
NATIVE_UNITS.compute, NATIVE_UNITS.compute,
NATIVE_UNITS.candles, NATIVE_UNITS.candles,
NATIVE_UNITS.ingestOptions, NATIVE_UNITS.ingestOptions,
NATIVE_UNITS.ingestEquities NATIVE_UNITS.ingestEquities,
NATIVE_UNITS.ingestNews
]; ];
} }
} }

View file

@ -222,6 +222,7 @@ process.on("SIGHUP", () => handleSignal("SIGHUP"));
const tasks: ChildSpec[] = [ const tasks: ChildSpec[] = [
{ name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" }, { name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" },
{ name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" }, { name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" },
{ name: "ingest-news", cmd: ["bun", "run", "dev"], cwd: "services/ingest-news" },
{ name: "compute", cmd: ["bun", "run", "dev"], cwd: "services/compute" }, { name: "compute", cmd: ["bun", "run", "dev"], cwd: "services/compute" },
{ name: "candles", cmd: ["bun", "run", "dev"], cwd: "services/candles" }, { name: "candles", cmd: ["bun", "run", "dev"], cwd: "services/candles" },
{ name: "refdata", cmd: ["bun", "run", "dev"], cwd: "services/refdata" }, { name: "refdata", cmd: ["bun", "run", "dev"], cwd: "services/refdata" },

View file

@ -325,6 +325,7 @@ const serviceTasks: ChildSpec[] = [
{ name: "web", cmd: ["bun", "run", "dev"], cwd: "apps/web" }, { name: "web", cmd: ["bun", "run", "dev"], cwd: "apps/web" },
{ name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" }, { name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" },
{ name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" }, { name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" },
{ name: "ingest-news", cmd: ["bun", "run", "dev"], cwd: "services/ingest-news" },
{ name: "compute", cmd: ["bun", "run", "dev"], cwd: "services/compute" }, { name: "compute", cmd: ["bun", "run", "dev"], cwd: "services/compute" },
{ name: "candles", cmd: ["bun", "run", "dev"], cwd: "services/candles" }, { name: "candles", cmd: ["bun", "run", "dev"], cwd: "services/candles" },
{ name: "refdata", cmd: ["bun", "run", "dev"], cwd: "services/refdata" }, { name: "refdata", cmd: ["bun", "run", "dev"], cwd: "services/refdata" },

View file

@ -9,6 +9,7 @@ import {
SUBJECT_EQUITY_QUOTES, SUBJECT_EQUITY_QUOTES,
SUBJECT_INFERRED_DARK, SUBJECT_INFERRED_DARK,
SUBJECT_FLOW_PACKETS, SUBJECT_FLOW_PACKETS,
SUBJECT_NEWS,
SUBJECT_SMART_MONEY_EVENTS, SUBJECT_SMART_MONEY_EVENTS,
SUBJECT_OPTION_NBBO, SUBJECT_OPTION_NBBO,
SUBJECT_OPTION_SIGNAL_PRINTS, SUBJECT_OPTION_SIGNAL_PRINTS,
@ -20,6 +21,7 @@ import {
STREAM_EQUITY_QUOTES, STREAM_EQUITY_QUOTES,
STREAM_INFERRED_DARK, STREAM_INFERRED_DARK,
STREAM_FLOW_PACKETS, STREAM_FLOW_PACKETS,
STREAM_NEWS,
STREAM_SMART_MONEY_EVENTS, STREAM_SMART_MONEY_EVENTS,
STREAM_OPTION_NBBO, STREAM_OPTION_NBBO,
STREAM_OPTION_SIGNAL_PRINTS, STREAM_OPTION_SIGNAL_PRINTS,
@ -35,6 +37,7 @@ import {
import { import {
createClickHouseClient, createClickHouseClient,
ensureAlertsTable, ensureAlertsTable,
ensureNewsTable,
ensureClassifierHitsTable, ensureClassifierHitsTable,
ensureEquityCandlesTable, ensureEquityCandlesTable,
ensureEquityPrintJoinsTable, ensureEquityPrintJoinsTable,
@ -48,6 +51,8 @@ import {
fetchAlertsAfter, fetchAlertsAfter,
fetchAlertsBefore, fetchAlertsBefore,
fetchAlertContextByTraceId, fetchAlertContextByTraceId,
fetchNewsAfter,
fetchNewsBefore,
fetchClassifierHitsAfter, fetchClassifierHitsAfter,
fetchClassifierHitsBefore, fetchClassifierHitsBefore,
fetchSmartMoneyEventsAfter, fetchSmartMoneyEventsAfter,
@ -58,6 +63,7 @@ import {
fetchFlowPacketsByMemberTraceIds, fetchFlowPacketsByMemberTraceIds,
fetchFlowPacketsBefore, fetchFlowPacketsBefore,
fetchRecentAlerts, fetchRecentAlerts,
fetchRecentNews,
fetchRecentClassifierHits, fetchRecentClassifierHits,
fetchRecentSmartMoneyEvents, fetchRecentSmartMoneyEvents,
fetchRecentEquityPrintJoins, fetchRecentEquityPrintJoins,
@ -99,6 +105,7 @@ import {
EquityQuoteSchema, EquityQuoteSchema,
FeedSnapshot, FeedSnapshot,
InferredDarkEventSchema, InferredDarkEventSchema,
NewsStorySchema,
LiveClientMessageSchema, LiveClientMessageSchema,
LiveServerMessage, LiveServerMessage,
LiveSubscription, LiveSubscription,
@ -676,7 +683,8 @@ const run = async () => {
STREAM_FLOW_PACKETS, STREAM_FLOW_PACKETS,
STREAM_SMART_MONEY_EVENTS, STREAM_SMART_MONEY_EVENTS,
STREAM_CLASSIFIER_HITS, STREAM_CLASSIFIER_HITS,
STREAM_ALERTS STREAM_ALERTS,
STREAM_NEWS
], ],
{ logger } { logger }
); );
@ -719,6 +727,7 @@ const run = async () => {
await ensureSmartMoneyEventsTable(clickhouse); await ensureSmartMoneyEventsTable(clickhouse);
await ensureClassifierHitsTable(clickhouse); await ensureClassifierHitsTable(clickhouse);
await ensureAlertsTable(clickhouse); await ensureAlertsTable(clickhouse);
await ensureNewsTable(clickhouse);
}); });
let redis: ReturnType<typeof createClient> | null = null; let redis: ReturnType<typeof createClient> | null = null;
@ -843,6 +852,11 @@ const run = async () => {
subject: SUBJECT_ALERTS, subject: SUBJECT_ALERTS,
stream: STREAM_ALERTS, stream: STREAM_ALERTS,
durableName: "api-alerts" durableName: "api-alerts"
},
{
subject: SUBJECT_NEWS,
stream: STREAM_NEWS,
durableName: "api-news"
} }
] as const; ] as const;
@ -991,10 +1005,16 @@ const run = async () => {
consumerBindings[10].durableName consumerBindings[10].durableName
); );
const newsSubscription = await subscribeWithReset(
consumerBindings[11].subject,
consumerBindings[11].stream,
consumerBindings[11].durableName
);
const fanoutLive = async ( const fanoutLive = async (
subscription: LiveSubscription, subscription: LiveSubscription,
item: unknown, item: unknown,
ingestChannel: "options" | "nbbo" | "equities" | "equity-quotes" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" ingestChannel: "options" | "nbbo" | "equities" | "equity-quotes" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" | "news"
) => { ) => {
const watermark = await liveState.ingest(ingestChannel, item); const watermark = await liveState.ingest(ingestChannel, item);
@ -1252,6 +1272,21 @@ const run = async () => {
} }
}; };
const pumpNews = async () => {
for await (const msg of newsSubscription.messages) {
try {
const payload = NewsStorySchema.parse(newsSubscription.decode(msg));
await fanoutLive({ channel: "news" }, payload, "news");
msg.ack();
} catch (error) {
logger.error("failed to process news story", {
error: error instanceof Error ? error.message : String(error)
});
msg.term();
}
}
};
void pumpOptions(); void pumpOptions();
void pumpOptionNbbo(); void pumpOptionNbbo();
void pumpEquities(); void pumpEquities();
@ -1263,6 +1298,7 @@ const run = async () => {
void pumpSmartMoney(); void pumpSmartMoney();
void pumpClassifierHits(); void pumpClassifierHits();
void pumpAlerts(); void pumpAlerts();
void pumpNews();
const buildSyntheticStatusBody = () => { const buildSyntheticStatusBody = () => {
const derived = const derived =
@ -1490,6 +1526,12 @@ const run = async () => {
return jsonResponse({ data }); return jsonResponse({ data });
} }
if (req.method === "GET" && url.pathname === "/news") {
const limit = parseLimit(url.searchParams.get("limit") ?? "100");
const data = await fetchRecentNews(clickhouse, limit);
return jsonResponse({ data });
}
if (req.method === "GET" && isAlertContextPath(url.pathname)) { if (req.method === "GET" && isAlertContextPath(url.pathname)) {
try { try {
const traceId = parseAlertContextTraceIdPath(url.pathname); const traceId = parseAlertContextTraceIdPath(url.pathname);
@ -1607,6 +1649,14 @@ const run = async () => {
); );
} }
if (req.method === "GET" && url.pathname === "/history/news") {
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
const data = await fetchNewsBefore(clickhouse, beforeTs, beforeSeq, limit);
return jsonResponse(
buildHistoryResponse(data, (item) => ({ ts: item.published_ts, seq: item.seq }))
);
}
if (req.method === "GET" && /^\/flow\/packets\/[^/]+$/.test(url.pathname)) { if (req.method === "GET" && /^\/flow\/packets\/[^/]+$/.test(url.pathname)) {
const id = decodeURIComponent(url.pathname.slice("/flow/packets/".length)); const id = decodeURIComponent(url.pathname.slice("/flow/packets/".length));
const data = await fetchFlowPacketById(clickhouse, id); const data = await fetchFlowPacketById(clickhouse, id);

View file

@ -8,6 +8,7 @@ import {
fetchRecentEquityQuotes, fetchRecentEquityQuotes,
fetchRecentFlowPackets, fetchRecentFlowPackets,
fetchRecentInferredDark, fetchRecentInferredDark,
fetchRecentNews,
fetchRecentOptionNBBO, fetchRecentOptionNBBO,
fetchRecentSmartMoneyEvents, fetchRecentSmartMoneyEvents,
type ClickHouseClient type ClickHouseClient
@ -25,6 +26,7 @@ import {
FeedSnapshot, FeedSnapshot,
FlowPacketSchema, FlowPacketSchema,
InferredDarkEventSchema, InferredDarkEventSchema,
NewsStorySchema,
LiveChannelHealth, LiveChannelHealth,
LiveGenericChannel, LiveGenericChannel,
LiveHotChannel, LiveHotChannel,
@ -40,6 +42,7 @@ import {
type EquityCandle, type EquityCandle,
type EquityPrint, type EquityPrint,
type LiveChannel, type LiveChannel,
type NewsStory,
type OptionPrint type OptionPrint
} from "@islandflow/types"; } from "@islandflow/types";
import { createMetrics } from "@islandflow/observability"; import { createMetrics } from "@islandflow/observability";
@ -63,7 +66,8 @@ const GENERIC_LIMIT_ENV_KEYS: Record<LiveGenericChannel, string> = {
"smart-money": "LIVE_LIMIT_SMART_MONEY", "smart-money": "LIVE_LIMIT_SMART_MONEY",
"classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS", "classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS",
alerts: "LIVE_LIMIT_ALERTS", alerts: "LIVE_LIMIT_ALERTS",
"inferred-dark": "LIVE_LIMIT_INFERRED_DARK" "inferred-dark": "LIVE_LIMIT_INFERRED_DARK",
news: "LIVE_LIMIT_NEWS"
}; };
const CHART_LIMITS = { const CHART_LIMITS = {
@ -81,7 +85,8 @@ const DEFAULT_LIVE_LIMITS: GenericLiveLimits = {
"smart-money": 300, "smart-money": 300,
"classifier-hits": 300, "classifier-hits": 300,
alerts: 300, alerts: 300,
"inferred-dark": 300 "inferred-dark": 300,
news: 100
}; };
const DEFAULT_SCOPED_CACHE_MAX_KEYS = 32; const DEFAULT_SCOPED_CACHE_MAX_KEYS = 32;
@ -196,16 +201,28 @@ export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env):
env, env,
"inferred-dark", "inferred-dark",
env.LIVE_LIMIT_DEFAULT ? liveLimitDefault : DEFAULT_LIVE_LIMITS["inferred-dark"] env.LIVE_LIMIT_DEFAULT ? liveLimitDefault : DEFAULT_LIVE_LIMITS["inferred-dark"]
) ),
news: parseGenericLimit(env, "news", env.LIVE_LIMIT_DEFAULT ? liveLimitDefault : DEFAULT_LIVE_LIMITS.news)
}; };
}; };
const parsePositiveInt = (value: string | undefined, fallback: number): number => { const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | null => {
const parsed = Number(value); switch (channel) {
if (!Number.isFinite(parsed)) { case "options":
return fallback; case "nbbo":
case "equities":
case "equity-quotes":
return typeof item.ts === "number" ? item.ts : null;
case "flow":
case "classifier-hits":
case "alerts":
case "inferred-dark":
return typeof item.source_ts === "number" ? item.source_ts : null;
case "news":
return typeof item.published_ts === "number" ? item.published_ts : null;
default:
return null;
} }
return Math.max(1, Math.floor(parsed));
}; };
export const resolveLiveStateConfig = (env: NodeJS.ProcessEnv = process.env): LiveStateConfig => ({ export const resolveLiveStateConfig = (env: NodeJS.ProcessEnv = process.env): LiveStateConfig => ({
@ -217,6 +234,13 @@ export const resolveLiveStateConfig = (env: NodeJS.ProcessEnv = process.env): Li
), ),
redisFlushMaxItems: parsePositiveInt(env.LIVE_REDIS_FLUSH_MAX_ITEMS, DEFAULT_REDIS_FLUSH_MAX_ITEMS) redisFlushMaxItems: parsePositiveInt(env.LIVE_REDIS_FLUSH_MAX_ITEMS, DEFAULT_REDIS_FLUSH_MAX_ITEMS)
}); });
const parsePositiveInt = (value: string | undefined, fallback: number): number => {
const parsed = Number(value);
if (!Number.isFinite(parsed)) {
return fallback;
}
return Math.max(1, Math.floor(parsed));
};
type RedisLike = Pick< type RedisLike = Pick<
RedisClientType, RedisClientType,
@ -318,6 +342,14 @@ const getGenericConfig = (limits: GenericLiveLimits): {
parse: (value) => InferredDarkEventSchema.parse(value), parse: (value) => InferredDarkEventSchema.parse(value),
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }), cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
fetchRecent: fetchRecentInferredDark fetchRecent: fetchRecentInferredDark
},
news: {
redisKey: "live:news",
cursorField: "news",
limit: limits.news,
parse: (value) => NewsStorySchema.parse(value),
cursor: (item) => ({ ts: item.published_ts, seq: item.seq }),
fetchRecent: fetchRecentNews
} }
}); });
@ -371,23 +403,6 @@ const normalizeGenericItems = <T>(
return sortGenericItems(items, config.cursor).slice(0, config.limit); return sortGenericItems(items, config.cursor).slice(0, config.limit);
}; };
const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | null => {
switch (channel) {
case "options":
case "nbbo":
case "equities":
case "equity-quotes":
return typeof item.ts === "number" ? item.ts : null;
case "flow":
case "classifier-hits":
case "alerts":
case "inferred-dark":
return typeof item.source_ts === "number" ? item.source_ts : null;
default:
return null;
}
};
const isWithinLiveFeedLookback = ( const isWithinLiveFeedLookback = (
channel: LiveGenericChannel, channel: LiveGenericChannel,
item: unknown, item: unknown,

View file

@ -0,0 +1,16 @@
{
"name": "@islandflow/ingest-news",
"private": true,
"type": "module",
"scripts": {
"dev": "bun run src/index.ts"
},
"dependencies": {
"@islandflow/bus": "workspace:*",
"@islandflow/config": "workspace:*",
"@islandflow/observability": "workspace:*",
"@islandflow/types": "workspace:*",
"ws": "^8.18.3",
"zod": "^3.23.8"
}
}

View file

@ -0,0 +1,216 @@
import { readEnv } from "@islandflow/config";
import { createLogger } from "@islandflow/observability";
import {
SUBJECT_NEWS,
STREAM_NEWS,
connectJetStreamWithRetry,
ensureKnownStreams,
publishJson
} from "@islandflow/bus";
import { NewsStorySchema, type NewsStory } from "@islandflow/types";
import WebSocket from "ws";
import { z } from "zod";
import { resolveNewsSymbols } from "./symbols";
const service = "ingest-news";
const logger = createLogger({ service });
const envSchema = z.object({
NATS_URL: z.string().default("nats://127.0.0.1:4222"),
ALPACA_API_KEY: z.string().default(""),
ALPACA_REST_URL: z.string().default("https://data.alpaca.markets"),
ALPACA_WS_BASE_URL: z.string().default("wss://stream.data.alpaca.markets"),
ALPACA_NEWS_BACKFILL_LIMIT: z.coerce.number().int().positive().max(200).default(100),
ALPACA_NEWS_WEBSOCKET_PATH: z.string().default("/v1beta1/news")
});
const env = readEnv(envSchema);
type AlpacaNewsItem = {
id?: number;
headline?: string;
summary?: string;
content?: string;
author?: string;
created_at?: string;
updated_at?: string;
url?: string;
symbols?: string[];
source?: string;
};
type AlpacaNewsResponse = {
news?: AlpacaNewsItem[];
};
const buildHeaders = (): Record<string, string> => ({
Authorization: `Bearer ${env.ALPACA_API_KEY}`
});
const parseTimestamp = (value: string | undefined): number => {
const parsed = value ? Date.parse(value) : Number.NaN;
return Number.isFinite(parsed) ? parsed : Date.now();
};
const toStory = (item: AlpacaNewsItem, seq: number): NewsStory | null => {
const storyId = Number(item.id);
if (!Number.isFinite(storyId) || storyId < 0) {
return null;
}
const provider = "alpaca";
const contentHtml = item.content ?? "";
const symbols = resolveNewsSymbols(item.symbols ?? [], contentHtml);
const publishedTs = parseTimestamp(item.created_at);
const updatedTs = parseTimestamp(item.updated_at ?? item.created_at);
return NewsStorySchema.parse({
source_ts: publishedTs,
ingest_ts: Date.now(),
seq,
trace_id: `${provider}:${storyId}`,
story_id: storyId,
provider,
source: item.source?.trim() || item.author?.trim() || "Alpaca News",
headline: item.headline?.trim() || `Story ${storyId}`,
summary: item.summary?.trim() || "",
content_html: contentHtml,
url: item.url?.trim() || "",
published_ts: publishedTs,
updated_ts: updatedTs,
provider_symbols: symbols.provider_symbols,
resolved_symbols: symbols.resolved_symbols,
symbol_resolution: symbols.symbol_resolution
});
};
const fetchBackfill = async (): Promise<AlpacaNewsItem[]> => {
const url = new URL("/v1beta1/news", env.ALPACA_REST_URL);
url.searchParams.set("sort", "desc");
url.searchParams.set("limit", env.ALPACA_NEWS_BACKFILL_LIMIT.toString());
const response = await fetch(url.toString(), {
headers: buildHeaders()
});
if (!response.ok) {
throw new Error(`alpaca news backfill failed (${response.status})`);
}
const payload = (await response.json()) as AlpacaNewsResponse;
return Array.isArray(payload.news) ? payload.news : [];
};
const decodePayload = (data: WebSocket.RawData): unknown => {
if (typeof data === "string") {
return JSON.parse(data) as unknown;
}
if (data instanceof ArrayBuffer) {
return JSON.parse(new TextDecoder().decode(new Uint8Array(data))) as unknown;
}
if (ArrayBuffer.isView(data)) {
return JSON.parse(new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength))) as unknown;
}
return JSON.parse(new TextDecoder().decode(new Uint8Array(data as ArrayBuffer))) as unknown;
};
const run = async () => {
if (!env.ALPACA_API_KEY) {
throw new Error("ALPACA_API_KEY is required for ingest-news.");
}
const { nc, js, jsm } = await connectJetStreamWithRetry(
{
servers: env.NATS_URL,
name: service
},
{ attempts: 120, delayMs: 500 }
);
await ensureKnownStreams(jsm, [STREAM_NEWS], { logger });
let seq = 0;
const publishStory = async (item: AlpacaNewsItem) => {
seq += 1;
const story = toStory(item, seq);
if (!story) {
return;
}
await publishJson(js, SUBJECT_NEWS, story);
};
const backfill = await fetchBackfill();
for (const item of backfill.reverse()) {
await publishStory(item);
}
const wsUrl = new URL(env.ALPACA_NEWS_WEBSOCKET_PATH, env.ALPACA_WS_BASE_URL).toString();
const ws = new WebSocket(wsUrl, {
headers: buildHeaders()
});
ws.on("open", () => {
ws.send(
JSON.stringify({
action: "auth",
key: env.ALPACA_API_KEY,
secret: ""
})
);
});
ws.on("message", (raw) => {
let payload: unknown;
try {
payload = decodePayload(raw);
} catch (error) {
logger.warn("failed to decode alpaca news message", {
error: error instanceof Error ? error.message : String(error)
});
return;
}
if (!Array.isArray(payload)) {
return;
}
for (const entry of payload) {
if (!entry || typeof entry !== "object") {
continue;
}
const message = entry as Record<string, unknown>;
if (message.T === "success") {
const msg = typeof message.msg === "string" ? message.msg : "";
if (msg === "authenticated") {
ws.send(JSON.stringify({ action: "subscribe", news: ["*"] }));
}
continue;
}
if (message.T === "subscription" || message.T === "error") {
continue;
}
void publishStory(message as AlpacaNewsItem).catch((error) => {
logger.error("failed to publish alpaca news story", {
error: error instanceof Error ? error.message : String(error)
});
});
}
});
const shutdown = async (signal: string) => {
logger.info("shutting down", { signal });
ws.close();
await nc.drain();
process.exit(0);
};
process.on("SIGINT", () => void shutdown("SIGINT"));
process.on("SIGTERM", () => void shutdown("SIGTERM"));
};
void run().catch((error) => {
logger.error("service crashed", {
error: error instanceof Error ? error.message : String(error)
});
process.exit(1);
});

View file

@ -0,0 +1,70 @@
import type { NewsSymbolResolution } from "@islandflow/types";
const TICKER_ANCHOR_RE = />\s*([A-Z]{1,5})\s*<\/a>/g;
const EXCHANGE_TICKER_RE = /\b(?:NASDAQ|NYSE|NYSEAMERICAN|AMEX|OTC|CBOE):([A-Z]{1,5})\b/g;
const DOLLAR_TICKER_RE = /\$([A-Z]{1,5})\b/g;
const normalizeSymbols = (symbols: string[]): string[] => {
const seen = new Set<string>();
const normalized: string[] = [];
for (const entry of symbols) {
const symbol = entry.trim().toUpperCase();
if (!symbol || !/^[A-Z]{1,5}$/.test(symbol) || seen.has(symbol)) {
continue;
}
seen.add(symbol);
normalized.push(symbol);
}
return normalized;
};
const collectMatches = (value: string, regex: RegExp): string[] => {
regex.lastIndex = 0;
const matches: string[] = [];
let match: RegExpExecArray | null = null;
while ((match = regex.exec(value)) !== null) {
matches.push(match[1] ?? "");
}
return matches;
};
export const resolveNewsSymbols = (
providerSymbols: string[],
contentHtml: string
): {
provider_symbols: string[];
resolved_symbols: string[];
symbol_resolution: NewsSymbolResolution;
} => {
const normalizedProvider = normalizeSymbols(providerSymbols);
const derived = normalizeSymbols([
...collectMatches(contentHtml, TICKER_ANCHOR_RE),
...collectMatches(contentHtml, EXCHANGE_TICKER_RE),
...collectMatches(contentHtml, DOLLAR_TICKER_RE)
]);
if (normalizedProvider.length > 0) {
const merged = normalizeSymbols([...normalizedProvider, ...derived]);
return {
provider_symbols: normalizedProvider,
resolved_symbols: merged,
symbol_resolution: derived.length > 0 ? "mixed" : "provider"
};
}
if (derived.length > 0) {
return {
provider_symbols: [],
resolved_symbols: derived,
symbol_resolution: "derived"
};
}
return {
provider_symbols: [],
resolved_symbols: [],
symbol_resolution: "none"
};
};

View file

@ -0,0 +1,30 @@
import { describe, expect, it } from "bun:test";
import { resolveNewsSymbols } from "../src/symbols";
describe("resolveNewsSymbols", () => {
it("prefers provider symbols when present", () => {
const result = resolveNewsSymbols(["tsla", "aapl"], "<p>No extra tickers here.</p>");
expect(result.provider_symbols).toEqual(["TSLA", "AAPL"]);
expect(result.resolved_symbols).toEqual(["TSLA", "AAPL"]);
expect(result.symbol_resolution).toBe("provider");
});
it("falls back to ticker anchors", () => {
const result = resolveNewsSymbols([], '<a href="/quote/TSLA">TSLA</a>');
expect(result.resolved_symbols).toEqual(["TSLA"]);
expect(result.symbol_resolution).toBe("derived");
});
it("falls back to exchange and dollar patterns", () => {
const result = resolveNewsSymbols([], "<p>NASDAQ:TSLA met with $IBM executives.</p>");
expect(result.resolved_symbols).toEqual(["TSLA", "IBM"]);
expect(result.symbol_resolution).toBe("derived");
});
it("dedupes and uppercases merged symbols", () => {
const result = resolveNewsSymbols(["tsla"], "<p>$TSLA and NASDAQ:TSLA</p>");
expect(result.provider_symbols).toEqual(["TSLA"]);
expect(result.resolved_symbols).toEqual(["TSLA"]);
expect(result.symbol_resolution).toBe("mixed");
});
});

View file

@ -0,0 +1,7 @@
{
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"types": []
},
"include": ["src/**/*.ts", "tests/**/*.ts"]
}