diff --git a/.github/workflows/release-beta.yml b/.github/workflows/release-beta.yml index 11419c3d1..68fe52231 100644 --- a/.github/workflows/release-beta.yml +++ b/.github/workflows/release-beta.yml @@ -32,6 +32,9 @@ concurrency: group: open-design-release-beta cancel-in-progress: false +env: + OPEN_DESIGN_TELEMETRY_RELAY_URL: ${{ vars.OPEN_DESIGN_TELEMETRY_RELAY_URL }} + jobs: metadata: name: Prepare beta metadata diff --git a/.github/workflows/release-stable.yml b/.github/workflows/release-stable.yml index 192dcdd85..9e626c9c1 100644 --- a/.github/workflows/release-stable.yml +++ b/.github/workflows/release-stable.yml @@ -24,6 +24,9 @@ concurrency: group: open-design-release-stable-${{ inputs.channel }} cancel-in-progress: false +env: + OPEN_DESIGN_TELEMETRY_RELAY_URL: ${{ vars.OPEN_DESIGN_TELEMETRY_RELAY_URL }} + jobs: metadata: name: Prepare release metadata diff --git a/apps/daemon/src/langfuse-trace.ts b/apps/daemon/src/langfuse-trace.ts index b79334543..003a277e4 100644 --- a/apps/daemon/src/langfuse-trace.ts +++ b/apps/daemon/src/langfuse-trace.ts @@ -1,8 +1,9 @@ // Langfuse trace forwarding for completed agent runs. // -// This module is intentionally dependency-free (no `langfuse` SDK). It posts -// a trace with nested observations to Langfuse's public ingestion endpoint when -// a run reaches a terminal state. Without LANGFUSE_PUBLIC_KEY / +// This module is intentionally dependency-free (no `langfuse` SDK). It builds +// Langfuse ingestion batches for completed runs and sends them either to the +// official Open Design telemetry relay or, for local smoke tests, directly to +// Langfuse. Without OPEN_DESIGN_TELEMETRY_RELAY_URL or LANGFUSE_PUBLIC_KEY / // LANGFUSE_SECRET_KEY in the env, every entry point becomes a no-op so that // dev runs and forks of this open-source repo do not accidentally report. // @@ -33,6 +34,7 @@ const SESSION_ID_MAX = 200; // Langfuse drops sessionIds longer than this. const HARD_BATCH_MAX_BYTES = 1024 * 1024; const DEFAULT_FETCH_TIMEOUT_MS = 20_000; const DEFAULT_FETCH_RETRIES = 1; +let missingTelemetrySinkWarned = false; export interface LangfuseConfig { authHeader: string; @@ -41,6 +43,17 @@ export interface LangfuseConfig { retries: number; } +export type TelemetrySinkConfig = + | { + kind: 'relay'; + relayUrl: string; + timeoutMs: number; + retries: number; + } + | ({ + kind: 'langfuse'; + } & LangfuseConfig); + export interface RunSummary { runId: string; status: 'succeeded' | 'failed' | 'canceled'; @@ -133,7 +146,7 @@ export interface ReportContext { } export interface ReportRunOpts { - config?: LangfuseConfig | null; + config?: TelemetrySinkConfig | LangfuseConfig | null; fetchImpl?: typeof fetch; } @@ -161,6 +174,33 @@ export function readLangfuseConfig( }; } +/** + * Resolve telemetry delivery in release-safe order: hosted relay first, + * direct Langfuse credentials second for local smoke tests, disabled last. + */ +export function readTelemetrySinkConfig( + env: NodeJS.ProcessEnv = process.env, +): TelemetrySinkConfig | null { + const relayUrl = env.OPEN_DESIGN_TELEMETRY_RELAY_URL?.trim(); + if (relayUrl) { + return { + kind: 'relay', + relayUrl: relayUrl.replace(/\/+$/, ''), + timeoutMs: parsePositiveInt( + env.OPEN_DESIGN_TELEMETRY_TIMEOUT_MS ?? env.LANGFUSE_TIMEOUT_MS, + DEFAULT_FETCH_TIMEOUT_MS, + ), + retries: parseNonNegativeInt( + env.OPEN_DESIGN_TELEMETRY_RETRIES ?? env.LANGFUSE_RETRIES, + DEFAULT_FETCH_RETRIES, + ), + }; + } + + const config = readLangfuseConfig(env); + return config == null ? null : { kind: 'langfuse', ...config }; +} + function parsePositiveInt(value: string | undefined, fallback: number): number { if (value === undefined) return fallback; const parsed = Number.parseInt(value, 10); @@ -471,21 +511,7 @@ async function postLangfuseBatch( // silently disappear server-side. const body = await response.text().catch(() => ''); if (!body) return; - let parsed: unknown; - try { - parsed = JSON.parse(body); - } catch { - return; - } - const errors = - parsed && typeof parsed === 'object' && !Array.isArray(parsed) - ? (parsed as { errors?: unknown }).errors - : undefined; - if (Array.isArray(errors) && errors.length > 0) { - console.warn( - `[langfuse-trace] Per-event errors (${errors.length}): ${JSON.stringify(errors).slice(0, 500)}`, - ); - } + warnPerEventErrors(body, 'Per-event errors'); return; } catch (error) { if (attempt < attempts) { @@ -498,21 +524,110 @@ async function postLangfuseBatch( } } +async function postRelayBatch( + config: Extract, + body: string, + fetchImpl: typeof fetch, +): Promise { + const attempts = config.retries + 1; + for (let attempt = 1; attempt <= attempts; attempt += 1) { + try { + const response = await fetchImpl(config.relayUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Open-Design-Telemetry': 'langfuse-ingestion-v1', + }, + signal: AbortSignal.timeout(config.timeoutMs), + body, + }); + if (!response.ok) { + const responseBody = await response.text().catch(() => ''); + if ( + attempt < attempts && + (response.status === 429 || response.status >= 500) + ) { + await waitBeforeRetry(attempt); + continue; + } + console.warn( + `[langfuse-trace] Relay failed ${response.status}: ${responseBody.slice(0, 200)}`, + ); + return; + } + + const responseBody = await response.text().catch(() => ''); + if (!responseBody) return; + warnPerEventErrors(responseBody, 'Relay per-event errors'); + return; + } catch (error) { + if (attempt < attempts) { + await waitBeforeRetry(attempt); + continue; + } + console.warn(`[langfuse-trace] Relay fetch error: ${String(error)}`); + return; + } + } +} + function waitBeforeRetry(attempt: number): Promise { return new Promise((resolve) => setTimeout(resolve, Math.min(250 * attempt, 1000)), ); } +function normalizeTelemetrySinkConfig( + config: TelemetrySinkConfig | LangfuseConfig, +): TelemetrySinkConfig { + if ('kind' in config) return config; + return { kind: 'langfuse', ...config }; +} + +function resolveReportConfig( + opts: ReportRunOpts, +): TelemetrySinkConfig | null { + if (opts.config === undefined) return readTelemetrySinkConfig(); + if (opts.config == null) return null; + return normalizeTelemetrySinkConfig(opts.config); +} + +function warnPerEventErrors(responseBody: string, label: string): void { + let parsed: unknown; + try { + parsed = JSON.parse(responseBody); + } catch { + return; + } + const errors = + parsed && typeof parsed === 'object' && !Array.isArray(parsed) + ? (parsed as { errors?: unknown }).errors + : undefined; + if (Array.isArray(errors) && errors.length > 0) { + console.warn( + `[langfuse-trace] ${label} (${errors.length}): ${JSON.stringify(errors).slice(0, 500)}`, + ); + } +} + export async function reportRunCompleted( ctx: ReportContext, opts: ReportRunOpts = {}, ): Promise { if (ctx.prefs.metrics !== true) return; - const config = - opts.config !== undefined ? opts.config : readLangfuseConfig(); - if (!config) return; + const config = resolveReportConfig(opts); + if (!config) { + if (!missingTelemetrySinkWarned) { + // Warn once per daemon process; packaged config is loaded at process + // start, so repeated run-level warnings would only add noise. + missingTelemetrySinkWarned = true; + console.warn( + '[langfuse-trace] Telemetry metrics are enabled but no relay or Langfuse credentials are configured', + ); + } + return; + } let batch: unknown[]; try { @@ -535,5 +650,9 @@ export async function reportRunCompleted( } const fetchImpl = opts.fetchImpl ?? globalThis.fetch; + if (config.kind === 'relay') { + await postRelayBatch(config, serialized, fetchImpl); + return; + } await postLangfuseBatch(config, batch, fetchImpl); } diff --git a/apps/daemon/tests/langfuse-trace.test.ts b/apps/daemon/tests/langfuse-trace.test.ts index 953643ba9..c62b8169d 100644 --- a/apps/daemon/tests/langfuse-trace.test.ts +++ b/apps/daemon/tests/langfuse-trace.test.ts @@ -3,9 +3,11 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { buildTracePayload, readLangfuseConfig, + readTelemetrySinkConfig, reportRunCompleted, type LangfuseConfig, type ReportContext, + type TelemetrySinkConfig, } from '../src/langfuse-trace.js'; function makeCtx(overrides: Partial = {}): ReportContext { @@ -141,6 +143,48 @@ describe('readLangfuseConfig', () => { }); }); +describe('readTelemetrySinkConfig', () => { + it('prefers the Open Design telemetry relay when configured', () => { + const cfg = readTelemetrySinkConfig({ + OPEN_DESIGN_TELEMETRY_RELAY_URL: 'https://telemetry.open-design.ai/api/langfuse//', + LANGFUSE_PUBLIC_KEY: 'pk', + LANGFUSE_SECRET_KEY: 'sk', + }); + expect(cfg).toEqual({ + kind: 'relay', + relayUrl: 'https://telemetry.open-design.ai/api/langfuse', + timeoutMs: 20_000, + retries: 1, + }); + }); + + it('uses relay-specific timeout and retry tuning when present', () => { + const cfg = readTelemetrySinkConfig({ + OPEN_DESIGN_TELEMETRY_RELAY_URL: 'https://telemetry.open-design.ai/api/langfuse', + OPEN_DESIGN_TELEMETRY_TIMEOUT_MS: '30000', + OPEN_DESIGN_TELEMETRY_RETRIES: '3', + LANGFUSE_TIMEOUT_MS: '1', + LANGFUSE_RETRIES: '0', + }); + expect(cfg).toMatchObject({ + kind: 'relay', + timeoutMs: 30_000, + retries: 3, + }); + }); + + it('falls back to direct Langfuse config for local smoke tests', () => { + const cfg = readTelemetrySinkConfig({ + LANGFUSE_PUBLIC_KEY: 'pk', + LANGFUSE_SECRET_KEY: 'sk', + }); + expect(cfg).toMatchObject({ + kind: 'langfuse', + baseUrl: 'https://us.cloud.langfuse.com', + }); + }); +}); + describe('buildTracePayload', () => { it('emits a trace with nested agent + generation observations', () => { const batch = buildTracePayload(makeCtx()); @@ -490,6 +534,55 @@ describe('reportRunCompleted', () => { ]); }); + it('POSTs serialized ingestion batches to the Open Design telemetry relay', async () => { + const relayConfig: TelemetrySinkConfig = { + kind: 'relay', + relayUrl: 'https://telemetry.open-design.ai/api/langfuse', + timeoutMs: 20_000, + retries: 0, + }; + const fetchSpy = vi.fn().mockResolvedValue( + new Response('{}', { status: 200 }), + ); + await reportRunCompleted(makeCtx(), { + config: relayConfig, + fetchImpl: fetchSpy as any, + }); + expect(fetchSpy).toHaveBeenCalledTimes(1); + const call = fetchSpy.mock.calls[0]!; + const url = call[0] as string; + const init = call[1] as RequestInit & { headers: Record }; + expect(url).toBe('https://telemetry.open-design.ai/api/langfuse'); + expect(init.method).toBe('POST'); + expect(init.headers.Authorization).toBeUndefined(); + expect(init.headers['Content-Type']).toBe('application/json'); + expect(init.headers['X-Open-Design-Telemetry']).toBe('langfuse-ingestion-v1'); + const body = JSON.parse(init.body as string); + expect(Array.isArray(body.batch)).toBe(true); + }); + + it('warns when the relay returns per-event errors', async () => { + const relayConfig: TelemetrySinkConfig = { + kind: 'relay', + relayUrl: 'https://telemetry.open-design.ai/api/langfuse', + timeoutMs: 20_000, + retries: 0, + }; + const fetchSpy = vi.fn().mockResolvedValue( + new Response( + JSON.stringify({ successes: [], errors: [{ id: 'bad', status: 400 }] }), + { status: 207 }, + ), + ); + await reportRunCompleted(makeCtx(), { + config: relayConfig, + fetchImpl: fetchSpy as any, + }); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('Relay per-event errors (1)'), + ); + }); + it('warns and drops when serialized batch exceeds the hard cap', async () => { // Per-field truncation already caps prompt/output, so we overflow the // hard cap by stuffing 50 artifact entries with very long slugs while diff --git a/apps/packaged/src/config.ts b/apps/packaged/src/config.ts index f655c9809..13c4e9709 100644 --- a/apps/packaged/src/config.ts +++ b/apps/packaged/src/config.ts @@ -21,6 +21,9 @@ export type RawPackagedConfig = { namespaceBaseRoot?: string; nodeCommandRelative?: string; resourceRoot?: string; + // Baked by tools/pack from OPEN_DESIGN_TELEMETRY_RELAY_URL and forwarded to + // the daemon at runtime; Langfuse credentials never ship in packaged config. + telemetryRelayUrl?: string; webSidecarEntryRelative?: string; webStandaloneRoot?: string; webOutputMode?: string; @@ -34,6 +37,7 @@ export type PackagedConfig = { namespaceBaseRoot: string; nodeCommand: string | null; resourceRoot: string; + telemetryRelayUrl: string | null; webSidecarEntry: string | null; webStandaloneRoot: string | null; webOutputMode: PackagedWebOutputMode; @@ -152,6 +156,7 @@ export async function readPackagedConfig(): Promise { namespaceBaseRoot, nodeCommand, resourceRoot, + telemetryRelayUrl: cleanOptionalString(raw.telemetryRelayUrl), webSidecarEntry, webStandaloneRoot, webOutputMode, diff --git a/apps/packaged/src/headless.ts b/apps/packaged/src/headless.ts index b51a219d8..30aad1cf6 100644 --- a/apps/packaged/src/headless.ts +++ b/apps/packaged/src/headless.ts @@ -60,6 +60,7 @@ function resolveHeadlessConfig(): PackagedConfig { namespaceBaseRoot, nodeCommand: null, resourceRoot, + telemetryRelayUrl: process.env.OPEN_DESIGN_TELEMETRY_RELAY_URL?.trim() || null, webSidecarEntry: null, webStandaloneRoot: null, webOutputMode: "server", @@ -107,6 +108,7 @@ async function main(): Promise { daemonCliEntry: config.daemonCliEntry, daemonSidecarEntry: config.daemonSidecarEntry, nodeCommand: config.nodeCommand, + telemetryRelayUrl: config.telemetryRelayUrl, // PR #974 round-5 (lefarcen P2): headless packaged mode runs daemon // + web only, no Electron, no privileged shell.openPath surface. // Pinning OD_REQUIRE_DESKTOP_AUTH here would arm a gate no client diff --git a/apps/packaged/src/index.ts b/apps/packaged/src/index.ts index fdfe57f32..9472c4ff4 100644 --- a/apps/packaged/src/index.ts +++ b/apps/packaged/src/index.ts @@ -84,6 +84,7 @@ async function main(): Promise { daemonCliEntry: config.daemonCliEntry, daemonSidecarEntry: config.daemonSidecarEntry, nodeCommand: config.nodeCommand, + telemetryRelayUrl: config.telemetryRelayUrl, // PR #974 round-5 (lefarcen P2): the Electron entry runs desktop // main alongside the daemon, so the import-folder gate must be // pinned ON from request 0. See `apps/packaged/src/headless.ts` for diff --git a/apps/packaged/src/sidecars.ts b/apps/packaged/src/sidecars.ts index 40638ad63..393c0e0ff 100644 --- a/apps/packaged/src/sidecars.ts +++ b/apps/packaged/src/sidecars.ts @@ -218,6 +218,7 @@ export type PackagedDaemonSpawnEnvOptions = { */ requireDesktopAuth: boolean; legacyDataDir?: string | null; + telemetryRelayUrl?: string | null; }; /** @@ -247,6 +248,9 @@ export function buildPackagedDaemonSpawnEnv( // Electron userData, bundle names, or ports. ...createPackagedDaemonManagedPathEnv(paths), ...(options.appVersion == null ? {} : { OD_APP_VERSION: options.appVersion }), + ...(options.telemetryRelayUrl == null || options.telemetryRelayUrl.length === 0 + ? {} + : { OPEN_DESIGN_TELEMETRY_RELAY_URL: options.telemetryRelayUrl }), // OD_LEGACY_DATA_DIR is the one-shot recovery handle for users // upgrading from 0.3.x .od/ layouts. The daemon's startup // migrator (legacy-data-migrator.ts) reads it; the env-allowlist @@ -334,6 +338,7 @@ export async function startPackagedSidecars( daemonCliEntry: string | null; daemonSidecarEntry: string | null; nodeCommand: string | null; + telemetryRelayUrl: string | null; /** * PR #974 round-5 (lefarcen P2): caller asserts whether a desktop * runtime is being started in this packaged process group. The @@ -369,6 +374,7 @@ export async function startPackagedSidecars( daemonCliEntry: options.daemonCliEntry, legacyDataDir: process.env.OD_LEGACY_DATA_DIR ?? null, requireDesktopAuth: options.requireDesktopAuth, + telemetryRelayUrl: options.telemetryRelayUrl, }), nodeCommand: options.nodeCommand, paths, diff --git a/apps/packaged/tests/sidecars.test.ts b/apps/packaged/tests/sidecars.test.ts index 80a1c845c..6956f6d42 100644 --- a/apps/packaged/tests/sidecars.test.ts +++ b/apps/packaged/tests/sidecars.test.ts @@ -208,6 +208,19 @@ describe('buildPackagedDaemonSpawnEnv', () => { }); expect(env.OD_DAEMON_CLI_PATH).toBe('/path/to/cli/dist/index.js'); }); + + it('forwards the packaged telemetry relay URL to the daemon when configured', () => { + const env = buildPackagedDaemonSpawnEnv(fakePaths(), { + appVersion: null, + daemonCliEntry: null, + legacyDataDir: null, + requireDesktopAuth: true, + telemetryRelayUrl: 'https://telemetry.open-design.ai/api/langfuse', + }); + expect(env.OPEN_DESIGN_TELEMETRY_RELAY_URL).toBe( + 'https://telemetry.open-design.ai/api/langfuse', + ); + }); }); describe('waitForStatus child-exit fast-fail', () => { diff --git a/apps/telemetry-worker/README.md b/apps/telemetry-worker/README.md new file mode 100644 index 000000000..959ac06ce --- /dev/null +++ b/apps/telemetry-worker/README.md @@ -0,0 +1,56 @@ +# Open Design Telemetry Relay + +Cloudflare Worker relay for opt-in Open Design telemetry. The shipped desktop +client sends redacted Langfuse ingestion batches here after the user enables +metrics. This Worker holds the Langfuse write credentials and forwards valid +batches to Langfuse. + +The relay keeps Langfuse secret keys out of packaged clients. Release builds +only include the public relay URL; the Worker adds Langfuse authentication +server-side after validating the request. If the relay is unavailable, the +daemon retries, logs the failure, and continues the user flow without blocking +the CLI or desktop app. + +Local development can bypass the relay by setting direct `LANGFUSE_PUBLIC_KEY` +and `LANGFUSE_SECRET_KEY` environment variables for the daemon. Packaged +release config should use only `OPEN_DESIGN_TELEMETRY_RELAY_URL`. + +## Abuse controls + +The Worker requires the Open Design telemetry marker header, validates the +Langfuse ingestion batch shape and size before forwarding, and uses Cloudflare +Rate Limiting bindings for two independent keys: + +- `TELEMETRY_CLIENT_RATE_LIMITER`: anonymous installation/user id, 120 requests + per minute. +- `TELEMETRY_IP_RATE_LIMITER`: Cloudflare `CF-Connecting-IP`, 600 requests per + minute. + +## Secrets + +```bash +pnpm --dir apps/telemetry-worker dlx wrangler secret put LANGFUSE_PUBLIC_KEY +pnpm --dir apps/telemetry-worker dlx wrangler secret put LANGFUSE_SECRET_KEY +``` + +`LANGFUSE_BASE_URL` defaults to `https://us.cloud.langfuse.com` in +`wrangler.toml`. + +## Deploy + +```bash +pnpm --filter @open-design/telemetry-worker deploy +``` + +After deploy, set the repository variable `OPEN_DESIGN_TELEMETRY_RELAY_URL` to +the Worker route, for example: + +```text +https://telemetry.open-design.ai/api/langfuse +``` + +Opening `/api/langfuse` or `/health` in a browser returns relay health JSON. +Telemetry ingestion still uses POST to `/api/langfuse`. + +Release workflows bake only this public relay URL into packaged config. The +Langfuse secret key stays in Cloudflare Worker secrets. diff --git a/apps/telemetry-worker/package.json b/apps/telemetry-worker/package.json new file mode 100644 index 000000000..d64c9d78b --- /dev/null +++ b/apps/telemetry-worker/package.json @@ -0,0 +1,19 @@ +{ + "name": "@open-design/telemetry-worker", + "version": "0.0.0", + "private": true, + "type": "module", + "scripts": { + "deploy": "pnpm dlx wrangler deploy", + "dev": "pnpm dlx wrangler dev", + "test": "vitest run -c vitest.config.ts", + "typecheck": "tsc -p tsconfig.json --noEmit" + }, + "devDependencies": { + "typescript": "^5.6.3", + "vitest": "^2.1.8" + }, + "engines": { + "node": "~24" + } +} diff --git a/apps/telemetry-worker/src/index.ts b/apps/telemetry-worker/src/index.ts new file mode 100644 index 000000000..9b489cbae --- /dev/null +++ b/apps/telemetry-worker/src/index.ts @@ -0,0 +1,201 @@ +const DEFAULT_LANGFUSE_BASE_URL = 'https://us.cloud.langfuse.com'; +const MAX_BODY_BYTES = 1024 * 1024; +const MAX_BATCH_EVENTS = 100; +const RELAY_MARKER_HEADER = 'X-Open-Design-Telemetry'; +const RELAY_MARKER_VALUE = 'langfuse-ingestion-v1'; +const ALLOWED_EVENT_TYPES = new Set([ + 'trace-create', + 'span-create', + 'generation-create', + 'event-create', + 'score-create', +]); + +interface RateLimitBinding { + limit(options: { key: string }): Promise<{ success: boolean }>; +} + +export interface Env { + LANGFUSE_PUBLIC_KEY?: string; + LANGFUSE_SECRET_KEY?: string; + LANGFUSE_BASE_URL?: string; + TELEMETRY_CLIENT_RATE_LIMITER?: RateLimitBinding; + TELEMETRY_IP_RATE_LIMITER?: RateLimitBinding; +} + +function jsonResponse(status: number, body: Record): Response { + return new Response(JSON.stringify(body), { + status, + headers: { + 'Cache-Control': 'no-store', + 'Content-Type': 'application/json', + }, + }); +} + +function isRecord(value: unknown): value is Record { + return value !== null && typeof value === 'object' && !Array.isArray(value); +} + +function bodySizeBytes(value: string): number { + return new TextEncoder().encode(value).byteLength; +} + +function basicAuthHeader(publicKey: string, secretKey: string): string { + const bytes = new TextEncoder().encode(`${publicKey}:${secretKey}`); + let binary = ''; + for (const byte of bytes) binary += String.fromCharCode(byte); + return `Basic ${btoa(binary)}`; +} + +function validateIngestionBody(value: unknown): string | null { + if (!isRecord(value)) return 'body must be a JSON object'; + const batch = value.batch; + if (!Array.isArray(batch)) return 'body.batch must be an array'; + if (batch.length === 0) return 'body.batch must not be empty'; + if (batch.length > MAX_BATCH_EVENTS) return 'body.batch has too many events'; + + for (const [index, event] of batch.entries()) { + if (!isRecord(event)) return `body.batch[${index}] must be an object`; + if (typeof event.id !== 'string' || event.id.length === 0) { + return `body.batch[${index}].id must be a string`; + } + if (event.id.length > 200) return `body.batch[${index}].id is too long`; + if (typeof event.type !== 'string' || !ALLOWED_EVENT_TYPES.has(event.type)) { + return `body.batch[${index}].type is not allowed`; + } + if (!isRecord(event.body)) return `body.batch[${index}].body must be an object`; + } + return null; +} + +function findTraceUserId(value: unknown): string | null { + if (!isRecord(value) || !Array.isArray(value.batch)) return null; + for (const event of value.batch) { + if (!isRecord(event) || event.type !== 'trace-create' || !isRecord(event.body)) { + continue; + } + const userId = event.body.userId; + return typeof userId === 'string' && userId.length > 0 ? userId.slice(0, 200) : null; + } + return null; +} + +async function enforceRateLimits( + request: Request, + env: Env, + parsedBody: unknown, +): Promise { + const clientKey = findTraceUserId(parsedBody); + if (clientKey && env.TELEMETRY_CLIENT_RATE_LIMITER) { + const { success } = await env.TELEMETRY_CLIENT_RATE_LIMITER.limit({ + key: `client:${clientKey}`, + }); + if (!success) return jsonResponse(429, { error: 'rate limit exceeded' }); + } + + const ip = request.headers.get('CF-Connecting-IP')?.trim(); + if (ip && env.TELEMETRY_IP_RATE_LIMITER) { + const { success } = await env.TELEMETRY_IP_RATE_LIMITER.limit({ + key: `ip:${ip}`, + }); + if (!success) return jsonResponse(429, { error: 'rate limit exceeded' }); + } + + return null; +} + +async function readBoundedBody(request: Request): Promise { + const contentLength = request.headers.get('content-length'); + if (contentLength != null && Number(contentLength) > MAX_BODY_BYTES) { + return jsonResponse(413, { error: 'payload too large' }); + } + + const text = await request.text(); + if (bodySizeBytes(text) > MAX_BODY_BYTES) { + return jsonResponse(413, { error: 'payload too large' }); + } + return text; +} + +function resolveLangfuseUrl(env: Env): string { + return `${(env.LANGFUSE_BASE_URL?.trim() || DEFAULT_LANGFUSE_BASE_URL).replace(/\/+$/, '')}/api/public/ingestion`; +} + +function hasLangfuseCredentials(env: Env): boolean { + return Boolean(env.LANGFUSE_PUBLIC_KEY?.trim() && env.LANGFUSE_SECRET_KEY?.trim()); +} + +function isHealthPath(request: Request): boolean { + const { pathname } = new URL(request.url); + return pathname === '/api/langfuse' || pathname === '/health'; +} + +async function handleRequest(request: Request, env: Env): Promise { + if (request.method === 'GET' && isHealthPath(request)) { + return jsonResponse(200, { + ok: true, + service: 'open-design-telemetry-relay', + configured: hasLangfuseCredentials(env), + upstream: resolveLangfuseUrl(env), + }); + } + + if (request.method !== 'POST') { + return jsonResponse(405, { error: 'method not allowed' }); + } + + if (request.headers.get(RELAY_MARKER_HEADER) !== RELAY_MARKER_VALUE) { + return jsonResponse(403, { error: 'missing telemetry client marker' }); + } + + const publicKey = env.LANGFUSE_PUBLIC_KEY?.trim(); + const secretKey = env.LANGFUSE_SECRET_KEY?.trim(); + if (!publicKey || !secretKey) { + return jsonResponse(503, { error: 'telemetry relay is not configured' }); + } + + const contentType = request.headers.get('content-type') ?? ''; + if (!contentType.toLowerCase().includes('application/json')) { + return jsonResponse(415, { error: 'content-type must be application/json' }); + } + + const rawBody = await readBoundedBody(request); + if (rawBody instanceof Response) return rawBody; + + let parsed: unknown; + try { + parsed = JSON.parse(rawBody); + } catch { + return jsonResponse(400, { error: 'invalid JSON' }); + } + + const validationError = validateIngestionBody(parsed); + if (validationError != null) { + return jsonResponse(400, { error: validationError }); + } + + const rateLimitResponse = await enforceRateLimits(request, env, parsed); + if (rateLimitResponse) return rateLimitResponse; + + const upstream = await fetch(resolveLangfuseUrl(env), { + method: 'POST', + headers: { + Authorization: basicAuthHeader(publicKey, secretKey), + 'Content-Type': 'application/json', + }, + body: rawBody, + }); + const upstreamBody = await upstream.text(); + return new Response(upstreamBody, { + status: upstream.status, + headers: { + 'Cache-Control': 'no-store', + 'Content-Type': upstream.headers.get('content-type') ?? 'application/json', + }, + }); +} + +export default { + fetch: handleRequest, +}; diff --git a/apps/telemetry-worker/tests/index.test.ts b/apps/telemetry-worker/tests/index.test.ts new file mode 100644 index 000000000..9aa37fe3a --- /dev/null +++ b/apps/telemetry-worker/tests/index.test.ts @@ -0,0 +1,148 @@ +import { describe, expect, it, vi } from 'vitest'; + +import worker, { type Env } from '../src/index'; + +const env: Env = { + LANGFUSE_PUBLIC_KEY: 'pk-lf-test', + LANGFUSE_SECRET_KEY: 'sk-lf-test', + LANGFUSE_BASE_URL: 'https://us.cloud.langfuse.com', +}; + +function makeRequest(body: unknown): Request { + return new Request('https://telemetry.open-design.ai/api/langfuse', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Open-Design-Telemetry': 'langfuse-ingestion-v1', + }, + body: JSON.stringify(body), + }); +} + +function makeRateLimiter(success: boolean) { + return { + limit: vi.fn(async () => ({ success })), + }; +} + +describe('telemetry worker', () => { + it('returns a health response for browser checks', async () => { + const response = await worker.fetch( + new Request('https://telemetry.open-design.ai/api/langfuse'), + env, + ); + + expect(response.status).toBe(200); + expect(await response.json()).toEqual({ + ok: true, + service: 'open-design-telemetry-relay', + configured: true, + upstream: 'https://us.cloud.langfuse.com/api/public/ingestion', + }); + }); + + it('reports unconfigured health without exposing secrets', async () => { + const response = await worker.fetch(new Request('https://telemetry.open-design.ai/health'), {}); + + expect(response.status).toBe(200); + expect(await response.json()).toEqual({ + ok: true, + service: 'open-design-telemetry-relay', + configured: false, + upstream: 'https://us.cloud.langfuse.com/api/public/ingestion', + }); + }); + + it('forwards valid Langfuse ingestion batches with server-side auth', async () => { + const fetchSpy = vi.spyOn(globalThis, 'fetch').mockResolvedValue( + new Response(JSON.stringify({ successes: [{ id: 'evt-1' }], errors: [] }), { + status: 207, + headers: { 'Content-Type': 'application/json' }, + }), + ); + + const response = await worker.fetch( + makeRequest({ + batch: [ + { + id: 'evt-1', + type: 'trace-create', + timestamp: '2026-05-11T00:00:00.000Z', + body: { id: 'trace-1', name: 'open-design-turn' }, + }, + ], + }), + env, + ); + + expect(response.status).toBe(207); + expect(fetchSpy).toHaveBeenCalledTimes(1); + const [url, init] = fetchSpy.mock.calls[0]!; + expect(url).toBe('https://us.cloud.langfuse.com/api/public/ingestion'); + expect((init as RequestInit).headers).toMatchObject({ + Authorization: expect.stringMatching(/^Basic /), + 'Content-Type': 'application/json', + }); + + fetchSpy.mockRestore(); + }); + + it('rejects requests without the Open Design client marker', async () => { + const response = await worker.fetch( + new Request('https://telemetry.open-design.ai/api/langfuse', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ batch: [] }), + }), + env, + ); + + expect(response.status).toBe(403); + }); + + it('rate limits validated batches before forwarding', async () => { + const fetchSpy = vi.spyOn(globalThis, 'fetch'); + const limiter = makeRateLimiter(false); + const response = await worker.fetch( + makeRequest({ + batch: [ + { + id: 'evt-1', + type: 'trace-create', + timestamp: '2026-05-11T00:00:00.000Z', + body: { + id: 'trace-1', + name: 'open-design-turn', + userId: 'installation-1', + }, + }, + ], + }), + { ...env, TELEMETRY_CLIENT_RATE_LIMITER: limiter }, + ); + + expect(response.status).toBe(429); + expect(limiter.limit).toHaveBeenCalledWith({ key: 'client:installation-1' }); + expect(fetchSpy).not.toHaveBeenCalled(); + + fetchSpy.mockRestore(); + }); + + it('rejects malformed batches before forwarding', async () => { + const fetchSpy = vi.spyOn(globalThis, 'fetch'); + const response = await worker.fetch(makeRequest({ batch: [{ type: 'bad' }] }), env); + + expect(response.status).toBe(400); + expect(await response.json()).toEqual({ + error: 'body.batch[0].id must be a string', + }); + expect(fetchSpy).not.toHaveBeenCalled(); + + fetchSpy.mockRestore(); + }); + + it('fails closed when Langfuse credentials are absent', async () => { + const response = await worker.fetch(makeRequest({ batch: [] }), {}); + expect(response.status).toBe(503); + }); +}); diff --git a/apps/telemetry-worker/tsconfig.json b/apps/telemetry-worker/tsconfig.json new file mode 100644 index 000000000..0d7dfbbd7 --- /dev/null +++ b/apps/telemetry-worker/tsconfig.json @@ -0,0 +1,19 @@ +{ + "compilerOptions": { + "target": "ES2022", + "lib": ["ES2022", "WebWorker"], + "module": "ESNext", + "moduleResolution": "Bundler", + "strict": true, + "noUncheckedIndexedAccess": true, + "exactOptionalPropertyTypes": true, + "allowJs": false, + "checkJs": false, + "isolatedModules": true, + "forceConsistentCasingInFileNames": true, + "skipLibCheck": true, + "types": ["vitest/globals"] + }, + "include": ["src/**/*.ts", "tests/**/*.ts", "vitest.config.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/apps/telemetry-worker/vitest.config.ts b/apps/telemetry-worker/vitest.config.ts new file mode 100644 index 000000000..4ac6027d5 --- /dev/null +++ b/apps/telemetry-worker/vitest.config.ts @@ -0,0 +1,7 @@ +import { defineConfig } from 'vitest/config'; + +export default defineConfig({ + test: { + environment: 'node', + }, +}); diff --git a/apps/telemetry-worker/wrangler.toml b/apps/telemetry-worker/wrangler.toml new file mode 100644 index 000000000..3516ccd16 --- /dev/null +++ b/apps/telemetry-worker/wrangler.toml @@ -0,0 +1,25 @@ +name = "open-design-telemetry-relay" +account_id = "64ad4569ffd912432d6b86d5656484c4" +main = "src/index.ts" +compatibility_date = "2026-05-01" +workers_dev = false +routes = [ + { pattern = "telemetry.open-design.ai", custom_domain = true } +] + +[vars] +LANGFUSE_BASE_URL = "https://us.cloud.langfuse.com" + +[[ratelimits]] +name = "TELEMETRY_CLIENT_RATE_LIMITER" +namespace_id = "1001" + [ratelimits.simple] + limit = 120 + period = 60 + +[[ratelimits]] +name = "TELEMETRY_IP_RATE_LIMITER" +namespace_id = "1002" + [ratelimits.simple] + limit = 600 + period = 60 diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7f49054ec..b72d0ffd0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -178,6 +178,15 @@ importers: specifier: ^2.1.8 version: 2.1.9(@types/node@24.12.2)(jsdom@29.1.1) + apps/telemetry-worker: + devDependencies: + typescript: + specifier: ^5.6.3 + version: 5.9.3 + vitest: + specifier: ^2.1.8 + version: 2.1.9(@types/node@24.12.2)(jsdom@29.1.1) + apps/web: dependencies: '@anthropic-ai/sdk': diff --git a/tools/pack/src/config.ts b/tools/pack/src/config.ts index 5a207d7ad..1652dbc59 100644 --- a/tools/pack/src/config.ts +++ b/tools/pack/src/config.ts @@ -72,6 +72,7 @@ export type ToolPackConfig = { roots: ToolPackRoots; silent: boolean; signed: boolean; + telemetryRelayUrl?: string; to: ToolPackBuildOutput; webOutputMode: ToolPackWebOutputMode; workspaceRoot: string; @@ -108,6 +109,22 @@ function resolveToolPackWebOutputMode(platform: ToolPackPlatform, value: string throw new Error(`unsupported OD_WEB_OUTPUT_MODE value: ${value}`); } +function resolveToolPackTelemetryRelayUrl(value: string | undefined): string | undefined { + if (value == null) return undefined; + const normalized = value.trim(); + if (normalized.length === 0) return undefined; + let parsed: URL; + try { + parsed = new URL(normalized); + } catch { + throw new Error(`OPEN_DESIGN_TELEMETRY_RELAY_URL must be an absolute https URL: ${value}`); + } + if (parsed.protocol !== "https:") { + throw new Error(`OPEN_DESIGN_TELEMETRY_RELAY_URL must use https: ${value}`); + } + return normalized.replace(/\/+$/, ""); +} + function resolveElectronVersion(workspaceRoot: string): string { const require = createRequire(join(workspaceRoot, "apps/desktop/package.json")); const desktopPackage = require(join(workspaceRoot, "apps/desktop/package.json")) as { @@ -177,6 +194,7 @@ export function resolveToolPackConfig( removeSidecars: options.removeSidecars === true, silent: options.silent !== false, signed: options.signed === true, + telemetryRelayUrl: resolveToolPackTelemetryRelayUrl(process.env.OPEN_DESIGN_TELEMETRY_RELAY_URL), to: resolveToolPackBuildOutput(platform, options.to), webOutputMode: resolveToolPackWebOutputMode(platform, process.env.OD_WEB_OUTPUT_MODE), workspaceRoot: WORKSPACE_ROOT, diff --git a/tools/pack/src/linux.ts b/tools/pack/src/linux.ts index f2ebfa78c..8ddef5c07 100644 --- a/tools/pack/src/linux.ts +++ b/tools/pack/src/linux.ts @@ -134,7 +134,7 @@ export function buildDockerArgs( } const innerCommand = `${pnpmCmd} install --frozen-lockfile && ` + innerArgs.join(" "); - return [ + const dockerArgs = [ "run", "--rm", "--user", @@ -155,13 +155,19 @@ export function buildDockerArgs( "ELECTRON_CACHE=/home/builder/.cache/electron", "-e", "ELECTRON_BUILDER_CACHE=/home/builder/.cache/electron-builder", + ]; + if (config.telemetryRelayUrl != null) { + dockerArgs.push("-e", `OPEN_DESIGN_TELEMETRY_RELAY_URL=${config.telemetryRelayUrl}`); + } + dockerArgs.push( "-w", "/project", "electronuserland/builder:base", "bash", "-lc", innerCommand, - ]; + ); + return dockerArgs; } export type DesktopTemplateValues = { @@ -385,6 +391,7 @@ async function writeAssembledApp( appVersion: version, namespace: config.namespace, nodeCommandRelative: "open-design/bin/node", + ...(config.telemetryRelayUrl == null ? {} : { telemetryRelayUrl: config.telemetryRelayUrl }), ...(config.portable ? {} : { namespaceBaseRoot: config.roots.runtime.namespaceBaseRoot }), }, null, diff --git a/tools/pack/src/mac/app.ts b/tools/pack/src/mac/app.ts index ab2b6f1de..0d1580190 100644 --- a/tools/pack/src/mac/app.ts +++ b/tools/pack/src/mac/app.ts @@ -233,6 +233,7 @@ export async function writeAssembledApp( ...(usePrebundledStandaloneWeb ? { daemonSidecarEntryRelative: MAC_PREBUNDLED_DAEMON_SIDECAR_RELATIVE_PATH } : {}), namespace: config.namespace, nodeCommandRelative: "open-design/bin/node", + ...(config.telemetryRelayUrl == null ? {} : { telemetryRelayUrl: config.telemetryRelayUrl }), ...(usePrebundledStandaloneWeb ? { webSidecarEntryRelative: MAC_PREBUNDLED_WEB_SIDECAR_RELATIVE_PATH } : {}), webOutputMode: config.webOutputMode, ...(config.portable ? {} : { namespaceBaseRoot: config.roots.runtime.namespaceBaseRoot }), diff --git a/tools/pack/src/win/manifest.ts b/tools/pack/src/win/manifest.ts index 77a04f402..875d05eab 100644 --- a/tools/pack/src/win/manifest.ts +++ b/tools/pack/src/win/manifest.ts @@ -19,6 +19,7 @@ function createPackagedConfig(config: ToolPackConfig, packagedVersion: string): return { appVersion: packagedVersion, namespace: config.namespace, + ...(config.telemetryRelayUrl == null ? {} : { telemetryRelayUrl: config.telemetryRelayUrl }), webOutputMode: config.webOutputMode, ...(config.portable ? {} : { namespaceBaseRoot: config.roots.runtime.namespaceBaseRoot }), }; diff --git a/tools/pack/tests/config.test.ts b/tools/pack/tests/config.test.ts new file mode 100644 index 000000000..bce1a7903 --- /dev/null +++ b/tools/pack/tests/config.test.ts @@ -0,0 +1,35 @@ +import { afterEach, describe, expect, it } from "vitest"; + +import { resolveToolPackConfig } from "../src/config.js"; + +const savedTelemetryRelayUrl = process.env.OPEN_DESIGN_TELEMETRY_RELAY_URL; + +afterEach(() => { + if (savedTelemetryRelayUrl == null) { + delete process.env.OPEN_DESIGN_TELEMETRY_RELAY_URL; + } else { + process.env.OPEN_DESIGN_TELEMETRY_RELAY_URL = savedTelemetryRelayUrl; + } +}); + +describe("resolveToolPackConfig telemetry relay", () => { + it("reads and normalizes OPEN_DESIGN_TELEMETRY_RELAY_URL for packaged config", () => { + process.env.OPEN_DESIGN_TELEMETRY_RELAY_URL = "https://telemetry.open-design.ai/api/langfuse//"; + const config = resolveToolPackConfig("mac", { namespace: "telemetry-test" }); + expect(config.telemetryRelayUrl).toBe("https://telemetry.open-design.ai/api/langfuse"); + }); + + it("rejects invalid telemetry relay URLs", () => { + process.env.OPEN_DESIGN_TELEMETRY_RELAY_URL = "not-a-url"; + expect(() => resolveToolPackConfig("mac")).toThrow( + /OPEN_DESIGN_TELEMETRY_RELAY_URL must be an absolute https URL/, + ); + }); + + it("rejects plaintext telemetry relay URLs for packaged config", () => { + process.env.OPEN_DESIGN_TELEMETRY_RELAY_URL = "http://telemetry.open-design.ai/api/langfuse"; + expect(() => resolveToolPackConfig("mac")).toThrow( + /OPEN_DESIGN_TELEMETRY_RELAY_URL must use https/, + ); + }); +}); diff --git a/tools/pack/tests/linux.test.ts b/tools/pack/tests/linux.test.ts index ab19d26f0..5e2953915 100644 --- a/tools/pack/tests/linux.test.ts +++ b/tools/pack/tests/linux.test.ts @@ -86,6 +86,17 @@ describe("buildDockerArgs", () => { expect(args).toContain("ELECTRON_BUILDER_CACHE=/home/builder/.cache/electron-builder"); }); + it("passes the telemetry relay URL into containerized builds when configured", () => { + const args = buildDockerArgs( + { + ...makeConfig(), + telemetryRelayUrl: "https://telemetry.open-design.ai/api/langfuse", + }, + { uid: 1000, gid: 1000 }, + ); + expect(args).toContain("OPEN_DESIGN_TELEMETRY_RELAY_URL=https://telemetry.open-design.ai/api/langfuse"); + }); + it("re-invokes pnpm tools-pack linux build inside the container without --containerized", () => { const args = buildDockerArgs(makeConfig(), { uid: 1000, gid: 1000 }); const last = args[args.length - 1];