Compare commits
2 commits
62aae70878
...
04baecebe0
| Author | SHA1 | Date | |
|---|---|---|---|
| 04baecebe0 | |||
| 906fe411c9 |
32 changed files with 1410 additions and 52 deletions
|
|
@ -13,6 +13,7 @@
|
|||
{"_type":"issue","id":"islandflow-ayo","title":"Drop stale backlog events from live fanout","description":"Follow-up to live freshness rollout: /ws/live was still fanning out stale backlog events for freshness-gated channels, which kept tape panes in Live feed behind despite active synthetic ingest. Gate fanout and cache ingest by freshness for options/nbbo/equities/flow.","status":"closed","priority":1,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:26:39Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:26:44Z","started_at":"2026-04-28T21:26:44Z","closed_at":"2026-04-28T21:26:44Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0}
|
||||
{"_type":"issue","id":"islandflow-0v6","title":"Fix tape freshness, NBBO coverage, pause controls, and filter popup","description":"Implement the tape fixes requested for synthetic options notional sizing, strict live freshness, live-mode pause/resume behavior, stronger NBBO snapshot coverage, and moving flow filters behind a popup. Includes server-side live cache changes, web terminal state/UI changes, and tests for synthetic pricing, live snapshot freshness/NBBO retention, and live pause/filter interactions.","status":"closed","priority":1,"issue_type":"task","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T21:02:52Z","created_by":"dirtydishes","updated_at":"2026-04-28T21:13:38Z","started_at":"2026-04-28T21:02:57Z","closed_at":"2026-04-28T21:13:38Z","close_reason":"Completed","dependency_count":0,"dependent_count":0,"comment_count":0}
|
||||
{"_type":"issue","id":"islandflow-e4r","title":"Implement smart-money flow filtering and synthetic firehose modes","description":"Implement the approved multi-surface plan for named synthetic market profiles, options raw-vs-signal filtering, live/API filter contracts, Tape page client-side flow filters, firehose-readiness improvements, tests, and README updates.","status":"closed","priority":1,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-04-28T20:10:49Z","created_by":"dirtydishes","updated_at":"2026-04-28T20:29:29Z","started_at":"2026-04-28T20:10:53Z","closed_at":"2026-04-28T20:29:29Z","close_reason":"Implemented synthetic market profiles, options signal-path filtering, signal-aware API/replay contracts, Tape page filters, tests, and README updates. Follow-up tracked in islandflow-biq.","dependency_count":0,"dependent_count":0,"comment_count":0}
|
||||
{"_type":"issue","id":"islandflow-8fn","title":"implement alpaca-backed news wire view","description":"Why this issue exists and what needs to be done:\\nAdd an Alpaca-powered live news pipeline, API, storage, and web experience, including a dedicated /news route, Home preview, live fanout, history pagination, ticker resolution, and replay-mode live-only empty states.\\n\\nAcceptance criteria:\\n- normalized NewsStory contract and live channel exist\\n- ingest-news service backfills and streams Alpaca news\\n- API persists, serves, and fans out news\\n- web app exposes /news plus Home preview and drawer\\n- tests cover types, storage, API, and key UI behaviors\\n- turn documentation is added\\n\\nDesign:\\nReuse Islandflow drawer, chips, panes, and terminal styling; keep news live-only in v1 replay mode.\\n\\nNotes:\\nImplement client-side ticker filtering in v1 and expose latest revision only per provider+story_id.","status":"closed","priority":2,"issue_type":"feature","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T20:37:13Z","created_by":"dirtydishes","updated_at":"2026-05-18T20:55:11Z","started_at":"2026-05-18T20:37:20Z","closed_at":"2026-05-18T20:55:11Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0}
|
||||
{"_type":"issue","id":"islandflow-k8i","title":"Fix duplicate alert context import in API entrypoint","description":"Recent alert-context work introduced a duplicate fetchAlertContextByTraceId import in services/api/src/index.ts, which risks breaking TypeScript compilation and API startup. Remove the duplicate import and validate the affected API/web tests.","status":"closed","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T13:01:58Z","created_by":"dirtydishes","updated_at":"2026-05-18T13:03:40Z","started_at":"2026-05-18T13:02:02Z","closed_at":"2026-05-18T13:03:40Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0}
|
||||
{"_type":"issue","id":"islandflow-lk9","title":"Fix PR creation workflow after Forgejo migration","description":"## Why\\nCreating pull requests with fails after the repository moved primary collaboration from GitHub to Forgejo. The current workflow still assumes GitHub GraphQL PR creation semantics, which do not work against the Forgejo remote.\\n\\n## What\\nInvestigate the current PR creation path, identify remaining GitHub-specific assumptions, and update the repo workflow/scripts/docs so contributors can reliably publish branches and open PRs in the Forgejo-based setup.\\n\\n## Acceptance Criteria\\n- The repo no longer instructs contributors to use a broken GitHub-specific PR creation path for Forgejo branches\\n- There is a documented and preferably scripted way to create the equivalent review request against Forgejo\\n- Validation demonstrates the new workflow behaves correctly or clearly documents any remaining platform limitation","status":"in_progress","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T10:26:47Z","created_by":"dirtydishes","updated_at":"2026-05-18T10:26:53Z","started_at":"2026-05-18T10:26:53Z","dependency_count":0,"dependent_count":0,"comment_count":0}
|
||||
{"_type":"issue","id":"islandflow-1ei","title":"Make deploy helper remote-aware for Forgejo","description":"Why: scripts/deploy.ts hardcodes git remote name origin for fetch/pull/push and branch verification, but this repository now uses forgejo/github remotes and may not have an origin remote. What: update deploy.ts to resolve the deploy git remote robustly (Forgejo-aware), use it across local prechecks, branch publish, and remote rollout git operations, and keep behavior explicit in output.","status":"closed","priority":2,"issue_type":"bug","assignee":"dirtydishes","owner":"dishes@dpdrm.com","created_at":"2026-05-18T03:20:12Z","created_by":"dirtydishes","updated_at":"2026-05-18T03:22:39Z","started_at":"2026-05-18T03:20:16Z","closed_at":"2026-05-18T03:22:39Z","close_reason":"Closed","dependency_count":0,"dependent_count":0,"comment_count":0}
|
||||
|
|
|
|||
|
|
@ -117,8 +117,9 @@ Each turn document must include these sections:
|
|||
2. **Changes Made**
|
||||
3. **Context**
|
||||
4. **Important Implementation Details**
|
||||
5. **Expected Impact for End-Users**
|
||||
5. **Validation**
|
||||
5. **Relevant Diff Snippets**
|
||||
6. **Expected Impact for End-Users**
|
||||
7. **Validation**
|
||||
6. **Issues, Limitations, and Mitigations**
|
||||
7. **Follow-up Work**
|
||||
|
||||
|
|
|
|||
|
|
@ -708,7 +708,12 @@ h3 {
|
|||
grid-template-columns: repeat(3, minmax(0, 1fr));
|
||||
}
|
||||
|
||||
.page-grid-news {
|
||||
grid-template-columns: minmax(0, 1fr);
|
||||
}
|
||||
|
||||
.page-grid-home > :nth-child(3),
|
||||
.page-grid-home > :nth-child(4),
|
||||
.page-grid-tape > :nth-child(1),
|
||||
.page-grid-replay > :nth-child(1) {
|
||||
grid-column: 1 / -1;
|
||||
|
|
@ -933,6 +938,7 @@ h3 {
|
|||
}
|
||||
|
||||
.page-grid-home > :nth-child(3),
|
||||
.page-grid-home > :nth-child(4),
|
||||
.page-grid-replay > :not(:first-child) {
|
||||
height: clamp(430px, 58vh, 760px);
|
||||
}
|
||||
|
|
@ -1747,6 +1753,72 @@ h3 {
|
|||
gap: 10px;
|
||||
}
|
||||
|
||||
.terminal-link-button {
|
||||
text-decoration: none;
|
||||
}
|
||||
|
||||
.news-list {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 10px;
|
||||
}
|
||||
|
||||
.news-row {
|
||||
width: 100%;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 8px;
|
||||
padding: 14px 16px;
|
||||
border: 1px solid var(--border);
|
||||
border-radius: 12px;
|
||||
background: oklch(0.18 0.012 250 / 0.6);
|
||||
color: var(--text);
|
||||
text-align: left;
|
||||
transition: border-color 150ms ease, background 150ms ease;
|
||||
}
|
||||
|
||||
.news-row:hover {
|
||||
border-color: var(--accent-soft);
|
||||
background: oklch(0.2 0.015 250 / 0.75);
|
||||
}
|
||||
|
||||
.news-row-head,
|
||||
.news-row-meta {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: space-between;
|
||||
gap: 10px;
|
||||
flex-wrap: wrap;
|
||||
}
|
||||
|
||||
.news-row h3 {
|
||||
margin: 0;
|
||||
font-size: 0.96rem;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
.news-row-time {
|
||||
color: var(--text-dim);
|
||||
font-family: var(--font-mono), monospace;
|
||||
font-size: 0.78rem;
|
||||
}
|
||||
|
||||
.news-row-meta {
|
||||
color: var(--text-dim);
|
||||
font-size: 0.78rem;
|
||||
}
|
||||
|
||||
.news-drawer-body a {
|
||||
color: var(--accent);
|
||||
}
|
||||
|
||||
.news-drawer-body p,
|
||||
.news-drawer-body ul,
|
||||
.news-drawer-body ol,
|
||||
.news-drawer-body blockquote {
|
||||
margin: 0 0 12px;
|
||||
}
|
||||
|
||||
.synthetic-status-grid strong,
|
||||
.synthetic-hit-row strong {
|
||||
font-family: var(--font-mono), monospace;
|
||||
|
|
@ -1964,6 +2036,7 @@ h3 {
|
|||
}
|
||||
|
||||
.page-grid-home > :nth-child(3),
|
||||
.page-grid-home > :nth-child(4),
|
||||
.page-grid-tape > :nth-child(1),
|
||||
.page-grid-replay > :nth-child(1) {
|
||||
grid-column: auto;
|
||||
|
|
@ -1973,6 +2046,7 @@ h3 {
|
|||
.page-grid-home > :nth-child(1),
|
||||
.page-grid-home > :nth-child(2),
|
||||
.page-grid-home > :nth-child(3),
|
||||
.page-grid-home > :nth-child(4),
|
||||
.page-grid-signals > .terminal-pane,
|
||||
.page-grid-replay > :not(:first-child),
|
||||
.page-grid-tape > :first-child,
|
||||
|
|
|
|||
7
apps/web/app/news/page.tsx
Normal file
7
apps/web/app/news/page.tsx
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
import { NewsRoute } from "../terminal";
|
||||
|
||||
export const dynamic = "force-dynamic";
|
||||
|
||||
export default function Page() {
|
||||
return <NewsRoute />;
|
||||
}
|
||||
|
|
@ -247,6 +247,15 @@ describe("live manifest", () => {
|
|||
]);
|
||||
});
|
||||
|
||||
it("includes news subscriptions on home and /news", () => {
|
||||
expect(getLiveManifest("/", "SPY", 60000, buildDefaultFlowFilters()).map((subscription) => subscription.channel)).toContain(
|
||||
"news"
|
||||
);
|
||||
expect(getLiveManifest("/news", "SPY", 60000, buildDefaultFlowFilters()).map((subscription) => subscription.channel)).toEqual([
|
||||
"news"
|
||||
]);
|
||||
});
|
||||
|
||||
it("scopes /charts subscriptions to chart channels only", () => {
|
||||
const channels = getLiveManifest("/charts", "SPY", 60000, buildDefaultFlowFilters()).map(
|
||||
(subscription) => subscription.channel
|
||||
|
|
@ -431,6 +440,13 @@ describe("route feature map", () => {
|
|||
expect(features.equityOverlay).toBe(true);
|
||||
expect(features.alerts).toBe(false);
|
||||
});
|
||||
|
||||
it("maps /news to the dedicated news pane", () => {
|
||||
const features = getRouteFeatures("/news");
|
||||
expect(features.news).toBe(true);
|
||||
expect(features.showNewsPane).toBe(true);
|
||||
expect(features.showAlertsPane).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("fixed tape virtualization config", () => {
|
||||
|
|
@ -461,10 +477,11 @@ describe("dark underlying route dependency helper", () => {
|
|||
});
|
||||
|
||||
describe("terminal navigation", () => {
|
||||
it("exposes only Home and Tape as top-level destinations", () => {
|
||||
it("exposes Home, Tape, and News as top-level destinations", () => {
|
||||
expect(NAV_ITEMS).toEqual([
|
||||
{ href: "/", label: "Home" },
|
||||
{ href: "/tape", label: "Tape" }
|
||||
{ href: "/tape", label: "Tape" },
|
||||
{ href: "/news", label: "News" }
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import type {
|
|||
LiveServerMessage,
|
||||
LiveHotChannelHealthMap,
|
||||
LiveSubscription,
|
||||
NewsStory,
|
||||
OptionFlowFilters,
|
||||
OptionFlowView,
|
||||
OptionNbboSide,
|
||||
|
|
@ -158,6 +159,7 @@ type RouteFeatures = {
|
|||
nbbo: boolean;
|
||||
equities: boolean;
|
||||
flow: boolean;
|
||||
news: boolean;
|
||||
alerts: boolean;
|
||||
smartMoney: boolean;
|
||||
classifierHits: boolean;
|
||||
|
|
@ -168,6 +170,7 @@ type RouteFeatures = {
|
|||
showOptionsPane: boolean;
|
||||
showEquitiesPane: boolean;
|
||||
showFlowPane: boolean;
|
||||
showNewsPane: boolean;
|
||||
showAlertsPane: boolean;
|
||||
showClassifierPane: boolean;
|
||||
showDarkPane: boolean;
|
||||
|
|
@ -187,6 +190,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
const includeEquitiesFallback = shouldIncludeEquitiesForDarkUnderlyingFallback();
|
||||
const normalizedPath =
|
||||
pathname === "/tape" ||
|
||||
pathname === "/news" ||
|
||||
pathname === "/signals" ||
|
||||
pathname === "/charts" ||
|
||||
pathname === "/replay"
|
||||
|
|
@ -200,6 +204,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
nbbo: true,
|
||||
equities: true,
|
||||
flow: true,
|
||||
news: false,
|
||||
alerts: false,
|
||||
smartMoney: false,
|
||||
classifierHits: false,
|
||||
|
|
@ -210,6 +215,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
showOptionsPane: true,
|
||||
showEquitiesPane: true,
|
||||
showFlowPane: true,
|
||||
showNewsPane: false,
|
||||
showAlertsPane: false,
|
||||
showClassifierPane: false,
|
||||
showDarkPane: false,
|
||||
|
|
@ -220,12 +226,41 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
needsAlertEvidencePrefetch: false,
|
||||
needsDarkUnderlying: false
|
||||
};
|
||||
case "/news":
|
||||
return {
|
||||
options: false,
|
||||
nbbo: false,
|
||||
equities: false,
|
||||
flow: false,
|
||||
news: true,
|
||||
alerts: false,
|
||||
smartMoney: false,
|
||||
classifierHits: false,
|
||||
inferredDark: false,
|
||||
equityJoins: false,
|
||||
equityCandles: false,
|
||||
equityOverlay: false,
|
||||
showOptionsPane: false,
|
||||
showEquitiesPane: false,
|
||||
showFlowPane: false,
|
||||
showNewsPane: true,
|
||||
showAlertsPane: false,
|
||||
showClassifierPane: false,
|
||||
showDarkPane: false,
|
||||
showChartPane: false,
|
||||
showFocusPane: false,
|
||||
showReplayConsole: false,
|
||||
needsClassifierDecor: false,
|
||||
needsAlertEvidencePrefetch: false,
|
||||
needsDarkUnderlying: false
|
||||
};
|
||||
case "/signals":
|
||||
return {
|
||||
options: false,
|
||||
nbbo: false,
|
||||
equities: includeEquitiesFallback,
|
||||
flow: false,
|
||||
news: false,
|
||||
alerts: true,
|
||||
smartMoney: true,
|
||||
classifierHits: true,
|
||||
|
|
@ -236,6 +271,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
showOptionsPane: false,
|
||||
showEquitiesPane: false,
|
||||
showFlowPane: false,
|
||||
showNewsPane: false,
|
||||
showAlertsPane: true,
|
||||
showClassifierPane: true,
|
||||
showDarkPane: true,
|
||||
|
|
@ -252,6 +288,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
nbbo: false,
|
||||
equities: includeEquitiesFallback,
|
||||
flow: false,
|
||||
news: false,
|
||||
alerts: false,
|
||||
smartMoney: true,
|
||||
classifierHits: false,
|
||||
|
|
@ -262,6 +299,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
showOptionsPane: false,
|
||||
showEquitiesPane: false,
|
||||
showFlowPane: false,
|
||||
showNewsPane: false,
|
||||
showAlertsPane: false,
|
||||
showClassifierPane: false,
|
||||
showDarkPane: false,
|
||||
|
|
@ -278,6 +316,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
nbbo: false,
|
||||
equities: false,
|
||||
flow: false,
|
||||
news: false,
|
||||
alerts: false,
|
||||
smartMoney: false,
|
||||
classifierHits: false,
|
||||
|
|
@ -288,6 +327,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
showOptionsPane: true,
|
||||
showEquitiesPane: false,
|
||||
showFlowPane: true,
|
||||
showNewsPane: false,
|
||||
showAlertsPane: true,
|
||||
showClassifierPane: false,
|
||||
showDarkPane: false,
|
||||
|
|
@ -305,6 +345,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
nbbo: false,
|
||||
equities: true,
|
||||
flow: false,
|
||||
news: true,
|
||||
alerts: true,
|
||||
smartMoney: true,
|
||||
classifierHits: false,
|
||||
|
|
@ -315,6 +356,7 @@ export const getRouteFeatures = (pathname: string): RouteFeatures => {
|
|||
showOptionsPane: false,
|
||||
showEquitiesPane: true,
|
||||
showFlowPane: false,
|
||||
showNewsPane: true,
|
||||
showAlertsPane: true,
|
||||
showClassifierPane: false,
|
||||
showDarkPane: false,
|
||||
|
|
@ -332,6 +374,7 @@ const EMPTY_ALERT_EVENTS: AlertEvent[] = [];
|
|||
const EMPTY_CLASSIFIER_HIT_EVENTS: ClassifierHitEvent[] = [];
|
||||
const EMPTY_SMART_MONEY_EVENTS: SmartMoneyEvent[] = [];
|
||||
const EMPTY_INFERRED_DARK_EVENTS: InferredDarkEvent[] = [];
|
||||
const EMPTY_NEWS_STORIES: NewsStory[] = [];
|
||||
|
||||
type CandlestickSeries = ReturnType<IChartApi["addCandlestickSeries"]>;
|
||||
|
||||
|
|
@ -1194,6 +1237,44 @@ const formatDateTime = (ts: number): string => {
|
|||
return `${date.toLocaleDateString()} ${date.toLocaleTimeString()}`;
|
||||
};
|
||||
|
||||
const isSameLocalDay = (left: number, right: number): boolean => {
|
||||
const a = new Date(left);
|
||||
const b = new Date(right);
|
||||
return (
|
||||
a.getFullYear() === b.getFullYear() &&
|
||||
a.getMonth() === b.getMonth() &&
|
||||
a.getDate() === b.getDate()
|
||||
);
|
||||
};
|
||||
|
||||
export const formatNewsTimestamp = (ts: number, now = Date.now()): string => {
|
||||
const date = new Date(ts);
|
||||
return isSameLocalDay(ts, now)
|
||||
? date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" })
|
||||
: date.toLocaleString([], { month: "short", day: "numeric", hour: "numeric", minute: "2-digit" });
|
||||
};
|
||||
|
||||
const sanitizeNewsHtml = (value: string): { html: string; fallbackText: string; sanitized: boolean } => {
|
||||
const fallbackText = value
|
||||
.replace(/<script[\s\S]*?<\/script>/gi, " ")
|
||||
.replace(/<style[\s\S]*?<\/style>/gi, " ")
|
||||
.replace(/<[^>]+>/g, " ")
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
|
||||
try {
|
||||
const sanitized = value
|
||||
.replace(/<script[\s\S]*?<\/script>/gi, "")
|
||||
.replace(/<style[\s\S]*?<\/style>/gi, "")
|
||||
.replace(/\son\w+=(?:"[^"]*"|'[^']*'|[^\s>]+)/gi, "")
|
||||
.replace(/\shref=(["'])javascript:[\s\S]*?\1/gi, ' href="#"')
|
||||
.replace(/<(?!\/?(p|div|section|article|span|strong|em|b|i|ul|ol|li|br|a|h1|h2|h3|h4|blockquote)\b)[^>]*>/gi, "");
|
||||
return { html: sanitized, fallbackText, sanitized: true };
|
||||
} catch {
|
||||
return { html: "", fallbackText, sanitized: false };
|
||||
}
|
||||
};
|
||||
|
||||
const humanizeClassifierId = (value: string): string => {
|
||||
if (!value) {
|
||||
return "Classifier";
|
||||
|
|
@ -2870,6 +2951,7 @@ type LiveSessionState = {
|
|||
smartMoneyHistory: SmartMoneyEvent[];
|
||||
classifierHitsHistory: ClassifierHitEvent[];
|
||||
alertsHistory: AlertEvent[];
|
||||
newsHistory: NewsStory[];
|
||||
inferredDarkHistory: InferredDarkEvent[];
|
||||
options: OptionPrint[];
|
||||
nbbo: OptionNBBO[];
|
||||
|
|
@ -2880,6 +2962,7 @@ type LiveSessionState = {
|
|||
smartMoney: SmartMoneyEvent[];
|
||||
classifierHits: ClassifierHitEvent[];
|
||||
alerts: AlertEvent[];
|
||||
news: NewsStory[];
|
||||
inferredDark: InferredDarkEvent[];
|
||||
chartCandles: EquityCandle[];
|
||||
chartOverlay: EquityPrint[];
|
||||
|
|
@ -2900,6 +2983,7 @@ const LIVE_HISTORY_ENDPOINTS: Partial<Record<LiveSubscription["channel"], string
|
|||
"smart-money": "/history/smart-money",
|
||||
"classifier-hits": "/history/classifier-hits",
|
||||
alerts: "/history/alerts",
|
||||
news: "/history/news",
|
||||
"inferred-dark": "/history/inferred-dark"
|
||||
};
|
||||
|
||||
|
|
@ -3072,6 +3156,9 @@ export const getLiveManifest = (
|
|||
if (features.flow) {
|
||||
subscriptions.push({ channel: "flow", filters: flowFilters, snapshot_limit: LIVE_HOT_WINDOW });
|
||||
}
|
||||
if (features.news) {
|
||||
subscriptions.push({ channel: "news", snapshot_limit: LIVE_OPTIONS_HEAD_LIMIT });
|
||||
}
|
||||
if (features.alerts) {
|
||||
subscriptions.push({ channel: "alerts", snapshot_limit: LIVE_HOT_WINDOW });
|
||||
}
|
||||
|
|
@ -3133,6 +3220,7 @@ const useLiveSession = (
|
|||
const [smartMoney, setSmartMoney] = useState<SmartMoneyEvent[]>([]);
|
||||
const [classifierHits, setClassifierHits] = useState<ClassifierHitEvent[]>([]);
|
||||
const [alerts, setAlerts] = useState<AlertEvent[]>([]);
|
||||
const [news, setNews] = useState<NewsStory[]>([]);
|
||||
const [inferredDark, setInferredDark] = useState<InferredDarkEvent[]>([]);
|
||||
const [optionsHistory, setOptionsHistory] = useState<OptionPrint[]>([]);
|
||||
const [nbboHistory, setNbboHistory] = useState<OptionNBBO[]>([]);
|
||||
|
|
@ -3142,6 +3230,7 @@ const useLiveSession = (
|
|||
const [smartMoneyHistory, setSmartMoneyHistory] = useState<SmartMoneyEvent[]>([]);
|
||||
const [classifierHitsHistory, setClassifierHitsHistory] = useState<ClassifierHitEvent[]>([]);
|
||||
const [alertsHistory, setAlertsHistory] = useState<AlertEvent[]>([]);
|
||||
const [newsHistory, setNewsHistory] = useState<NewsStory[]>([]);
|
||||
const [inferredDarkHistory, setInferredDarkHistory] = useState<InferredDarkEvent[]>([]);
|
||||
const [chartCandles, setChartCandles] = useState<EquityCandle[]>([]);
|
||||
const [chartOverlay, setChartOverlay] = useState<EquityPrint[]>([]);
|
||||
|
|
@ -3154,6 +3243,7 @@ const useLiveSession = (
|
|||
const smartMoneyRef = useRef<SmartMoneyEvent[]>([]);
|
||||
const classifierHitsRef = useRef<ClassifierHitEvent[]>([]);
|
||||
const alertsRef = useRef<AlertEvent[]>([]);
|
||||
const newsRef = useRef<NewsStory[]>([]);
|
||||
const inferredDarkRef = useRef<InferredDarkEvent[]>([]);
|
||||
const chartCandlesRef = useRef<EquityCandle[]>([]);
|
||||
const chartOverlayRef = useRef<EquityPrint[]>([]);
|
||||
|
|
@ -3165,6 +3255,7 @@ const useLiveSession = (
|
|||
const smartMoneyHistoryRef = useRef<SmartMoneyEvent[]>([]);
|
||||
const classifierHitsHistoryRef = useRef<ClassifierHitEvent[]>([]);
|
||||
const alertsHistoryRef = useRef<AlertEvent[]>([]);
|
||||
const newsHistoryRef = useRef<NewsStory[]>([]);
|
||||
const inferredDarkHistoryRef = useRef<InferredDarkEvent[]>([]);
|
||||
const socketRef = useRef<WebSocket | null>(null);
|
||||
const reconnectRef = useRef<number | null>(null);
|
||||
|
|
@ -3218,6 +3309,7 @@ const useLiveSession = (
|
|||
setSmartMoney([]);
|
||||
setClassifierHits([]);
|
||||
setAlerts([]);
|
||||
setNews([]);
|
||||
setInferredDark([]);
|
||||
setOptionsHistory([]);
|
||||
setNbboHistory([]);
|
||||
|
|
@ -3227,6 +3319,7 @@ const useLiveSession = (
|
|||
setSmartMoneyHistory([]);
|
||||
setClassifierHitsHistory([]);
|
||||
setAlertsHistory([]);
|
||||
setNewsHistory([]);
|
||||
setInferredDarkHistory([]);
|
||||
setChartCandles([]);
|
||||
setChartOverlay([]);
|
||||
|
|
@ -3239,6 +3332,7 @@ const useLiveSession = (
|
|||
smartMoneyRef.current = [];
|
||||
classifierHitsRef.current = [];
|
||||
alertsRef.current = [];
|
||||
newsRef.current = [];
|
||||
inferredDarkRef.current = [];
|
||||
chartCandlesRef.current = [];
|
||||
chartOverlayRef.current = [];
|
||||
|
|
@ -3250,6 +3344,7 @@ const useLiveSession = (
|
|||
smartMoneyHistoryRef.current = [];
|
||||
classifierHitsHistoryRef.current = [];
|
||||
alertsHistoryRef.current = [];
|
||||
newsHistoryRef.current = [];
|
||||
inferredDarkHistoryRef.current = [];
|
||||
subscribedKeysRef.current = new Set();
|
||||
subscribedMapRef.current = new Map();
|
||||
|
|
@ -3403,6 +3498,12 @@ const useLiveSession = (
|
|||
ref: alertsHistoryRef
|
||||
});
|
||||
break;
|
||||
case "news":
|
||||
mergeItems(setNews, newsRef, items as NewsStory[], LIVE_OPTIONS_HEAD_LIMIT, {
|
||||
setter: setNewsHistory,
|
||||
ref: newsHistoryRef
|
||||
});
|
||||
break;
|
||||
case "inferred-dark":
|
||||
mergeItems(setInferredDark, inferredDarkRef, items as InferredDarkEvent[], LIVE_HOT_WINDOW, {
|
||||
setter: setInferredDarkHistory,
|
||||
|
|
@ -3694,6 +3795,9 @@ const useLiveSession = (
|
|||
case "alerts":
|
||||
mergeOlder(setAlertsHistory, alertsHistoryRef, alertsRef.current);
|
||||
break;
|
||||
case "news":
|
||||
mergeOlder(setNewsHistory, newsHistoryRef, newsRef.current);
|
||||
break;
|
||||
case "inferred-dark":
|
||||
mergeOlder(setInferredDarkHistory, inferredDarkHistoryRef, inferredDarkRef.current);
|
||||
break;
|
||||
|
|
@ -3735,6 +3839,7 @@ const useLiveSession = (
|
|||
smartMoneyHistory,
|
||||
classifierHitsHistory,
|
||||
alertsHistory,
|
||||
newsHistory,
|
||||
inferredDarkHistory,
|
||||
options,
|
||||
nbbo,
|
||||
|
|
@ -3745,6 +3850,7 @@ const useLiveSession = (
|
|||
smartMoney,
|
||||
classifierHits,
|
||||
alerts,
|
||||
news,
|
||||
inferredDark,
|
||||
chartCandles,
|
||||
chartOverlay
|
||||
|
|
@ -4822,6 +4928,69 @@ const AlertDrawer = ({ alert, flowPacket, evidence, contextStatus, onClose }: Al
|
|||
);
|
||||
};
|
||||
|
||||
type NewsDrawerProps = {
|
||||
story: NewsStory;
|
||||
onClose: () => void;
|
||||
};
|
||||
|
||||
const NewsDrawer = ({ story, onClose }: NewsDrawerProps) => {
|
||||
const body = sanitizeNewsHtml(story.content_html);
|
||||
|
||||
return (
|
||||
<aside className="drawer">
|
||||
<div className="drawer-header">
|
||||
<div>
|
||||
<p className="drawer-eyebrow">News wire</p>
|
||||
<h3>{story.headline}</h3>
|
||||
<p className="drawer-subtitle">
|
||||
{story.source} · Published {formatDateTime(story.published_ts)}
|
||||
{story.updated_ts !== story.published_ts ? ` · Updated ${formatDateTime(story.updated_ts)}` : ""}
|
||||
</p>
|
||||
</div>
|
||||
<button className="drawer-close" type="button" onClick={onClose}>
|
||||
Close
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<div className="drawer-meta">
|
||||
{story.resolved_symbols.map((symbol) => (
|
||||
<span className="drawer-chip" key={`${story.trace_id}-${symbol}`}>
|
||||
{symbol}
|
||||
</span>
|
||||
))}
|
||||
<span className="drawer-chip">{story.symbol_resolution}</span>
|
||||
</div>
|
||||
|
||||
{story.summary ? (
|
||||
<div className="drawer-section">
|
||||
<h4>Summary</h4>
|
||||
<p className="drawer-note">{story.summary}</p>
|
||||
</div>
|
||||
) : null}
|
||||
|
||||
<div className="drawer-section">
|
||||
<h4>Story</h4>
|
||||
{body.sanitized && body.html ? (
|
||||
<div className="drawer-note news-drawer-body" dangerouslySetInnerHTML={{ __html: body.html }} />
|
||||
) : body.fallbackText ? (
|
||||
<p className="drawer-note">{body.fallbackText}</p>
|
||||
) : (
|
||||
<p className="drawer-empty">Story body unavailable.</p>
|
||||
)}
|
||||
</div>
|
||||
|
||||
{story.url ? (
|
||||
<div className="drawer-section">
|
||||
<h4>Source link</h4>
|
||||
<a className="terminal-button terminal-link-button" href={story.url} rel="noreferrer" target="_blank">
|
||||
Open original article
|
||||
</a>
|
||||
</div>
|
||||
) : null}
|
||||
</aside>
|
||||
);
|
||||
};
|
||||
|
||||
type ClassifierHitDrawerProps = {
|
||||
hit: ClassifierHitEvent;
|
||||
flowPacket: FlowPacket | null;
|
||||
|
|
@ -5178,6 +5347,7 @@ const useTerminalState = () => {
|
|||
const [mode, setMode] = useState<TapeMode>("live");
|
||||
const [replaySource, setReplaySource] = useState<string | null>(null);
|
||||
const [selectedAlert, setSelectedAlert] = useState<AlertEvent | null>(null);
|
||||
const [selectedNewsStory, setSelectedNewsStory] = useState<NewsStory | null>(null);
|
||||
const [selectedDarkEvent, setSelectedDarkEvent] = useState<InferredDarkEvent | null>(null);
|
||||
const [selectedClassifierHit, setSelectedClassifierHit] = useState<ClassifierHitEvent | null>(null);
|
||||
const [selectedSmartMoneyEvent, setSelectedSmartMoneyEvent] = useState<SmartMoneyEvent | null>(null);
|
||||
|
|
@ -5274,12 +5444,13 @@ const useTerminalState = () => {
|
|||
}, [mode]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!selectedAlert && !selectedClassifierHit && !selectedDarkEvent && !selectedSmartMoneyEvent) {
|
||||
if (!selectedAlert && !selectedNewsStory && !selectedClassifierHit && !selectedDarkEvent && !selectedSmartMoneyEvent) {
|
||||
return;
|
||||
}
|
||||
|
||||
const dismissDrawers = () => {
|
||||
setSelectedAlert(null);
|
||||
setSelectedNewsStory(null);
|
||||
setSelectedClassifierHit(null);
|
||||
setSelectedSmartMoneyEvent(null);
|
||||
setSelectedDarkEvent(null);
|
||||
|
|
@ -5305,7 +5476,7 @@ const useTerminalState = () => {
|
|||
document.removeEventListener("mousedown", handlePointerDown);
|
||||
document.removeEventListener("keydown", handleKeyDown);
|
||||
};
|
||||
}, [selectedAlert, selectedClassifierHit, selectedDarkEvent, selectedSmartMoneyEvent]);
|
||||
}, [selectedAlert, selectedNewsStory, selectedClassifierHit, selectedDarkEvent, selectedSmartMoneyEvent]);
|
||||
|
||||
const optionsScroll = useListScroll();
|
||||
const equitiesScroll = useListScroll();
|
||||
|
|
@ -5540,6 +5711,14 @@ const useTerminalState = () => {
|
|||
)
|
||||
: equityJoins;
|
||||
const flowFeed = mode === "live" ? liveFlow : flow;
|
||||
const newsFeed =
|
||||
mode === "live"
|
||||
? toStaticTapeState(
|
||||
liveSession.status,
|
||||
composeTapeItems([], liveSession.news, liveSession.newsHistory),
|
||||
liveSession.lastUpdate
|
||||
)
|
||||
: toStaticTapeState("disconnected", [], null);
|
||||
const alertsFeed =
|
||||
mode === "live"
|
||||
? toStaticTapeState(
|
||||
|
|
@ -6490,6 +6669,16 @@ const useTerminalState = () => {
|
|||
routeFeatures.needsAlertEvidencePrefetch
|
||||
]);
|
||||
|
||||
const filteredNews = useMemo(() => {
|
||||
if (!routeFeatures.news && !routeFeatures.showNewsPane) {
|
||||
return EMPTY_NEWS_STORIES;
|
||||
}
|
||||
if (tickerSet.size === 0) {
|
||||
return newsFeed.items;
|
||||
}
|
||||
return newsFeed.items.filter((story) => story.resolved_symbols.some((symbol) => matchesTicker(symbol)));
|
||||
}, [matchesTicker, newsFeed.items, routeFeatures.news, routeFeatures.showNewsPane, tickerSet]);
|
||||
|
||||
const visibleAlerts = useMemo(() => {
|
||||
if (routeFeatures.needsAlertEvidencePrefetch) {
|
||||
return filteredAlerts.slice(0, 12);
|
||||
|
|
@ -6767,6 +6956,7 @@ const useTerminalState = () => {
|
|||
(hit: ClassifierHitEvent) => {
|
||||
const alert = findAlertForClassifierHit(hit);
|
||||
if (alert) {
|
||||
setSelectedNewsStory(null);
|
||||
setSelectedClassifierHit(null);
|
||||
setSelectedDarkEvent(null);
|
||||
setSelectedSmartMoneyEvent(null);
|
||||
|
|
@ -6774,6 +6964,7 @@ const useTerminalState = () => {
|
|||
return;
|
||||
}
|
||||
|
||||
setSelectedNewsStory(null);
|
||||
setSelectedAlert(null);
|
||||
setSelectedDarkEvent(null);
|
||||
setSelectedSmartMoneyEvent(null);
|
||||
|
|
@ -6783,6 +6974,7 @@ const useTerminalState = () => {
|
|||
);
|
||||
|
||||
const openFromSmartMoneyEvent = useCallback((event: SmartMoneyEvent) => {
|
||||
setSelectedNewsStory(null);
|
||||
setSelectedAlert(null);
|
||||
setSelectedClassifierHit(null);
|
||||
setSelectedDarkEvent(null);
|
||||
|
|
@ -6797,6 +6989,7 @@ const useTerminalState = () => {
|
|||
);
|
||||
|
||||
const handleDarkMarkerClick = useCallback((event: InferredDarkEvent) => {
|
||||
setSelectedNewsStory(null);
|
||||
setSelectedAlert(null);
|
||||
setSelectedClassifierHit(null);
|
||||
setSelectedSmartMoneyEvent(null);
|
||||
|
|
@ -6817,6 +7010,9 @@ const useTerminalState = () => {
|
|||
if (routeFeatures.flow || routeFeatures.showFlowPane) {
|
||||
updates.push(flowFeed.lastUpdate);
|
||||
}
|
||||
if (routeFeatures.news || routeFeatures.showNewsPane) {
|
||||
updates.push(newsFeed.lastUpdate);
|
||||
}
|
||||
if (routeFeatures.alerts || routeFeatures.showAlertsPane) {
|
||||
updates.push(alertsFeed.lastUpdate);
|
||||
}
|
||||
|
|
@ -6839,6 +7035,8 @@ const useTerminalState = () => {
|
|||
routeFeatures.showFocusPane,
|
||||
routeFeatures.flow,
|
||||
routeFeatures.showFlowPane,
|
||||
routeFeatures.news,
|
||||
routeFeatures.showNewsPane,
|
||||
routeFeatures.alerts,
|
||||
routeFeatures.showAlertsPane,
|
||||
routeFeatures.smartMoney,
|
||||
|
|
@ -6849,6 +7047,7 @@ const useTerminalState = () => {
|
|||
equitiesFeed.lastUpdate,
|
||||
inferredDarkFeed.lastUpdate,
|
||||
flowFeed.lastUpdate,
|
||||
newsFeed.lastUpdate,
|
||||
alertsFeed.lastUpdate,
|
||||
smartMoneyFeed.lastUpdate,
|
||||
classifierHitsFeed.lastUpdate
|
||||
|
|
@ -6861,6 +7060,8 @@ const useTerminalState = () => {
|
|||
setReplaySource,
|
||||
selectedAlert,
|
||||
setSelectedAlert,
|
||||
selectedNewsStory,
|
||||
setSelectedNewsStory,
|
||||
selectedDarkEvent,
|
||||
setSelectedDarkEvent,
|
||||
selectedClassifierHit,
|
||||
|
|
@ -6887,6 +7088,7 @@ const useTerminalState = () => {
|
|||
equityJoins: equityJoinsFeed,
|
||||
nbbo: nbboFeed,
|
||||
inferredDark: inferredDarkFeed,
|
||||
news: newsFeed,
|
||||
flow: flowFeed,
|
||||
alerts: alertsFeed,
|
||||
smartMoney: smartMoneyFeed,
|
||||
|
|
@ -6920,6 +7122,7 @@ const useTerminalState = () => {
|
|||
equitiesScopedQuiet,
|
||||
equitiesSilentWarning,
|
||||
filteredInferredDark,
|
||||
filteredNews,
|
||||
filteredFlow,
|
||||
filteredAlerts,
|
||||
filteredSmartMoneyEvents,
|
||||
|
|
@ -6953,7 +7156,8 @@ const useTerminal = (): TerminalState => {
|
|||
|
||||
export const NAV_ITEMS = [
|
||||
{ href: "/", label: "Home" },
|
||||
{ href: "/tape", label: "Tape" }
|
||||
{ href: "/tape", label: "Tape" },
|
||||
{ href: "/news", label: "News" }
|
||||
] as const;
|
||||
|
||||
type PageFrameProps = {
|
||||
|
|
@ -7780,6 +7984,7 @@ const AlertsPane = memo(({ state, limit, withStrip = false, className }: AlertsP
|
|||
data-tape-key={key}
|
||||
style={{ transform: `translateY(${start}px)` }}
|
||||
onClick={() => {
|
||||
state.setSelectedNewsStory(null);
|
||||
state.setSelectedDarkEvent(null);
|
||||
state.setSelectedClassifierHit(null);
|
||||
state.setSelectedSmartMoneyEvent(null);
|
||||
|
|
@ -7806,6 +8011,83 @@ const AlertsPane = memo(({ state, limit, withStrip = false, className }: AlertsP
|
|||
);
|
||||
});
|
||||
|
||||
type NewsPaneProps = {
|
||||
state: TerminalState;
|
||||
limit?: number;
|
||||
className?: string;
|
||||
};
|
||||
|
||||
const NewsPane = memo(({ state, limit, className }: NewsPaneProps) => {
|
||||
const items = limit ? state.filteredNews.slice(0, limit) : state.filteredNews;
|
||||
const canLoadOlder = state.mode === "live" && !limit && items.length > 0;
|
||||
|
||||
return (
|
||||
<Pane
|
||||
className={className}
|
||||
title="News Wire"
|
||||
status={
|
||||
limit ? (
|
||||
<Link className="terminal-button terminal-link-button" href="/news">
|
||||
View all
|
||||
</Link>
|
||||
) : (
|
||||
<div className="status-inline status-connected">
|
||||
<span className="status-dot" />
|
||||
<span>{state.mode === "live" ? "Live wire" : "Live-only in v1"}</span>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
actions={
|
||||
canLoadOlder ? (
|
||||
<button className="terminal-button" type="button" onClick={() => void state.liveSession.loadOlder("news")}>
|
||||
Older
|
||||
</button>
|
||||
) : null
|
||||
}
|
||||
>
|
||||
{state.mode === "replay" ? (
|
||||
<div className="empty">News is live-only in v1.</div>
|
||||
) : items.length === 0 ? (
|
||||
<div className="empty">
|
||||
{state.tickerSet.size > 0 ? "No news stories match the current filter." : "Waiting for live news stories."}
|
||||
</div>
|
||||
) : (
|
||||
<div className="news-list" role="list" aria-label="News stories">
|
||||
{items.map((story) => (
|
||||
<button
|
||||
className="news-row"
|
||||
key={`${story.trace_id}:${story.updated_ts}:${story.seq}`}
|
||||
type="button"
|
||||
onClick={() => {
|
||||
state.setSelectedNewsStory(null);
|
||||
state.setSelectedAlert(null);
|
||||
state.setSelectedClassifierHit(null);
|
||||
state.setSelectedSmartMoneyEvent(null);
|
||||
state.setSelectedDarkEvent(null);
|
||||
state.setSelectedNewsStory(story);
|
||||
}}
|
||||
>
|
||||
<div className="news-row-head">
|
||||
<h3>{story.headline}</h3>
|
||||
<span className="news-row-time">{formatNewsTimestamp(story.published_ts)}</span>
|
||||
</div>
|
||||
<div className="news-row-meta">
|
||||
<span>{story.source}</span>
|
||||
{story.resolved_symbols.map((symbol) => (
|
||||
<span className="drawer-chip" key={`${story.trace_id}-${symbol}`}>
|
||||
{symbol}
|
||||
</span>
|
||||
))}
|
||||
</div>
|
||||
{!limit && story.summary ? <p className="drawer-note">{story.summary}</p> : null}
|
||||
</button>
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
</Pane>
|
||||
);
|
||||
});
|
||||
|
||||
type ClassifierPaneProps = {
|
||||
state: TerminalState;
|
||||
limit?: number;
|
||||
|
|
@ -8016,6 +8298,7 @@ const DarkPane = memo(({ state, limit, className }: DarkPaneProps) => {
|
|||
data-tape-key={key}
|
||||
style={{ transform: `translateY(${start}px)` }}
|
||||
onClick={() => {
|
||||
state.setSelectedNewsStory(null);
|
||||
state.setSelectedAlert(null);
|
||||
state.setSelectedClassifierHit(null);
|
||||
state.setSelectedSmartMoneyEvent(null);
|
||||
|
|
@ -8624,6 +8907,10 @@ export function TerminalAppShell({ children }: { children: ReactNode }) {
|
|||
/>
|
||||
) : null}
|
||||
|
||||
{state.selectedNewsStory ? (
|
||||
<NewsDrawer story={state.selectedNewsStory} onClose={() => state.setSelectedNewsStory(null)} />
|
||||
) : null}
|
||||
|
||||
{state.selectedClassifierHit ? (
|
||||
<ClassifierHitDrawer
|
||||
hit={state.selectedClassifierHit}
|
||||
|
|
@ -8662,12 +8949,24 @@ export function OverviewRoute() {
|
|||
<div className="page-grid page-grid-home">
|
||||
<ChartPane state={state} />
|
||||
<EquitiesPane state={state} />
|
||||
<NewsPane state={state} limit={6} />
|
||||
<AlertsPane state={state} withStrip />
|
||||
</div>
|
||||
</PageFrame>
|
||||
);
|
||||
}
|
||||
|
||||
export function NewsRoute() {
|
||||
const state = useTerminal();
|
||||
return (
|
||||
<PageFrame title="News">
|
||||
<div className="page-grid page-grid-news">
|
||||
<NewsPane state={state} className="news-pane-full" />
|
||||
</div>
|
||||
</PageFrame>
|
||||
);
|
||||
}
|
||||
|
||||
export function TapeRoute() {
|
||||
const state = useTerminal();
|
||||
return (
|
||||
|
|
|
|||
13
bun.lock
13
bun.lock
|
|
@ -121,6 +121,17 @@
|
|||
"zod": "^3.23.8",
|
||||
},
|
||||
},
|
||||
"services/ingest-news": {
|
||||
"name": "@islandflow/ingest-news",
|
||||
"dependencies": {
|
||||
"@islandflow/bus": "workspace:*",
|
||||
"@islandflow/config": "workspace:*",
|
||||
"@islandflow/observability": "workspace:*",
|
||||
"@islandflow/types": "workspace:*",
|
||||
"ws": "^8.18.3",
|
||||
"zod": "^3.23.8",
|
||||
},
|
||||
},
|
||||
"services/ingest-options": {
|
||||
"name": "@islandflow/ingest-options",
|
||||
"dependencies": {
|
||||
|
|
@ -250,6 +261,8 @@
|
|||
|
||||
"@islandflow/ingest-equities": ["@islandflow/ingest-equities@workspace:services/ingest-equities"],
|
||||
|
||||
"@islandflow/ingest-news": ["@islandflow/ingest-news@workspace:services/ingest-news"],
|
||||
|
||||
"@islandflow/ingest-options": ["@islandflow/ingest-options@workspace:services/ingest-options"],
|
||||
|
||||
"@islandflow/observability": ["@islandflow/observability@workspace:packages/observability"],
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ COPY --from=services candles/package.json ./services/candles/package.json
|
|||
COPY --from=services compute/package.json ./services/compute/package.json
|
||||
COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json
|
||||
COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json
|
||||
COPY --from=services ingest-news/package.json ./services/ingest-news/package.json
|
||||
COPY --from=services ingest-options/package.json ./services/ingest-options/package.json
|
||||
COPY --from=services ingest-options/py/requirements.txt ./services/ingest-options/py/requirements.txt
|
||||
COPY --from=services refdata/package.json ./services/refdata/package.json
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ COPY --from=services candles/package.json ./services/candles/package.json
|
|||
COPY --from=services compute/package.json ./services/compute/package.json
|
||||
COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json
|
||||
COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json
|
||||
COPY --from=services ingest-news/package.json ./services/ingest-news/package.json
|
||||
COPY --from=services ingest-options/package.json ./services/ingest-options/package.json
|
||||
COPY --from=services refdata/package.json ./services/refdata/package.json
|
||||
COPY --from=services replay/package.json ./services/replay/package.json
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ COPY --from=services candles/package.json ./services/candles/package.json
|
|||
COPY --from=services compute/package.json ./services/compute/package.json
|
||||
COPY --from=services eod-enricher/package.json ./services/eod-enricher/package.json
|
||||
COPY --from=services ingest-equities/package.json ./services/ingest-equities/package.json
|
||||
COPY --from=services ingest-news/package.json ./services/ingest-news/package.json
|
||||
COPY --from=services ingest-options/package.json ./services/ingest-options/package.json
|
||||
COPY --from=services refdata/package.json ./services/refdata/package.json
|
||||
COPY --from=services replay/package.json ./services/replay/package.json
|
||||
|
|
|
|||
|
|
@ -115,6 +115,10 @@ services:
|
|||
<<: *service-common
|
||||
command: ["services/ingest-equities/src/index.ts"]
|
||||
|
||||
ingest-news:
|
||||
<<: *service-common
|
||||
command: ["services/ingest-news/src/index.ts"]
|
||||
|
||||
replay:
|
||||
<<: *service-common
|
||||
profiles: ["replay"]
|
||||
|
|
|
|||
|
|
@ -121,6 +121,17 @@
|
|||
"zod": "^3.23.8",
|
||||
},
|
||||
},
|
||||
"services/ingest-news": {
|
||||
"name": "@islandflow/ingest-news",
|
||||
"dependencies": {
|
||||
"@islandflow/bus": "workspace:*",
|
||||
"@islandflow/config": "workspace:*",
|
||||
"@islandflow/observability": "workspace:*",
|
||||
"@islandflow/types": "workspace:*",
|
||||
"ws": "^8.18.3",
|
||||
"zod": "^3.23.8",
|
||||
},
|
||||
},
|
||||
"services/ingest-options": {
|
||||
"name": "@islandflow/ingest-options",
|
||||
"dependencies": {
|
||||
|
|
@ -250,6 +261,8 @@
|
|||
|
||||
"@islandflow/ingest-equities": ["@islandflow/ingest-equities@workspace:services/ingest-equities"],
|
||||
|
||||
"@islandflow/ingest-news": ["@islandflow/ingest-news@workspace:services/ingest-news"],
|
||||
|
||||
"@islandflow/ingest-options": ["@islandflow/ingest-options@workspace:services/ingest-options"],
|
||||
|
||||
"@islandflow/observability": ["@islandflow/observability@workspace:packages/observability"],
|
||||
|
|
|
|||
152
docs/turns/2026-05-18-news-wire-view.html
Normal file
152
docs/turns/2026-05-18-news-wire-view.html
Normal file
|
|
@ -0,0 +1,152 @@
|
|||
<!doctype html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="utf-8" />
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1" />
|
||||
<title>Turn Report: News Wire View via Alpaca Feed</title>
|
||||
<style>
|
||||
:root {
|
||||
color-scheme: dark;
|
||||
--bg: #0b1016;
|
||||
--panel: #111820;
|
||||
--panel-2: #0d141b;
|
||||
--border: rgba(255, 255, 255, 0.08);
|
||||
--text: #e6edf4;
|
||||
--dim: #90a0b2;
|
||||
--accent: #f5a623;
|
||||
}
|
||||
body {
|
||||
margin: 0;
|
||||
padding: 32px;
|
||||
background: linear-gradient(180deg, #06080b 0%, #0b1016 100%);
|
||||
color: var(--text);
|
||||
font: 15px/1.6 "IBM Plex Sans", sans-serif;
|
||||
}
|
||||
main {
|
||||
max-width: 980px;
|
||||
margin: 0 auto;
|
||||
background: var(--panel);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: 16px;
|
||||
padding: 28px;
|
||||
}
|
||||
h1, h2 {
|
||||
margin: 0 0 12px;
|
||||
font-family: "Quantico", sans-serif;
|
||||
letter-spacing: 0.06em;
|
||||
}
|
||||
h1 { font-size: 1.8rem; }
|
||||
h2 { font-size: 1rem; margin-top: 28px; }
|
||||
p, li { color: var(--text); }
|
||||
.summary {
|
||||
padding: 16px 18px;
|
||||
border: 1px solid rgba(245, 166, 35, 0.28);
|
||||
border-radius: 12px;
|
||||
background: rgba(245, 166, 35, 0.08);
|
||||
}
|
||||
.meta, code, pre { font-family: "IBM Plex Mono", monospace; }
|
||||
.meta { color: var(--dim); font-size: 0.85rem; }
|
||||
section {
|
||||
padding-top: 4px;
|
||||
border-top: 1px solid var(--border);
|
||||
}
|
||||
section:first-of-type { border-top: 0; }
|
||||
ul { padding-left: 18px; }
|
||||
pre {
|
||||
overflow: auto;
|
||||
padding: 14px;
|
||||
border-radius: 12px;
|
||||
background: var(--panel-2);
|
||||
border: 1px solid var(--border);
|
||||
}
|
||||
a { color: var(--accent); }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<main>
|
||||
<p class="meta">Created 2026-05-18 · Task: News Wire View via Alpaca Feed</p>
|
||||
<h1>News Wire View via Alpaca Feed</h1>
|
||||
<div class="summary">
|
||||
<strong>Summary</strong>
|
||||
<p>
|
||||
Added an Alpaca-backed live news pipeline end to end: normalized <code>NewsStory</code> types,
|
||||
a dedicated JetStream subject/stream, ClickHouse storage helpers with latest-revision semantics,
|
||||
a new <code>services/ingest-news</code> service, API endpoints and live fanout, and a web
|
||||
<code>/news</code> route plus Home preview with a right-side story drawer.
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<section>
|
||||
<h2>Changes Made</h2>
|
||||
<ul>
|
||||
<li>Added <code>NewsStorySchema</code>, the <code>news</code> live channel, and subscription parsing support in <code>packages/types</code>.</li>
|
||||
<li>Added bus constants for the <code>flow.news</code> subject and <code>NEWS</code> stream.</li>
|
||||
<li>Added ClickHouse news storage helpers, including recent, before-cursor, and after-cursor queries that collapse provider revisions to the latest row per <code>provider + story_id</code>.</li>
|
||||
<li>Created <code>services/ingest-news</code> with Alpaca REST backfill, Alpaca websocket streaming, normalization, and deterministic ticker resolution.</li>
|
||||
<li>Extended the API service to persist live news in the shared cache, expose <code>GET /news</code> and <code>GET /history/news</code>, and fan out <code>news</code> events on <code>/ws/live</code>.</li>
|
||||
<li>Added a top-level <code>/news</code> route, primary nav entry, Home preview pane, replay-mode live-only empty states, and a sanitized full-story drawer.</li>
|
||||
<li>Updated dev and deployment wiring so the new service is included in local runners and the Docker workspace snapshot.</li>
|
||||
</ul>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>Context</h2>
|
||||
<p>
|
||||
The plan called for a free-provider v1 news surface that behaves like the rest of Islandflow:
|
||||
compact, evidence-first, and live-native. The implementation keeps replay intentionally out of scope
|
||||
for news while still integrating news into the same live manifest, history pagination, rail navigation,
|
||||
and drawer language used elsewhere in the terminal.
|
||||
</p>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>Important Implementation Details</h2>
|
||||
<ul>
|
||||
<li>Ticker resolution prefers provider symbols first, then falls back only to structured patterns in provider HTML: ticker anchors, <code>EXCHANGE:SYM</code>, and <code>$SYM</code>.</li>
|
||||
<li>News history uses <code>published_ts</code> as the visible cursor while revisions are collapsed with a window function over <code>provider, story_id</code> ordered by <code>updated_ts</code>, <code>ingest_ts</code>, and <code>seq</code>.</li>
|
||||
<li>The web drawer sanitizes provider HTML by removing scripts, inline event handlers, and unsupported tags; if sanitization yields nothing useful, the drawer falls back to stripped plain text.</li>
|
||||
<li>Replay mode intentionally renders a clear empty state for news on both Home and <code>/news</code> instead of pretending news is replay-synced.</li>
|
||||
</ul>
|
||||
<pre><code>resolved_symbols = provider_symbols
|
||||
or ticker anchors in content_html
|
||||
or EXCHANGE:SYM matches
|
||||
or $SYM matches</code></pre>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>Expected Impact for End-Users</h2>
|
||||
<p>
|
||||
Traders can now monitor a dedicated live news wire inside Islandflow, spot symbol-linked headlines from
|
||||
the Home view, and open full stories in-context without leaving the app. The displayed ticker chips are
|
||||
grounded in stored provider and derived symbol metadata, which makes the feed safer to filter and trust.
|
||||
</p>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>Validation</h2>
|
||||
<ul>
|
||||
<li>Ran targeted Bun tests covering types, storage, API live-state behavior, ingest-news symbol resolution, route wiring, and terminal helpers.</li>
|
||||
<li>Built the Next.js web app with <code>bun --cwd=apps/web run build</code>.</li>
|
||||
<li>Ran <code>bun run check:docker-workspace</code> after syncing the deployment workspace snapshot.</li>
|
||||
</ul>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>Issues, Limitations, and Mitigations</h2>
|
||||
<ul>
|
||||
<li>Replay support remains intentionally absent in v1; the UI now states that explicitly instead of showing misleading empty historical behavior.</li>
|
||||
<li>The sanitizer is intentionally conservative and custom, which keeps dependencies light but may strip some harmless provider formatting.</li>
|
||||
<li>The ingest service assumes Alpaca’s current REST and websocket news contracts; if Alpaca changes those payload shapes, the normalization layer will need adjustment.</li>
|
||||
</ul>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h2>Follow-up Work</h2>
|
||||
<ul>
|
||||
<li>No additional follow-up issue was required during this turn.</li>
|
||||
<li>Future extensions are still available behind the same contract: multi-provider aggregation, server-side symbol filtering, and replay-aware news history.</li>
|
||||
</ul>
|
||||
</section>
|
||||
</main>
|
||||
</body>
|
||||
</html>
|
||||
|
|
@ -7,6 +7,7 @@ import {
|
|||
STREAM_EQUITY_QUOTES,
|
||||
STREAM_FLOW_PACKETS,
|
||||
STREAM_INFERRED_DARK,
|
||||
STREAM_NEWS,
|
||||
STREAM_OPTION_NBBO,
|
||||
STREAM_OPTION_PRINTS,
|
||||
STREAM_OPTION_SIGNAL_PRINTS,
|
||||
|
|
@ -19,6 +20,7 @@ import {
|
|||
SUBJECT_EQUITY_QUOTES,
|
||||
SUBJECT_FLOW_PACKETS,
|
||||
SUBJECT_INFERRED_DARK,
|
||||
SUBJECT_NEWS,
|
||||
SUBJECT_OPTION_NBBO,
|
||||
SUBJECT_OPTION_PRINTS,
|
||||
SUBJECT_OPTION_SIGNAL_PRINTS,
|
||||
|
|
@ -53,7 +55,8 @@ export const STREAM_CATALOG: readonly KnownStreamDefinition[] = [
|
|||
retentionClass: "derived"
|
||||
},
|
||||
{ name: STREAM_CLASSIFIER_HITS, subject: SUBJECT_CLASSIFIER_HITS, retentionClass: "derived" },
|
||||
{ name: STREAM_ALERTS, subject: SUBJECT_ALERTS, retentionClass: "derived" }
|
||||
{ name: STREAM_ALERTS, subject: SUBJECT_ALERTS, retentionClass: "derived" },
|
||||
{ name: STREAM_NEWS, subject: SUBJECT_NEWS, retentionClass: "derived" }
|
||||
];
|
||||
|
||||
const STREAM_CATALOG_BY_NAME = new Map(STREAM_CATALOG.map((definition) => [definition.name, definition]));
|
||||
|
|
|
|||
|
|
@ -22,3 +22,5 @@ export const STREAM_CLASSIFIER_HITS = "CLASSIFIER_HITS";
|
|||
export const SUBJECT_CLASSIFIER_HITS = "flow.classifier_hits";
|
||||
export const STREAM_ALERTS = "ALERTS";
|
||||
export const SUBJECT_ALERTS = "flow.alerts";
|
||||
export const STREAM_NEWS = "NEWS";
|
||||
export const SUBJECT_NEWS = "flow.news";
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import {
|
|||
EquityPrintJoinSchema,
|
||||
InferredDarkEventSchema,
|
||||
FlowPacketSchema,
|
||||
NewsStorySchema,
|
||||
OptionNBBOSchema,
|
||||
OptionPrintSchema,
|
||||
SmartMoneyEventSchema
|
||||
|
|
@ -20,6 +21,7 @@ import type {
|
|||
EquityPrintJoin,
|
||||
InferredDarkEvent,
|
||||
FlowPacket,
|
||||
NewsStory,
|
||||
SmartMoneyEvent,
|
||||
OptionNBBO,
|
||||
OptionPrint,
|
||||
|
|
@ -91,6 +93,13 @@ import {
|
|||
toSmartMoneyEventRecord,
|
||||
type SmartMoneyEventRecord
|
||||
} from "./smart-money-events";
|
||||
import {
|
||||
NEWS_TABLE,
|
||||
newsTableDDL,
|
||||
fromNewsRecord,
|
||||
toNewsRecord,
|
||||
type NewsRecord
|
||||
} from "./news";
|
||||
|
||||
export type ClickHouseOptions = {
|
||||
url: string;
|
||||
|
|
@ -320,6 +329,12 @@ export const ensureAlertsTable = async (client: ClickHouseClient): Promise<void>
|
|||
}
|
||||
};
|
||||
|
||||
export const ensureNewsTable = async (client: ClickHouseClient): Promise<void> => {
|
||||
await client.exec({
|
||||
query: newsTableDDL()
|
||||
});
|
||||
};
|
||||
|
||||
export const insertOptionPrint = async (
|
||||
client: ClickHouseClient,
|
||||
print: OptionPrint
|
||||
|
|
@ -449,6 +464,15 @@ export const insertAlert = async (client: ClickHouseClient, alert: AlertEvent):
|
|||
});
|
||||
};
|
||||
|
||||
export const insertNewsStory = async (client: ClickHouseClient, story: NewsStory): Promise<void> => {
|
||||
const record = toNewsRecord(story);
|
||||
await client.insert({
|
||||
table: NEWS_TABLE,
|
||||
values: [record],
|
||||
format: "JSONEachRow"
|
||||
});
|
||||
};
|
||||
|
||||
export type ClickHouseBatchWriterOptions = {
|
||||
flushIntervalMs?: number;
|
||||
maxRows?: number;
|
||||
|
|
@ -600,6 +624,13 @@ export const enqueueAlertInsert = (
|
|||
writer.enqueue(ALERTS_TABLE, toAlertRecord(alert));
|
||||
};
|
||||
|
||||
export const enqueueNewsStoryInsert = (
|
||||
writer: ClickHouseBatchWriter,
|
||||
story: NewsStory
|
||||
): void => {
|
||||
writer.enqueue(NEWS_TABLE, toNewsRecord(story));
|
||||
};
|
||||
|
||||
const clampLimit = (limit: number): number => {
|
||||
if (!Number.isFinite(limit)) {
|
||||
return 100;
|
||||
|
|
@ -1016,6 +1047,32 @@ const normalizeAlertRow = (row: unknown): AlertRecord | null => {
|
|||
};
|
||||
};
|
||||
|
||||
const normalizeNewsRow = (row: unknown): NewsRecord | null => {
|
||||
if (!row || typeof row !== "object") {
|
||||
return null;
|
||||
}
|
||||
|
||||
const record = row as Record<string, unknown>;
|
||||
return {
|
||||
source_ts: coerceNumber(record.source_ts) as number,
|
||||
ingest_ts: coerceNumber(record.ingest_ts) as number,
|
||||
seq: coerceNumber(record.seq) as number,
|
||||
trace_id: String(record.trace_id ?? ""),
|
||||
story_id: coerceNumber(record.story_id) as number,
|
||||
provider: String(record.provider ?? ""),
|
||||
source: String(record.source ?? ""),
|
||||
headline: String(record.headline ?? ""),
|
||||
summary: String(record.summary ?? ""),
|
||||
content_html: String(record.content_html ?? ""),
|
||||
url: String(record.url ?? ""),
|
||||
published_ts: coerceNumber(record.published_ts) as number,
|
||||
updated_ts: coerceNumber(record.updated_ts) as number,
|
||||
provider_symbols_json: String(record.provider_symbols_json ?? "[]"),
|
||||
resolved_symbols_json: String(record.resolved_symbols_json ?? "[]"),
|
||||
symbol_resolution: String(record.symbol_resolution ?? "none") as NewsRecord["symbol_resolution"]
|
||||
};
|
||||
};
|
||||
|
||||
export const fetchRecentOptionPrints = async (
|
||||
client: ClickHouseClient,
|
||||
limit: number,
|
||||
|
|
@ -1207,6 +1264,50 @@ export const fetchRecentAlerts = async (
|
|||
return AlertEventSchema.array().parse(alerts);
|
||||
};
|
||||
|
||||
const latestNewsSelect = `
|
||||
SELECT
|
||||
source_ts,
|
||||
ingest_ts,
|
||||
seq,
|
||||
trace_id,
|
||||
story_id,
|
||||
provider,
|
||||
source,
|
||||
headline,
|
||||
summary,
|
||||
content_html,
|
||||
url,
|
||||
published_ts,
|
||||
updated_ts,
|
||||
provider_symbols_json,
|
||||
resolved_symbols_json,
|
||||
symbol_resolution
|
||||
FROM (
|
||||
SELECT
|
||||
*,
|
||||
row_number() OVER (PARTITION BY provider, story_id ORDER BY updated_ts DESC, ingest_ts DESC, seq DESC) AS revision_rank
|
||||
FROM ${NEWS_TABLE}
|
||||
)
|
||||
WHERE revision_rank = 1
|
||||
`;
|
||||
|
||||
export const fetchRecentNews = async (
|
||||
client: ClickHouseClient,
|
||||
limit: number
|
||||
): Promise<NewsStory[]> => {
|
||||
const safeLimit = clampLimit(limit);
|
||||
const result = await client.query({
|
||||
query: `${latestNewsSelect} ORDER BY published_ts DESC, story_id DESC LIMIT ${safeLimit}`,
|
||||
format: "JSONEachRow"
|
||||
});
|
||||
|
||||
const rows = await result.json<unknown[]>();
|
||||
const records = rows
|
||||
.map(normalizeNewsRow)
|
||||
.filter((record): record is NewsRecord => record !== null);
|
||||
return NewsStorySchema.array().parse(records.map(fromNewsRecord));
|
||||
};
|
||||
|
||||
const normalizeAlertEvidenceRefs = (refs: string[]): string[] => {
|
||||
return Array.from(new Set(refs.map((ref) => ref.trim()).filter(Boolean)));
|
||||
};
|
||||
|
|
@ -1600,6 +1701,27 @@ export const fetchAlertsAfter = async (
|
|||
return AlertEventSchema.array().parse(alerts);
|
||||
};
|
||||
|
||||
export const fetchNewsAfter = async (
|
||||
client: ClickHouseClient,
|
||||
afterTs: number,
|
||||
afterSeq: number,
|
||||
limit: number
|
||||
): Promise<NewsStory[]> => {
|
||||
const safeLimit = clampLimit(limit);
|
||||
const safeAfterTs = clampCursor(afterTs);
|
||||
const safeAfterSeq = clampCursor(afterSeq);
|
||||
const result = await client.query({
|
||||
query: `${latestNewsSelect} AND (published_ts, seq) > (${safeAfterTs}, ${safeAfterSeq}) ORDER BY published_ts ASC, seq ASC LIMIT ${safeLimit}`,
|
||||
format: "JSONEachRow"
|
||||
});
|
||||
|
||||
const rows = await result.json<unknown[]>();
|
||||
const records = rows
|
||||
.map(normalizeNewsRow)
|
||||
.filter((record): record is NewsRecord => record !== null);
|
||||
return NewsStorySchema.array().parse(records.map(fromNewsRecord));
|
||||
};
|
||||
|
||||
export const fetchOptionPrintsBefore = async (
|
||||
client: ClickHouseClient,
|
||||
beforeTs: number,
|
||||
|
|
@ -1778,6 +1900,25 @@ export const fetchAlertsBefore = async (
|
|||
return AlertEventSchema.array().parse(records.map(fromAlertRecord));
|
||||
};
|
||||
|
||||
export const fetchNewsBefore = async (
|
||||
client: ClickHouseClient,
|
||||
beforeTs: number,
|
||||
beforeSeq: number,
|
||||
limit: number
|
||||
): Promise<NewsStory[]> => {
|
||||
const safeLimit = clampLimit(limit);
|
||||
const result = await client.query({
|
||||
query: `${latestNewsSelect} AND ${buildBeforeTupleCondition("published_ts", "seq", beforeTs, beforeSeq)} ORDER BY published_ts DESC, seq DESC LIMIT ${safeLimit}`,
|
||||
format: "JSONEachRow"
|
||||
});
|
||||
|
||||
const rows = await result.json<unknown[]>();
|
||||
const records = rows
|
||||
.map(normalizeNewsRow)
|
||||
.filter((record): record is NewsRecord => record !== null);
|
||||
return NewsStorySchema.array().parse(records.map(fromNewsRecord));
|
||||
};
|
||||
|
||||
export const fetchInferredDarkBefore = async (
|
||||
client: ClickHouseClient,
|
||||
beforeTs: number,
|
||||
|
|
|
|||
|
|
@ -10,3 +10,4 @@ export * from "./equity-print-joins";
|
|||
export * from "./inferred-dark";
|
||||
export * from "./option-prints";
|
||||
export * from "./option-nbbo";
|
||||
export * from "./news";
|
||||
|
|
|
|||
102
packages/storage/src/news.ts
Normal file
102
packages/storage/src/news.ts
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
import type { NewsStory, NewsSymbolResolution } from "@islandflow/types";
|
||||
|
||||
export const NEWS_TABLE = "news";
|
||||
|
||||
export type NewsRecord = {
|
||||
source_ts: number;
|
||||
ingest_ts: number;
|
||||
seq: number;
|
||||
trace_id: string;
|
||||
story_id: number;
|
||||
provider: string;
|
||||
source: string;
|
||||
headline: string;
|
||||
summary: string;
|
||||
content_html: string;
|
||||
url: string;
|
||||
published_ts: number;
|
||||
updated_ts: number;
|
||||
provider_symbols_json: string;
|
||||
resolved_symbols_json: string;
|
||||
symbol_resolution: NewsSymbolResolution;
|
||||
};
|
||||
|
||||
export const newsTableDDL = (): string => {
|
||||
return `
|
||||
CREATE TABLE IF NOT EXISTS ${NEWS_TABLE} (
|
||||
source_ts UInt64,
|
||||
ingest_ts UInt64,
|
||||
seq UInt64,
|
||||
trace_id String,
|
||||
story_id UInt64,
|
||||
provider String,
|
||||
source String,
|
||||
headline String,
|
||||
summary String,
|
||||
content_html String,
|
||||
url String,
|
||||
published_ts UInt64,
|
||||
updated_ts UInt64,
|
||||
provider_symbols_json String,
|
||||
resolved_symbols_json String,
|
||||
symbol_resolution String
|
||||
)
|
||||
ENGINE = ReplacingMergeTree(updated_ts)
|
||||
ORDER BY (provider, story_id, updated_ts, seq)
|
||||
`;
|
||||
};
|
||||
|
||||
const safeStringArray = (value: string): string[] => {
|
||||
try {
|
||||
const parsed = JSON.parse(value);
|
||||
if (Array.isArray(parsed)) {
|
||||
return parsed.map((entry) => String(entry));
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
|
||||
return [];
|
||||
};
|
||||
|
||||
export const toNewsRecord = (story: NewsStory): NewsRecord => {
|
||||
return {
|
||||
source_ts: story.source_ts,
|
||||
ingest_ts: story.ingest_ts,
|
||||
seq: story.seq,
|
||||
trace_id: story.trace_id,
|
||||
story_id: story.story_id,
|
||||
provider: story.provider,
|
||||
source: story.source,
|
||||
headline: story.headline,
|
||||
summary: story.summary,
|
||||
content_html: story.content_html,
|
||||
url: story.url,
|
||||
published_ts: story.published_ts,
|
||||
updated_ts: story.updated_ts,
|
||||
provider_symbols_json: JSON.stringify(story.provider_symbols),
|
||||
resolved_symbols_json: JSON.stringify(story.resolved_symbols),
|
||||
symbol_resolution: story.symbol_resolution
|
||||
};
|
||||
};
|
||||
|
||||
export const fromNewsRecord = (record: NewsRecord): NewsStory => {
|
||||
return {
|
||||
source_ts: record.source_ts,
|
||||
ingest_ts: record.ingest_ts,
|
||||
seq: record.seq,
|
||||
trace_id: record.trace_id,
|
||||
story_id: record.story_id,
|
||||
provider: record.provider,
|
||||
source: record.source,
|
||||
headline: record.headline,
|
||||
summary: record.summary,
|
||||
content_html: record.content_html,
|
||||
url: record.url,
|
||||
published_ts: record.published_ts,
|
||||
updated_ts: record.updated_ts,
|
||||
provider_symbols: safeStringArray(record.provider_symbols_json),
|
||||
resolved_symbols: safeStringArray(record.resolved_symbols_json),
|
||||
symbol_resolution: record.symbol_resolution
|
||||
};
|
||||
};
|
||||
78
packages/storage/tests/news.test.ts
Normal file
78
packages/storage/tests/news.test.ts
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
import { describe, expect, it } from "bun:test";
|
||||
import type { ClickHouseClient } from "../src/clickhouse";
|
||||
import {
|
||||
NEWS_TABLE,
|
||||
fromNewsRecord,
|
||||
newsTableDDL,
|
||||
toNewsRecord
|
||||
} from "../src/news";
|
||||
import {
|
||||
fetchNewsAfter,
|
||||
fetchNewsBefore,
|
||||
fetchRecentNews
|
||||
} from "../src/clickhouse";
|
||||
|
||||
const makeClient = (resolver: (query: string) => unknown[]): ClickHouseClient =>
|
||||
({
|
||||
exec: async () => {},
|
||||
insert: async () => {},
|
||||
ping: async () => ({ success: true }),
|
||||
close: async () => {},
|
||||
query: async ({ query }: { query: string }) => ({
|
||||
async json<T>() {
|
||||
return resolver(query) as T;
|
||||
}
|
||||
})
|
||||
}) as ClickHouseClient;
|
||||
|
||||
const story = {
|
||||
source_ts: 100,
|
||||
ingest_ts: 101,
|
||||
seq: 3,
|
||||
trace_id: "alpaca:77",
|
||||
story_id: 77,
|
||||
provider: "alpaca",
|
||||
source: "Benzinga",
|
||||
headline: "TSLA rises",
|
||||
summary: "Summary",
|
||||
content_html: "<p>TSLA rises</p>",
|
||||
url: "https://example.com/story",
|
||||
published_ts: 100,
|
||||
updated_ts: 120,
|
||||
provider_symbols: ["TSLA"],
|
||||
resolved_symbols: ["TSLA", "AAPL"],
|
||||
symbol_resolution: "mixed" as const
|
||||
};
|
||||
|
||||
describe("news storage helpers", () => {
|
||||
it("includes the correct table name in the DDL", () => {
|
||||
const ddl = newsTableDDL();
|
||||
expect(ddl).toContain(NEWS_TABLE);
|
||||
expect(ddl).toContain("ReplacingMergeTree");
|
||||
});
|
||||
|
||||
it("round-trips news records", () => {
|
||||
const record = toNewsRecord(story);
|
||||
const restored = fromNewsRecord(record);
|
||||
expect(restored).toEqual(story);
|
||||
});
|
||||
|
||||
it("uses latest-revision selection for recent and cursor queries", async () => {
|
||||
const queries: string[] = [];
|
||||
const client = makeClient((query) => {
|
||||
queries.push(query);
|
||||
return [toNewsRecord(story)];
|
||||
});
|
||||
|
||||
const recent = await fetchRecentNews(client, 10);
|
||||
const before = await fetchNewsBefore(client, 200, 10, 10);
|
||||
const after = await fetchNewsAfter(client, 50, 1, 10);
|
||||
|
||||
expect(recent[0]?.trace_id).toBe("alpaca:77");
|
||||
expect(before[0]?.story_id).toBe(77);
|
||||
expect(after[0]?.updated_ts).toBe(120);
|
||||
expect(queries[0]).toContain("row_number() OVER");
|
||||
expect(queries[1]).toContain("published_ts");
|
||||
expect(queries[2]).toContain("(published_ts, seq) > (50, 1)");
|
||||
});
|
||||
});
|
||||
|
|
@ -262,3 +262,26 @@ export const InferredDarkEventSchema = EventMetaSchema.merge(
|
|||
);
|
||||
|
||||
export type InferredDarkEvent = z.infer<typeof InferredDarkEventSchema>;
|
||||
|
||||
export const NewsSymbolResolutionSchema = z.enum(["provider", "derived", "mixed", "none"]);
|
||||
|
||||
export type NewsSymbolResolution = z.infer<typeof NewsSymbolResolutionSchema>;
|
||||
|
||||
export const NewsStorySchema = EventMetaSchema.merge(
|
||||
z.object({
|
||||
story_id: z.number().int().nonnegative(),
|
||||
provider: z.string().min(1),
|
||||
source: z.string().min(1),
|
||||
headline: z.string().min(1),
|
||||
summary: z.string(),
|
||||
content_html: z.string(),
|
||||
url: z.string().url().or(z.literal("")),
|
||||
published_ts: z.number().int().nonnegative(),
|
||||
updated_ts: z.number().int().nonnegative(),
|
||||
provider_symbols: z.array(z.string().min(1)),
|
||||
resolved_symbols: z.array(z.string().min(1)),
|
||||
symbol_resolution: NewsSymbolResolutionSchema
|
||||
})
|
||||
);
|
||||
|
||||
export type NewsStory = z.infer<typeof NewsStorySchema>;
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import {
|
|||
EquityQuoteSchema,
|
||||
FlowPacketSchema,
|
||||
InferredDarkEventSchema,
|
||||
NewsStorySchema,
|
||||
OptionNBBOSchema,
|
||||
OptionPrintSchema,
|
||||
SmartMoneyEventSchema
|
||||
|
|
@ -34,7 +35,8 @@ export const LiveGenericChannelSchema = z.enum([
|
|||
"smart-money",
|
||||
"classifier-hits",
|
||||
"alerts",
|
||||
"inferred-dark"
|
||||
"inferred-dark",
|
||||
"news"
|
||||
]);
|
||||
|
||||
export const LiveChannelSchema = z.enum([
|
||||
|
|
@ -48,6 +50,7 @@ export const LiveChannelSchema = z.enum([
|
|||
"classifier-hits",
|
||||
"alerts",
|
||||
"inferred-dark",
|
||||
"news",
|
||||
"equity-candles",
|
||||
"equity-overlay"
|
||||
]);
|
||||
|
|
@ -91,7 +94,7 @@ export const LiveSubscriptionSchema = z.discriminatedUnion("channel", [
|
|||
snapshot_limit: z.number().int().positive().optional()
|
||||
}),
|
||||
z.object({
|
||||
channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark"]),
|
||||
channel: z.enum(["nbbo", "equity-quotes", "equity-joins", "classifier-hits", "alerts", "inferred-dark", "news"]),
|
||||
snapshot_limit: z.number().int().positive().optional()
|
||||
}),
|
||||
z.object({
|
||||
|
|
@ -123,6 +126,7 @@ const livePayloadSchemas = {
|
|||
"classifier-hits": ClassifierHitEventSchema,
|
||||
alerts: AlertEventSchema,
|
||||
"inferred-dark": InferredDarkEventSchema,
|
||||
news: NewsStorySchema,
|
||||
"equity-candles": EquityCandleSchema,
|
||||
"equity-overlay": EquityPrintSchema
|
||||
} as const;
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import {
|
|||
describe("live protocol types", () => {
|
||||
it("builds stable keys for generic and parameterized subscriptions", () => {
|
||||
expect(getSubscriptionKey({ channel: "flow" })).toBe("flow|{}");
|
||||
expect(getSubscriptionKey({ channel: "news" })).toBe("news");
|
||||
expect(
|
||||
getSubscriptionKey({
|
||||
channel: "options",
|
||||
|
|
@ -53,12 +54,13 @@ describe("live protocol types", () => {
|
|||
op: "subscribe",
|
||||
subscriptions: [
|
||||
{ channel: "flow", filters: { nbboSides: ["AA", "A"], minNotional: 50000 } },
|
||||
{ channel: "news", snapshot_limit: 100 },
|
||||
{ channel: "equity-candles", underlying_id: "SPY", interval_ms: 60000 }
|
||||
]
|
||||
});
|
||||
|
||||
expect(parsed.op).toBe("subscribe");
|
||||
expect(parsed.subscriptions).toHaveLength(2);
|
||||
expect(parsed.subscriptions).toHaveLength(3);
|
||||
});
|
||||
|
||||
it("validates snapshot and event server messages", () => {
|
||||
|
|
@ -74,18 +76,24 @@ describe("live protocol types", () => {
|
|||
});
|
||||
const event = LiveServerMessageSchema.parse({
|
||||
op: "event",
|
||||
subscription: { channel: "equity-overlay", underlying_id: "SPY" },
|
||||
subscription: { channel: "news" },
|
||||
item: {
|
||||
source_ts: 100,
|
||||
ingest_ts: 101,
|
||||
seq: 1,
|
||||
trace_id: "eq-1",
|
||||
ts: 100,
|
||||
underlying_id: "SPY",
|
||||
price: 500,
|
||||
size: 10,
|
||||
exchange: "X",
|
||||
offExchangeFlag: true
|
||||
trace_id: "alpaca:1",
|
||||
story_id: 1,
|
||||
provider: "alpaca",
|
||||
source: "Benzinga",
|
||||
headline: "TSLA rises",
|
||||
summary: "",
|
||||
content_html: "<p>TSLA rises</p>",
|
||||
url: "https://example.com/story",
|
||||
published_ts: 100,
|
||||
updated_ts: 100,
|
||||
provider_symbols: ["TSLA"],
|
||||
resolved_symbols: ["TSLA"],
|
||||
symbol_resolution: "provider"
|
||||
},
|
||||
watermark: cursor
|
||||
});
|
||||
|
|
|
|||
|
|
@ -48,7 +48,8 @@ const NATIVE_UNITS = {
|
|||
ingestOptions:
|
||||
process.env.DEPLOY_NATIVE_INGEST_OPTIONS_UNIT?.trim() || "islandflow-ingest-options",
|
||||
ingestEquities:
|
||||
process.env.DEPLOY_NATIVE_INGEST_EQUITIES_UNIT?.trim() || "islandflow-ingest-equities"
|
||||
process.env.DEPLOY_NATIVE_INGEST_EQUITIES_UNIT?.trim() || "islandflow-ingest-equities",
|
||||
ingestNews: process.env.DEPLOY_NATIVE_INGEST_NEWS_UNIT?.trim() || "islandflow-ingest-news"
|
||||
} as const;
|
||||
const DOCKER_CORE_SERVICES = [
|
||||
"api",
|
||||
|
|
@ -56,14 +57,16 @@ const DOCKER_CORE_SERVICES = [
|
|||
"compute",
|
||||
"candles",
|
||||
"ingest-options",
|
||||
"ingest-equities"
|
||||
"ingest-equities",
|
||||
"ingest-news"
|
||||
] as const;
|
||||
const DOCKER_BACKEND_SERVICES = [
|
||||
"api",
|
||||
"compute",
|
||||
"candles",
|
||||
"ingest-options",
|
||||
"ingest-equities"
|
||||
"ingest-equities",
|
||||
"ingest-news"
|
||||
] as const;
|
||||
|
||||
const scriptPath = fileURLToPath(import.meta.url);
|
||||
|
|
@ -106,7 +109,8 @@ Environment:
|
|||
DEPLOY_NATIVE_COMPUTE_UNIT Override native compute systemd unit name.
|
||||
DEPLOY_NATIVE_CANDLES_UNIT Override native candles systemd unit name.
|
||||
DEPLOY_NATIVE_INGEST_OPTIONS_UNIT Override native ingest-options systemd unit name.
|
||||
DEPLOY_NATIVE_INGEST_EQUITIES_UNIT Override native ingest-equities systemd unit name.`);
|
||||
DEPLOY_NATIVE_INGEST_EQUITIES_UNIT Override native ingest-equities systemd unit name.
|
||||
DEPLOY_NATIVE_INGEST_NEWS_UNIT Override native ingest-news systemd unit name.`);
|
||||
process.exit(exitCode);
|
||||
}
|
||||
|
||||
|
|
@ -465,7 +469,8 @@ function nativeUnitsForScope(scope: DeployScope): string[] {
|
|||
NATIVE_UNITS.compute,
|
||||
NATIVE_UNITS.candles,
|
||||
NATIVE_UNITS.ingestOptions,
|
||||
NATIVE_UNITS.ingestEquities
|
||||
NATIVE_UNITS.ingestEquities,
|
||||
NATIVE_UNITS.ingestNews
|
||||
];
|
||||
default:
|
||||
return [
|
||||
|
|
@ -474,7 +479,8 @@ function nativeUnitsForScope(scope: DeployScope): string[] {
|
|||
NATIVE_UNITS.compute,
|
||||
NATIVE_UNITS.candles,
|
||||
NATIVE_UNITS.ingestOptions,
|
||||
NATIVE_UNITS.ingestEquities
|
||||
NATIVE_UNITS.ingestEquities,
|
||||
NATIVE_UNITS.ingestNews
|
||||
];
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -222,6 +222,7 @@ process.on("SIGHUP", () => handleSignal("SIGHUP"));
|
|||
const tasks: ChildSpec[] = [
|
||||
{ name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" },
|
||||
{ name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" },
|
||||
{ name: "ingest-news", cmd: ["bun", "run", "dev"], cwd: "services/ingest-news" },
|
||||
{ name: "compute", cmd: ["bun", "run", "dev"], cwd: "services/compute" },
|
||||
{ name: "candles", cmd: ["bun", "run", "dev"], cwd: "services/candles" },
|
||||
{ name: "refdata", cmd: ["bun", "run", "dev"], cwd: "services/refdata" },
|
||||
|
|
|
|||
|
|
@ -325,6 +325,7 @@ const serviceTasks: ChildSpec[] = [
|
|||
{ name: "web", cmd: ["bun", "run", "dev"], cwd: "apps/web" },
|
||||
{ name: "ingest-options", cmd: ["bun", "run", "dev"], cwd: "services/ingest-options" },
|
||||
{ name: "ingest-equities", cmd: ["bun", "run", "dev"], cwd: "services/ingest-equities" },
|
||||
{ name: "ingest-news", cmd: ["bun", "run", "dev"], cwd: "services/ingest-news" },
|
||||
{ name: "compute", cmd: ["bun", "run", "dev"], cwd: "services/compute" },
|
||||
{ name: "candles", cmd: ["bun", "run", "dev"], cwd: "services/candles" },
|
||||
{ name: "refdata", cmd: ["bun", "run", "dev"], cwd: "services/refdata" },
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import {
|
|||
SUBJECT_EQUITY_QUOTES,
|
||||
SUBJECT_INFERRED_DARK,
|
||||
SUBJECT_FLOW_PACKETS,
|
||||
SUBJECT_NEWS,
|
||||
SUBJECT_SMART_MONEY_EVENTS,
|
||||
SUBJECT_OPTION_NBBO,
|
||||
SUBJECT_OPTION_SIGNAL_PRINTS,
|
||||
|
|
@ -20,6 +21,7 @@ import {
|
|||
STREAM_EQUITY_QUOTES,
|
||||
STREAM_INFERRED_DARK,
|
||||
STREAM_FLOW_PACKETS,
|
||||
STREAM_NEWS,
|
||||
STREAM_SMART_MONEY_EVENTS,
|
||||
STREAM_OPTION_NBBO,
|
||||
STREAM_OPTION_SIGNAL_PRINTS,
|
||||
|
|
@ -35,6 +37,7 @@ import {
|
|||
import {
|
||||
createClickHouseClient,
|
||||
ensureAlertsTable,
|
||||
ensureNewsTable,
|
||||
ensureClassifierHitsTable,
|
||||
ensureEquityCandlesTable,
|
||||
ensureEquityPrintJoinsTable,
|
||||
|
|
@ -48,6 +51,8 @@ import {
|
|||
fetchAlertsAfter,
|
||||
fetchAlertsBefore,
|
||||
fetchAlertContextByTraceId,
|
||||
fetchNewsAfter,
|
||||
fetchNewsBefore,
|
||||
fetchClassifierHitsAfter,
|
||||
fetchClassifierHitsBefore,
|
||||
fetchSmartMoneyEventsAfter,
|
||||
|
|
@ -58,6 +63,7 @@ import {
|
|||
fetchFlowPacketsByMemberTraceIds,
|
||||
fetchFlowPacketsBefore,
|
||||
fetchRecentAlerts,
|
||||
fetchRecentNews,
|
||||
fetchRecentClassifierHits,
|
||||
fetchRecentSmartMoneyEvents,
|
||||
fetchRecentEquityPrintJoins,
|
||||
|
|
@ -99,6 +105,7 @@ import {
|
|||
EquityQuoteSchema,
|
||||
FeedSnapshot,
|
||||
InferredDarkEventSchema,
|
||||
NewsStorySchema,
|
||||
LiveClientMessageSchema,
|
||||
LiveServerMessage,
|
||||
LiveSubscription,
|
||||
|
|
@ -676,7 +683,8 @@ const run = async () => {
|
|||
STREAM_FLOW_PACKETS,
|
||||
STREAM_SMART_MONEY_EVENTS,
|
||||
STREAM_CLASSIFIER_HITS,
|
||||
STREAM_ALERTS
|
||||
STREAM_ALERTS,
|
||||
STREAM_NEWS
|
||||
],
|
||||
{ logger }
|
||||
);
|
||||
|
|
@ -719,6 +727,7 @@ const run = async () => {
|
|||
await ensureSmartMoneyEventsTable(clickhouse);
|
||||
await ensureClassifierHitsTable(clickhouse);
|
||||
await ensureAlertsTable(clickhouse);
|
||||
await ensureNewsTable(clickhouse);
|
||||
});
|
||||
|
||||
let redis: ReturnType<typeof createClient> | null = null;
|
||||
|
|
@ -843,6 +852,11 @@ const run = async () => {
|
|||
subject: SUBJECT_ALERTS,
|
||||
stream: STREAM_ALERTS,
|
||||
durableName: "api-alerts"
|
||||
},
|
||||
{
|
||||
subject: SUBJECT_NEWS,
|
||||
stream: STREAM_NEWS,
|
||||
durableName: "api-news"
|
||||
}
|
||||
] as const;
|
||||
|
||||
|
|
@ -991,10 +1005,16 @@ const run = async () => {
|
|||
consumerBindings[10].durableName
|
||||
);
|
||||
|
||||
const newsSubscription = await subscribeWithReset(
|
||||
consumerBindings[11].subject,
|
||||
consumerBindings[11].stream,
|
||||
consumerBindings[11].durableName
|
||||
);
|
||||
|
||||
const fanoutLive = async (
|
||||
subscription: LiveSubscription,
|
||||
item: unknown,
|
||||
ingestChannel: "options" | "nbbo" | "equities" | "equity-quotes" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark"
|
||||
ingestChannel: "options" | "nbbo" | "equities" | "equity-quotes" | "equity-candles" | "equity-overlay" | "equity-joins" | "flow" | "classifier-hits" | "alerts" | "inferred-dark" | "news"
|
||||
) => {
|
||||
const watermark = await liveState.ingest(ingestChannel, item);
|
||||
|
||||
|
|
@ -1252,6 +1272,21 @@ const run = async () => {
|
|||
}
|
||||
};
|
||||
|
||||
const pumpNews = async () => {
|
||||
for await (const msg of newsSubscription.messages) {
|
||||
try {
|
||||
const payload = NewsStorySchema.parse(newsSubscription.decode(msg));
|
||||
await fanoutLive({ channel: "news" }, payload, "news");
|
||||
msg.ack();
|
||||
} catch (error) {
|
||||
logger.error("failed to process news story", {
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
msg.term();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void pumpOptions();
|
||||
void pumpOptionNbbo();
|
||||
void pumpEquities();
|
||||
|
|
@ -1263,6 +1298,7 @@ const run = async () => {
|
|||
void pumpSmartMoney();
|
||||
void pumpClassifierHits();
|
||||
void pumpAlerts();
|
||||
void pumpNews();
|
||||
|
||||
const buildSyntheticStatusBody = () => {
|
||||
const derived =
|
||||
|
|
@ -1490,6 +1526,12 @@ const run = async () => {
|
|||
return jsonResponse({ data });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/news") {
|
||||
const limit = parseLimit(url.searchParams.get("limit") ?? "100");
|
||||
const data = await fetchRecentNews(clickhouse, limit);
|
||||
return jsonResponse({ data });
|
||||
}
|
||||
|
||||
if (req.method === "GET" && isAlertContextPath(url.pathname)) {
|
||||
try {
|
||||
const traceId = parseAlertContextTraceIdPath(url.pathname);
|
||||
|
|
@ -1607,6 +1649,14 @@ const run = async () => {
|
|||
);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && url.pathname === "/history/news") {
|
||||
const { beforeTs, beforeSeq, limit } = parseBeforeParams(url);
|
||||
const data = await fetchNewsBefore(clickhouse, beforeTs, beforeSeq, limit);
|
||||
return jsonResponse(
|
||||
buildHistoryResponse(data, (item) => ({ ts: item.published_ts, seq: item.seq }))
|
||||
);
|
||||
}
|
||||
|
||||
if (req.method === "GET" && /^\/flow\/packets\/[^/]+$/.test(url.pathname)) {
|
||||
const id = decodeURIComponent(url.pathname.slice("/flow/packets/".length));
|
||||
const data = await fetchFlowPacketById(clickhouse, id);
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import {
|
|||
fetchRecentEquityQuotes,
|
||||
fetchRecentFlowPackets,
|
||||
fetchRecentInferredDark,
|
||||
fetchRecentNews,
|
||||
fetchRecentOptionNBBO,
|
||||
fetchRecentSmartMoneyEvents,
|
||||
type ClickHouseClient
|
||||
|
|
@ -25,6 +26,7 @@ import {
|
|||
FeedSnapshot,
|
||||
FlowPacketSchema,
|
||||
InferredDarkEventSchema,
|
||||
NewsStorySchema,
|
||||
LiveChannelHealth,
|
||||
LiveGenericChannel,
|
||||
LiveHotChannel,
|
||||
|
|
@ -40,6 +42,7 @@ import {
|
|||
type EquityCandle,
|
||||
type EquityPrint,
|
||||
type LiveChannel,
|
||||
type NewsStory,
|
||||
type OptionPrint
|
||||
} from "@islandflow/types";
|
||||
import { createMetrics } from "@islandflow/observability";
|
||||
|
|
@ -63,7 +66,8 @@ const GENERIC_LIMIT_ENV_KEYS: Record<LiveGenericChannel, string> = {
|
|||
"smart-money": "LIVE_LIMIT_SMART_MONEY",
|
||||
"classifier-hits": "LIVE_LIMIT_CLASSIFIER_HITS",
|
||||
alerts: "LIVE_LIMIT_ALERTS",
|
||||
"inferred-dark": "LIVE_LIMIT_INFERRED_DARK"
|
||||
"inferred-dark": "LIVE_LIMIT_INFERRED_DARK",
|
||||
news: "LIVE_LIMIT_NEWS"
|
||||
};
|
||||
|
||||
const CHART_LIMITS = {
|
||||
|
|
@ -81,7 +85,8 @@ const DEFAULT_LIVE_LIMITS: GenericLiveLimits = {
|
|||
"smart-money": 300,
|
||||
"classifier-hits": 300,
|
||||
alerts: 300,
|
||||
"inferred-dark": 300
|
||||
"inferred-dark": 300,
|
||||
news: 100
|
||||
};
|
||||
|
||||
const DEFAULT_SCOPED_CACHE_MAX_KEYS = 32;
|
||||
|
|
@ -196,16 +201,28 @@ export const resolveGenericLiveLimits = (env: NodeJS.ProcessEnv = process.env):
|
|||
env,
|
||||
"inferred-dark",
|
||||
env.LIVE_LIMIT_DEFAULT ? liveLimitDefault : DEFAULT_LIVE_LIMITS["inferred-dark"]
|
||||
)
|
||||
),
|
||||
news: parseGenericLimit(env, "news", env.LIVE_LIMIT_DEFAULT ? liveLimitDefault : DEFAULT_LIVE_LIMITS.news)
|
||||
};
|
||||
};
|
||||
|
||||
const parsePositiveInt = (value: string | undefined, fallback: number): number => {
|
||||
const parsed = Number(value);
|
||||
if (!Number.isFinite(parsed)) {
|
||||
return fallback;
|
||||
const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | null => {
|
||||
switch (channel) {
|
||||
case "options":
|
||||
case "nbbo":
|
||||
case "equities":
|
||||
case "equity-quotes":
|
||||
return typeof item.ts === "number" ? item.ts : null;
|
||||
case "flow":
|
||||
case "classifier-hits":
|
||||
case "alerts":
|
||||
case "inferred-dark":
|
||||
return typeof item.source_ts === "number" ? item.source_ts : null;
|
||||
case "news":
|
||||
return typeof item.published_ts === "number" ? item.published_ts : null;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
return Math.max(1, Math.floor(parsed));
|
||||
};
|
||||
|
||||
export const resolveLiveStateConfig = (env: NodeJS.ProcessEnv = process.env): LiveStateConfig => ({
|
||||
|
|
@ -217,6 +234,13 @@ export const resolveLiveStateConfig = (env: NodeJS.ProcessEnv = process.env): Li
|
|||
),
|
||||
redisFlushMaxItems: parsePositiveInt(env.LIVE_REDIS_FLUSH_MAX_ITEMS, DEFAULT_REDIS_FLUSH_MAX_ITEMS)
|
||||
});
|
||||
const parsePositiveInt = (value: string | undefined, fallback: number): number => {
|
||||
const parsed = Number(value);
|
||||
if (!Number.isFinite(parsed)) {
|
||||
return fallback;
|
||||
}
|
||||
return Math.max(1, Math.floor(parsed));
|
||||
};
|
||||
|
||||
type RedisLike = Pick<
|
||||
RedisClientType,
|
||||
|
|
@ -318,6 +342,14 @@ const getGenericConfig = (limits: GenericLiveLimits): {
|
|||
parse: (value) => InferredDarkEventSchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.source_ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentInferredDark
|
||||
},
|
||||
news: {
|
||||
redisKey: "live:news",
|
||||
cursorField: "news",
|
||||
limit: limits.news,
|
||||
parse: (value) => NewsStorySchema.parse(value),
|
||||
cursor: (item) => ({ ts: item.published_ts, seq: item.seq }),
|
||||
fetchRecent: fetchRecentNews
|
||||
}
|
||||
});
|
||||
|
||||
|
|
@ -371,23 +403,6 @@ const normalizeGenericItems = <T>(
|
|||
return sortGenericItems(items, config.cursor).slice(0, config.limit);
|
||||
};
|
||||
|
||||
const extractFreshnessTs = (channel: LiveGenericChannel, item: any): number | null => {
|
||||
switch (channel) {
|
||||
case "options":
|
||||
case "nbbo":
|
||||
case "equities":
|
||||
case "equity-quotes":
|
||||
return typeof item.ts === "number" ? item.ts : null;
|
||||
case "flow":
|
||||
case "classifier-hits":
|
||||
case "alerts":
|
||||
case "inferred-dark":
|
||||
return typeof item.source_ts === "number" ? item.source_ts : null;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
const isWithinLiveFeedLookback = (
|
||||
channel: LiveGenericChannel,
|
||||
item: unknown,
|
||||
|
|
|
|||
16
services/ingest-news/package.json
Normal file
16
services/ingest-news/package.json
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"name": "@islandflow/ingest-news",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"dev": "bun run src/index.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"@islandflow/bus": "workspace:*",
|
||||
"@islandflow/config": "workspace:*",
|
||||
"@islandflow/observability": "workspace:*",
|
||||
"@islandflow/types": "workspace:*",
|
||||
"ws": "^8.18.3",
|
||||
"zod": "^3.23.8"
|
||||
}
|
||||
}
|
||||
216
services/ingest-news/src/index.ts
Normal file
216
services/ingest-news/src/index.ts
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
import { readEnv } from "@islandflow/config";
|
||||
import { createLogger } from "@islandflow/observability";
|
||||
import {
|
||||
SUBJECT_NEWS,
|
||||
STREAM_NEWS,
|
||||
connectJetStreamWithRetry,
|
||||
ensureKnownStreams,
|
||||
publishJson
|
||||
} from "@islandflow/bus";
|
||||
import { NewsStorySchema, type NewsStory } from "@islandflow/types";
|
||||
import WebSocket from "ws";
|
||||
import { z } from "zod";
|
||||
import { resolveNewsSymbols } from "./symbols";
|
||||
|
||||
const service = "ingest-news";
|
||||
const logger = createLogger({ service });
|
||||
|
||||
const envSchema = z.object({
|
||||
NATS_URL: z.string().default("nats://127.0.0.1:4222"),
|
||||
ALPACA_API_KEY: z.string().default(""),
|
||||
ALPACA_REST_URL: z.string().default("https://data.alpaca.markets"),
|
||||
ALPACA_WS_BASE_URL: z.string().default("wss://stream.data.alpaca.markets"),
|
||||
ALPACA_NEWS_BACKFILL_LIMIT: z.coerce.number().int().positive().max(200).default(100),
|
||||
ALPACA_NEWS_WEBSOCKET_PATH: z.string().default("/v1beta1/news")
|
||||
});
|
||||
|
||||
const env = readEnv(envSchema);
|
||||
|
||||
type AlpacaNewsItem = {
|
||||
id?: number;
|
||||
headline?: string;
|
||||
summary?: string;
|
||||
content?: string;
|
||||
author?: string;
|
||||
created_at?: string;
|
||||
updated_at?: string;
|
||||
url?: string;
|
||||
symbols?: string[];
|
||||
source?: string;
|
||||
};
|
||||
|
||||
type AlpacaNewsResponse = {
|
||||
news?: AlpacaNewsItem[];
|
||||
};
|
||||
|
||||
const buildHeaders = (): Record<string, string> => ({
|
||||
Authorization: `Bearer ${env.ALPACA_API_KEY}`
|
||||
});
|
||||
|
||||
const parseTimestamp = (value: string | undefined): number => {
|
||||
const parsed = value ? Date.parse(value) : Number.NaN;
|
||||
return Number.isFinite(parsed) ? parsed : Date.now();
|
||||
};
|
||||
|
||||
const toStory = (item: AlpacaNewsItem, seq: number): NewsStory | null => {
|
||||
const storyId = Number(item.id);
|
||||
if (!Number.isFinite(storyId) || storyId < 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const provider = "alpaca";
|
||||
const contentHtml = item.content ?? "";
|
||||
const symbols = resolveNewsSymbols(item.symbols ?? [], contentHtml);
|
||||
const publishedTs = parseTimestamp(item.created_at);
|
||||
const updatedTs = parseTimestamp(item.updated_at ?? item.created_at);
|
||||
|
||||
return NewsStorySchema.parse({
|
||||
source_ts: publishedTs,
|
||||
ingest_ts: Date.now(),
|
||||
seq,
|
||||
trace_id: `${provider}:${storyId}`,
|
||||
story_id: storyId,
|
||||
provider,
|
||||
source: item.source?.trim() || item.author?.trim() || "Alpaca News",
|
||||
headline: item.headline?.trim() || `Story ${storyId}`,
|
||||
summary: item.summary?.trim() || "",
|
||||
content_html: contentHtml,
|
||||
url: item.url?.trim() || "",
|
||||
published_ts: publishedTs,
|
||||
updated_ts: updatedTs,
|
||||
provider_symbols: symbols.provider_symbols,
|
||||
resolved_symbols: symbols.resolved_symbols,
|
||||
symbol_resolution: symbols.symbol_resolution
|
||||
});
|
||||
};
|
||||
|
||||
const fetchBackfill = async (): Promise<AlpacaNewsItem[]> => {
|
||||
const url = new URL("/v1beta1/news", env.ALPACA_REST_URL);
|
||||
url.searchParams.set("sort", "desc");
|
||||
url.searchParams.set("limit", env.ALPACA_NEWS_BACKFILL_LIMIT.toString());
|
||||
|
||||
const response = await fetch(url.toString(), {
|
||||
headers: buildHeaders()
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`alpaca news backfill failed (${response.status})`);
|
||||
}
|
||||
|
||||
const payload = (await response.json()) as AlpacaNewsResponse;
|
||||
return Array.isArray(payload.news) ? payload.news : [];
|
||||
};
|
||||
|
||||
const decodePayload = (data: WebSocket.RawData): unknown => {
|
||||
if (typeof data === "string") {
|
||||
return JSON.parse(data) as unknown;
|
||||
}
|
||||
if (data instanceof ArrayBuffer) {
|
||||
return JSON.parse(new TextDecoder().decode(new Uint8Array(data))) as unknown;
|
||||
}
|
||||
if (ArrayBuffer.isView(data)) {
|
||||
return JSON.parse(new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength))) as unknown;
|
||||
}
|
||||
return JSON.parse(new TextDecoder().decode(new Uint8Array(data as ArrayBuffer))) as unknown;
|
||||
};
|
||||
|
||||
const run = async () => {
|
||||
if (!env.ALPACA_API_KEY) {
|
||||
throw new Error("ALPACA_API_KEY is required for ingest-news.");
|
||||
}
|
||||
|
||||
const { nc, js, jsm } = await connectJetStreamWithRetry(
|
||||
{
|
||||
servers: env.NATS_URL,
|
||||
name: service
|
||||
},
|
||||
{ attempts: 120, delayMs: 500 }
|
||||
);
|
||||
|
||||
await ensureKnownStreams(jsm, [STREAM_NEWS], { logger });
|
||||
|
||||
let seq = 0;
|
||||
const publishStory = async (item: AlpacaNewsItem) => {
|
||||
seq += 1;
|
||||
const story = toStory(item, seq);
|
||||
if (!story) {
|
||||
return;
|
||||
}
|
||||
await publishJson(js, SUBJECT_NEWS, story);
|
||||
};
|
||||
|
||||
const backfill = await fetchBackfill();
|
||||
for (const item of backfill.reverse()) {
|
||||
await publishStory(item);
|
||||
}
|
||||
|
||||
const wsUrl = new URL(env.ALPACA_NEWS_WEBSOCKET_PATH, env.ALPACA_WS_BASE_URL).toString();
|
||||
const ws = new WebSocket(wsUrl, {
|
||||
headers: buildHeaders()
|
||||
});
|
||||
|
||||
ws.on("open", () => {
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
action: "auth",
|
||||
key: env.ALPACA_API_KEY,
|
||||
secret: ""
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
ws.on("message", (raw) => {
|
||||
let payload: unknown;
|
||||
try {
|
||||
payload = decodePayload(raw);
|
||||
} catch (error) {
|
||||
logger.warn("failed to decode alpaca news message", {
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!Array.isArray(payload)) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const entry of payload) {
|
||||
if (!entry || typeof entry !== "object") {
|
||||
continue;
|
||||
}
|
||||
const message = entry as Record<string, unknown>;
|
||||
if (message.T === "success") {
|
||||
const msg = typeof message.msg === "string" ? message.msg : "";
|
||||
if (msg === "authenticated") {
|
||||
ws.send(JSON.stringify({ action: "subscribe", news: ["*"] }));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (message.T === "subscription" || message.T === "error") {
|
||||
continue;
|
||||
}
|
||||
void publishStory(message as AlpacaNewsItem).catch((error) => {
|
||||
logger.error("failed to publish alpaca news story", {
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const shutdown = async (signal: string) => {
|
||||
logger.info("shutting down", { signal });
|
||||
ws.close();
|
||||
await nc.drain();
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
process.on("SIGINT", () => void shutdown("SIGINT"));
|
||||
process.on("SIGTERM", () => void shutdown("SIGTERM"));
|
||||
};
|
||||
|
||||
void run().catch((error) => {
|
||||
logger.error("service crashed", {
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
process.exit(1);
|
||||
});
|
||||
70
services/ingest-news/src/symbols.ts
Normal file
70
services/ingest-news/src/symbols.ts
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
import type { NewsSymbolResolution } from "@islandflow/types";
|
||||
|
||||
const TICKER_ANCHOR_RE = />\s*([A-Z]{1,5})\s*<\/a>/g;
|
||||
const EXCHANGE_TICKER_RE = /\b(?:NASDAQ|NYSE|NYSEAMERICAN|AMEX|OTC|CBOE):([A-Z]{1,5})\b/g;
|
||||
const DOLLAR_TICKER_RE = /\$([A-Z]{1,5})\b/g;
|
||||
|
||||
const normalizeSymbols = (symbols: string[]): string[] => {
|
||||
const seen = new Set<string>();
|
||||
const normalized: string[] = [];
|
||||
|
||||
for (const entry of symbols) {
|
||||
const symbol = entry.trim().toUpperCase();
|
||||
if (!symbol || !/^[A-Z]{1,5}$/.test(symbol) || seen.has(symbol)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(symbol);
|
||||
normalized.push(symbol);
|
||||
}
|
||||
|
||||
return normalized;
|
||||
};
|
||||
|
||||
const collectMatches = (value: string, regex: RegExp): string[] => {
|
||||
regex.lastIndex = 0;
|
||||
const matches: string[] = [];
|
||||
let match: RegExpExecArray | null = null;
|
||||
while ((match = regex.exec(value)) !== null) {
|
||||
matches.push(match[1] ?? "");
|
||||
}
|
||||
return matches;
|
||||
};
|
||||
|
||||
export const resolveNewsSymbols = (
|
||||
providerSymbols: string[],
|
||||
contentHtml: string
|
||||
): {
|
||||
provider_symbols: string[];
|
||||
resolved_symbols: string[];
|
||||
symbol_resolution: NewsSymbolResolution;
|
||||
} => {
|
||||
const normalizedProvider = normalizeSymbols(providerSymbols);
|
||||
const derived = normalizeSymbols([
|
||||
...collectMatches(contentHtml, TICKER_ANCHOR_RE),
|
||||
...collectMatches(contentHtml, EXCHANGE_TICKER_RE),
|
||||
...collectMatches(contentHtml, DOLLAR_TICKER_RE)
|
||||
]);
|
||||
|
||||
if (normalizedProvider.length > 0) {
|
||||
const merged = normalizeSymbols([...normalizedProvider, ...derived]);
|
||||
return {
|
||||
provider_symbols: normalizedProvider,
|
||||
resolved_symbols: merged,
|
||||
symbol_resolution: derived.length > 0 ? "mixed" : "provider"
|
||||
};
|
||||
}
|
||||
|
||||
if (derived.length > 0) {
|
||||
return {
|
||||
provider_symbols: [],
|
||||
resolved_symbols: derived,
|
||||
symbol_resolution: "derived"
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
provider_symbols: [],
|
||||
resolved_symbols: [],
|
||||
symbol_resolution: "none"
|
||||
};
|
||||
};
|
||||
30
services/ingest-news/tests/symbols.test.ts
Normal file
30
services/ingest-news/tests/symbols.test.ts
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
import { describe, expect, it } from "bun:test";
|
||||
import { resolveNewsSymbols } from "../src/symbols";
|
||||
|
||||
describe("resolveNewsSymbols", () => {
|
||||
it("prefers provider symbols when present", () => {
|
||||
const result = resolveNewsSymbols(["tsla", "aapl"], "<p>No extra tickers here.</p>");
|
||||
expect(result.provider_symbols).toEqual(["TSLA", "AAPL"]);
|
||||
expect(result.resolved_symbols).toEqual(["TSLA", "AAPL"]);
|
||||
expect(result.symbol_resolution).toBe("provider");
|
||||
});
|
||||
|
||||
it("falls back to ticker anchors", () => {
|
||||
const result = resolveNewsSymbols([], '<a href="/quote/TSLA">TSLA</a>');
|
||||
expect(result.resolved_symbols).toEqual(["TSLA"]);
|
||||
expect(result.symbol_resolution).toBe("derived");
|
||||
});
|
||||
|
||||
it("falls back to exchange and dollar patterns", () => {
|
||||
const result = resolveNewsSymbols([], "<p>NASDAQ:TSLA met with $IBM executives.</p>");
|
||||
expect(result.resolved_symbols).toEqual(["TSLA", "IBM"]);
|
||||
expect(result.symbol_resolution).toBe("derived");
|
||||
});
|
||||
|
||||
it("dedupes and uppercases merged symbols", () => {
|
||||
const result = resolveNewsSymbols(["tsla"], "<p>$TSLA and NASDAQ:TSLA</p>");
|
||||
expect(result.provider_symbols).toEqual(["TSLA"]);
|
||||
expect(result.resolved_symbols).toEqual(["TSLA"]);
|
||||
expect(result.symbol_resolution).toBe("mixed");
|
||||
});
|
||||
});
|
||||
7
services/ingest-news/tsconfig.json
Normal file
7
services/ingest-news/tsconfig.json
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
{
|
||||
"extends": "../../tsconfig.base.json",
|
||||
"compilerOptions": {
|
||||
"types": []
|
||||
},
|
||||
"include": ["src/**/*.ts", "tests/**/*.ts"]
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue