import { spawn, type ChildProcess } from 'node:child_process'; import type { Writable } from 'node:stream'; import path from 'node:path'; const ACP_PROTOCOL_VERSION = 1; const DEFAULT_TIMEOUT_MS = 15_000; const MAX_TIMEOUT_MS = 24 * 60 * 60 * 1000; // Gap-between-chunks watchdog for an ACP session stage. The timer resets on // every line received from the agent, so this bounds *silent* periods, not // total runtime. Default kept in line with the outer chat-run inactivity // watchdog (10 min) so agents that spend several minutes silently writing // large artifacts do not get killed before the outer watchdog can apply. // Callers can override via `stageTimeoutMs`; the chat server reads // `OD_ACP_STAGE_TIMEOUT_MS` from the environment. // A non-positive `stageTimeoutMs` (`<= 0`) disables the watchdog entirely, // mirroring the outer chat watchdog's escape-hatch semantics — without this, // `OD_ACP_STAGE_TIMEOUT_MS=0` would call `setTimeout(..., 0)` and fail every // ACP session on the next tick instead of disabling the watchdog. const DEFAULT_STAGE_TIMEOUT_MS = 600_000; type JsonRpcId = string | number; type JsonObject = Record; type RpcWritable = Pick; type AcpChildProcess = ChildProcess; type TimerHandle = ReturnType; export interface AcpMcpServerInput { type?: unknown; name?: unknown; command?: unknown; args?: unknown; env?: unknown; } interface AcpSessionOptions { mcpServers?: AcpMcpServerInput[]; } export interface ModelOption { id: string; label: string; } interface AcpModelConfigOption { configId: string; currentValue: string | null; values: unknown[]; } const MODEL_CONFIG_OPTION_IDS = new Set(['model', 'models', 'modelid', 'modelids']); interface DetectAcpModelsOptions { bin: string; args: string[]; cwd?: string; env?: NodeJS.ProcessEnv; timeoutMs?: number; clientName?: string; clientVersion?: string; defaultModelOption?: ModelOption; } interface AttachAcpSessionOptions { child: AcpChildProcess; prompt: string; cwd?: string; model?: string | null; imagePaths?: string[]; mcpServers?: AcpMcpServerInput[]; send: (event: string, payload: unknown) => void; clientName?: string; clientVersion?: string; stageTimeoutMs?: number; modelUnavailableErrorCode?: 'AMR_MODEL_UNAVAILABLE'; } function errorMessage(err: unknown): string { return err instanceof Error ? err.message : String(err); } function resolveAcpTimeoutMs(env: NodeJS.ProcessEnv, fallbackMs: number): number { const raw = Number(env.OD_ACP_TIMEOUT_MS); if (!Number.isFinite(raw)) return fallbackMs; return Math.min(MAX_TIMEOUT_MS, Math.max(0, Math.floor(raw))); } function asObject(value: unknown): JsonObject | null { return value && typeof value === 'object' ? value as JsonObject : null; } export function buildAcpSessionNewParams(cwd: string, { mcpServers }: AcpSessionOptions = {}) { const servers = Array.isArray(mcpServers) ? mcpServers : []; return { cwd: path.resolve(cwd), // MCP is an optional compatibility layer. Default to no MCP servers so ACP // agents can run through the skill + CLI path without MCP support. Do not // auto-install or mutate user/global MCP config; callers must pass an // explicit per-session MCP descriptor when a compatible agent supports it. // Normalize to the ACP stdio server shape expected by Kimi/Hermes. mcpServers: servers.map((s) => ({ type: typeof s?.type === 'string' ? s.type : 'stdio', name: typeof s?.name === 'string' ? s.name : '', command: typeof s?.command === 'string' ? s.command : '', args: Array.isArray(s?.args) ? s.args : [], env: Array.isArray(s?.env) ? s.env : [], })), }; } function sendRpc(writable: RpcWritable, id: JsonRpcId, method: string, params: unknown): void { writable.write( `${JSON.stringify({ jsonrpc: '2.0', id, method, params })}\n`, ); } function sendRpcResult(writable: RpcWritable, id: JsonRpcId, result: unknown): void { writable.write(`${JSON.stringify({ jsonrpc: '2.0', id, result })}\n`); } function buildPromptBlocks(prompt: string, imagePaths: string[]): Array> { const blocks: Array> = [{ type: 'text', text: prompt }]; for (const imagePath of imagePaths) { if (typeof imagePath !== 'string' || imagePath.trim().length === 0) continue; blocks.push({ type: 'resource_link', uri: imagePath }); } return blocks; } function isJsonRpcId(value: unknown): value is JsonRpcId { return typeof value === 'number' || typeof value === 'string'; } function rpcErrorMessage(raw: unknown): string { const obj = asObject(raw); const error = asObject(obj?.error); if (!obj || !error) { return ''; } const message = typeof error.message === 'string' ? error.message : typeof error.code === 'number' ? String(error.code) : 'json-rpc error'; return typeof obj.id === 'number' ? `json-rpc id ${obj.id}: ${message}` : message; } interface FormattedUsage { input_tokens?: number; output_tokens?: number; cached_read_tokens?: number; thought_tokens?: number; total_tokens?: number; } function formatUsage(usage: unknown): FormattedUsage | null { const src = asObject(usage); if (!src) return null; const out: FormattedUsage = {}; if (typeof src.inputTokens === 'number') out.input_tokens = src.inputTokens; if (typeof src.outputTokens === 'number') out.output_tokens = src.outputTokens; if (typeof src.cachedReadTokens === 'number') { out.cached_read_tokens = src.cachedReadTokens; } if (typeof src.thoughtTokens === 'number') out.thought_tokens = src.thoughtTokens; if (typeof src.totalTokens === 'number') out.total_tokens = src.totalTokens; return Object.keys(out).length > 0 ? out : null; } function choosePermissionOutcome(options: unknown): string | null { const list = Array.isArray(options) ? options : []; const approveForSession = list.find((option) => option?.optionId === 'approve_for_session'); if (approveForSession) return 'approve_for_session'; const allowAlways = list.find((option) => option?.kind === 'allow_always'); if (allowAlways?.optionId) return allowAlways.optionId; const allowOnce = list.find((option) => option?.kind === 'allow_once'); if (allowOnce?.optionId) return allowOnce.optionId; return null; } function normalizeConfigOptionToken(value: unknown): string { return typeof value === 'string' ? value.trim().toLowerCase().replace(/[\s_-]+/g, '') : ''; } function isModelConfigOption(option: JsonObject, configId: string): boolean { const category = normalizeConfigOptionToken(option.category); if (category === 'model') return true; const id = normalizeConfigOptionToken(configId); if (id === 'model') return true; if (category) return false; const name = normalizeConfigOptionToken(option.name); return MODEL_CONFIG_OPTION_IDS.has(id) || name === 'model'; } function findModelConfigOption(configOptions: unknown): AcpModelConfigOption | null { const options = Array.isArray(configOptions) ? configOptions : []; for (const rawOption of options) { const option = asObject(rawOption); if (!option) continue; const configId = typeof option.id === 'string' ? option.id.trim() : ''; if (!configId) continue; const type = typeof option.type === 'string' ? option.type.trim() : ''; if (type && type !== 'select') continue; if (!isModelConfigOption(option, configId)) continue; const currentValue = typeof option.currentValue === 'string' && option.currentValue.trim() ? option.currentValue.trim() : null; return { configId, currentValue, values: Array.isArray(option.options) ? option.options : [], }; } return null; } function normalizeModelConfigOptions( configOptions: unknown, defaultModelOption: ModelOption, ): { currentModelId: string | null; models: ModelOption[] } | null { const modelConfig = findModelConfigOption(configOptions); if (!modelConfig) return null; const seen = new Set([defaultModelOption.id]); const out = [defaultModelOption]; for (const rawValue of modelConfig.values) { const value = asObject(rawValue); if (!value) continue; const id = typeof value.value === 'string' && value.value.trim() ? value.value.trim() : typeof value.id === 'string' ? value.id.trim() : ''; if (!id || seen.has(id)) continue; seen.add(id); const name = typeof value.name === 'string' ? value.name.trim() : ''; const isCurrent = id === modelConfig.currentValue; const labelBase = name && name !== id ? `${name} (${id})` : id; out.push({ id, label: isCurrent ? `${labelBase} • current` : labelBase }); } return { currentModelId: modelConfig.currentValue, models: out }; } export function normalizeModels( models: unknown, defaultModelOption: ModelOption, configOptions?: unknown, ): ModelOption[] { const configModels = normalizeModelConfigOptions(configOptions, defaultModelOption); if (configModels && configModels.models.length > 1) { return configModels.models; } const modelsObj = asObject(models); const available = Array.isArray(modelsObj?.availableModels) ? modelsObj.availableModels : []; const currentModelId = typeof modelsObj?.currentModelId === 'string' ? modelsObj.currentModelId : null; const seen = new Set([defaultModelOption.id]); const out = [defaultModelOption]; for (const model of available) { const id = typeof model?.modelId === 'string' ? model.modelId.trim() : ''; if (!id || seen.has(id)) continue; seen.add(id); const name = typeof model?.name === 'string' ? model.name.trim() : ''; const isCurrent = id === currentModelId; const labelBase = name && name !== id ? `${name} (${id})` : id; out.push({ id, label: isCurrent ? `${labelBase} • current` : labelBase }); } return out.length > 1 || !configModels ? out : configModels.models; } function modelSelectionErrorIsRecoverable(code: unknown): boolean { return code === -32603 || code === -32602 || code === -32601 || code === -32002; } function currentModelFromSessionResult(result: JsonObject): string | null { const configCurrent = findModelConfigOption(result.configOptions)?.currentValue; if (configCurrent) return configCurrent; const models = asObject(result.models); return typeof models?.currentModelId === 'string' && models.currentModelId.trim() ? models.currentModelId.trim() : null; } export function createJsonLineStream(onMessage: (message: unknown, rawLine: string) => void) { let buffer = ''; let pendingJson = ''; let pendingJsonLineCount = 0; const emit = (candidate: string): boolean => { try { onMessage(JSON.parse(candidate), candidate); return true; } catch { return false; } }; const startPendingJson = (line: string) => { pendingJson = line; pendingJsonLineCount = 1; }; const handleLine = (line: string) => { const trimmed = line.trim(); if (!trimmed) return; if (pendingJson) { if ((trimmed.startsWith('{') || trimmed.startsWith('[')) && emit(trimmed)) { pendingJson = ''; pendingJsonLineCount = 0; return; } const nextCandidate = `${pendingJson}\n${trimmed}`; if (emit(nextCandidate)) { pendingJson = ''; pendingJsonLineCount = 0; return; } pendingJsonLineCount += 1; if (nextCandidate.length <= 128_000 && pendingJsonLineCount <= 256) { pendingJson = nextCandidate; return; } pendingJson = ''; pendingJsonLineCount = 0; handleLine(trimmed); return; } if (emit(trimmed)) return; // ACP is line-delimited JSON-RPC, but a few bridges have emitted // pretty-printed JSON during startup. Keep a bounded aggregate so an // otherwise valid multiline initialize response does not get discarded // line-by-line and leave the session stuck in spawn pending. if (trimmed.startsWith('{') || trimmed.startsWith('[')) { startPendingJson(trimmed); } }; return { feed(chunk: string) { buffer += chunk; const lines = buffer.split('\n'); buffer = lines.pop() || ''; for (const line of lines) { handleLine(line); } }, flush() { const trimmed = buffer.trim(); buffer = ''; if (trimmed) { handleLine(trimmed); } if (pendingJson && emit(pendingJson)) { pendingJson = ''; } // Ignore trailing non-JSON log lines on stdout. }, }; } export async function detectAcpModels({ bin, args, cwd = process.cwd(), env = process.env, timeoutMs = DEFAULT_TIMEOUT_MS, clientName = 'open-design-detect', clientVersion = 'runtime-adapter', defaultModelOption = { id: 'default', label: 'Default (CLI config)' }, }: DetectAcpModelsOptions): Promise { const effectiveTimeoutMs = resolveAcpTimeoutMs(env, timeoutMs); return await new Promise((resolve, reject) => { const child = spawn(bin, args, { cwd, stdio: ['pipe', 'pipe', 'pipe'], env: { ...env }, }); child.stdout.setEncoding('utf8'); child.stderr.setEncoding('utf8'); let settled = false; let stderrBuf = ''; let expectedId = 1; let nextId = 2; let timer: TimerHandle | null = null; const finish = (fn: (value: T) => void, value: T) => { if (settled) return; settled = true; if (timer) clearTimeout(timer); try { child.stdin.end(); } catch {} fn(value); }; const fail = (message: string) => { finish(reject, new Error(message)); if (!child.killed) child.kill('SIGTERM'); }; const writeRpc = (id: JsonRpcId, method: string, params: unknown) => { try { sendRpc(child.stdin, id, method, params); } catch (err) { fail(`stdin write failed: ${errorMessage(err)}`); } }; const sendSessionNew = () => { expectedId = nextId; writeRpc(nextId, 'session/new', buildAcpSessionNewParams(cwd)); nextId += 1; }; const parser = createJsonLineStream((raw) => { const obj = asObject(raw); const error = asObject(obj?.error); const result = asObject(obj?.result); const rpcErr = rpcErrorMessage(raw); if (rpcErr) { // JSON-RPC -32603 "Internal error" during model detection: // If this is for the current expected-id (initialize/session/new), // it's a real probe failure — reject immediately. // Otherwise it's cleanup noise — suppress it. if (error?.code === -32603 && obj?.id !== expectedId) return; fail(rpcErr); return; } if (obj?.id !== expectedId || !result) return; if (expectedId === 1) { sendSessionNew(); return; } if (expectedId === 2) { const models = normalizeModels(result.models, defaultModelOption, result.configOptions); finish(resolve, models); if (!child.killed) child.kill('SIGTERM'); } }); child.stdout.on('data', (chunk) => parser.feed(chunk)); child.stdout.on('close', () => parser.flush()); child.stdin.on('error', (err) => fail(`stdin error: ${err.message}`)); child.stderr.on('data', (chunk) => { stderrBuf = `${stderrBuf}${chunk}`.slice(-16_000); }); child.on('error', (err) => fail(`spawn failed: ${err.message}`)); child.on('close', (code, signal) => { parser.flush(); if (!settled) { const errTail = stderrBuf.trim(); const suffix = errTail ? ` stderr=${errTail}` : ''; fail(`ACP model detection exited code=${code} signal=${signal ?? 'none'}${suffix}`); } }); if (effectiveTimeoutMs > 0) { timer = setTimeout(() => { fail(`ACP model detection timed out after ${effectiveTimeoutMs}ms`); }, effectiveTimeoutMs); } writeRpc(1, 'initialize', { protocolVersion: ACP_PROTOCOL_VERSION, clientCapabilities: { terminal: false }, clientInfo: { name: clientName, version: clientVersion }, }); }); } export function attachAcpSession({ child, prompt, cwd, model, imagePaths = [], mcpServers, send, clientName = 'open-design', clientVersion = 'runtime-adapter', stageTimeoutMs = DEFAULT_STAGE_TIMEOUT_MS, modelUnavailableErrorCode, }: AttachAcpSessionOptions) { const runStartedAt = Date.now(); const effectiveCwd = path.resolve(cwd || process.cwd()); if (!child.stdin || !child.stdout) { throw new Error('ACP child process must expose stdin and stdout streams'); } const stdin = child.stdin; const stdout = child.stdout; let expectedId = 1; let nextId = 2; let promptRequestId: JsonRpcId | null = null; let setModelRequestId: JsonRpcId | null = null; let sessionId: string | null = null; let activeModel: string | null = null; let modelConfigId: string | null = null; let emittedThinkingStart = false; let emittedFirstTokenStatus = false; let emittedTextChunk = false; let emittedTextBuffer = ''; let finished = false; let fatal = false; let aborted = false; let stageTimer: TimerHandle | null = null; const stageWatchdogDisabled = stageTimeoutMs <= 0; const resetStageTimer = (label: string) => { if (stageTimer) clearTimeout(stageTimer); // `stageTimeoutMs <= 0` disables the watchdog. Mirrors the outer chat // inactivity watchdog escape hatch (see server.ts → inactivityTimer). // Without this, an operator setting `OD_ACP_STAGE_TIMEOUT_MS=0` would // schedule a 0ms timeout that fires on the next tick and kills the // session immediately. if (stageWatchdogDisabled) return; stageTimer = setTimeout(() => { fail(`ACP ${label} timed out after ${stageTimeoutMs}ms`); }, stageTimeoutMs); }; const clearStageTimer = () => { if (stageTimer) clearTimeout(stageTimer); stageTimer = null; }; const amrModelUnavailablePayload = (message: string) => ({ message, error: { code: 'AMR_MODEL_UNAVAILABLE', message, retryable: false, details: { kind: 'amr_model', action: 'choose_model' }, }, }); const isModelUnavailableError = (message: string) => { const value = message.toLowerCase(); return ( value.includes('model not found') || value.includes('providermodelnotfounderror') || value.includes('unknown model') || value.includes('invalid model') ); }; const fail = ( message: string, options: { forceModelUnavailable?: boolean } = {}, ) => { if (finished) return; finished = true; fatal = true; clearStageTimer(); const useModelUnavailable = modelUnavailableErrorCode && (options.forceModelUnavailable || isModelUnavailableError(message)); send( 'error', useModelUnavailable ? amrModelUnavailablePayload(message) : { message }, ); if (!child.killed) child.kill('SIGTERM'); }; const writeRpc = (id: JsonRpcId, method: string, params: unknown, timeoutLabel: string) => { resetStageTimer(timeoutLabel); try { sendRpc(stdin, id, method, params); } catch (err) { fail(`stdin write failed: ${errorMessage(err)}`); } }; const sendPrompt = () => { promptRequestId = nextId; expectedId = promptRequestId; writeRpc( promptRequestId, 'session/prompt', { sessionId, prompt: buildPromptBlocks(prompt, imagePaths), }, 'session/prompt', ); nextId += 1; }; const replyPermission = (raw: JsonObject) => { const params = asObject(raw.params); const optionId = choosePermissionOutcome(params?.options); if (!optionId || !isJsonRpcId(raw.id)) { fail(`unhandled ACP permission request: ${JSON.stringify(raw)}`); return; } resetStageTimer('session/request_permission'); try { sendRpcResult(stdin, raw.id, { outcome: { outcome: 'selected', optionId }, }); } catch (err) { fail(`stdin write failed: ${errorMessage(err)}`); } }; const recoverFromModelSelectionError = () => { setModelRequestId = null; activeModel = activeModel || 'default'; send('agent', { type: 'status', label: 'model', model: activeModel }); sendPrompt(); }; const parser = createJsonLineStream((raw, rawLine) => { if (aborted) return; resetStageTimer('response'); const obj = asObject(raw); if (!obj) return; const error = asObject(obj.error); const params = asObject(obj.params); const result = asObject(obj.result); const rpcErr = rpcErrorMessage(obj); if (rpcErr) { // After response completion, any late-arriving errors from the agent // (pipe-broken, cleanup race conditions, etc.) are safe to ignore. if (finished) return; // JSON-RPC error handling: // -32603 unexpected-id errors are cleanup noise. Expected-id model // selection failures are recoverable; all other RPC errors are real // protocol failures for initialize/session/new/session/prompt. if ( obj.id === setModelRequestId && modelSelectionErrorIsRecoverable(error?.code) && promptRequestId === null ) { recoverFromModelSelectionError(); return; } if (error?.code === -32603 && obj.id !== expectedId) { return; } fail(rpcErr); return; } if (obj.method === 'session/request_permission') { replyPermission(obj); return; } const update = asObject(params?.update); if (obj.method === 'session/update' && update) { if (update.sessionUpdate === 'agent_thought_chunk') { const text = asObject(update.content)?.text; if (typeof text === 'string' && text.length > 0) { if (!emittedThinkingStart) { emittedThinkingStart = true; send('agent', { type: 'thinking_start' }); } send('agent', { type: 'thinking_delta', delta: text }); } return; } if (update.sessionUpdate === 'agent_message_chunk') { const text = asObject(update.content)?.text; if (typeof text === 'string' && text.length > 0) { const delta = text.startsWith(emittedTextBuffer) ? text.slice(emittedTextBuffer.length) : text; if (delta.length > 0) { emittedTextChunk = true; emittedTextBuffer += delta; if (!emittedFirstTokenStatus) { emittedFirstTokenStatus = true; send('agent', { type: 'status', label: 'streaming', ttftMs: Date.now() - runStartedAt, }); } send('agent', { type: 'text_delta', delta }); } } return; } return; } if (obj.id !== expectedId || !result) { return; } if (expectedId === 1) { expectedId = nextId; writeRpc( nextId, 'session/new', buildAcpSessionNewParams( effectiveCwd, mcpServers ? { mcpServers } : {}, ), 'session/new', ); nextId += 1; return; } if (expectedId === 2) { sessionId = typeof result.sessionId === 'string' ? result.sessionId : null; const modelConfig = findModelConfigOption(result.configOptions); modelConfigId = modelConfig?.configId ?? null; activeModel = currentModelFromSessionResult(result); if (sessionId && activeModel) { send('agent', { type: 'status', label: 'model', model: activeModel }); } if (sessionId && model && model !== 'default') { setModelRequestId = nextId; expectedId = nextId; const setModelMethod = modelConfigId ? 'session/set_config_option' : 'session/set_model'; const setModelParams = modelConfigId ? { sessionId, configId: modelConfigId, value: model } : { sessionId, modelId: model }; writeRpc( nextId, setModelMethod, setModelParams, setModelMethod, ); nextId += 1; return; } if (!sessionId) { fail(`invalid session/new response: ${rawLine}`); return; } sendPrompt(); return; } if (promptRequestId !== null && obj.id === promptRequestId) { if (!emittedTextChunk && modelUnavailableErrorCode) { fail( 'ACP session completed without producing any assistant text. Refresh the AMR model list, choose a supported model, and retry this run.', { forceModelUnavailable: true }, ); return; } const usage = formatUsage(result.usage); if (usage) { send('agent', { type: 'usage', usage, durationMs: Date.now() - runStartedAt, }); } finished = true; clearStageTimer(); stdin.end(); // Some ACP agents (e.g. Devin for Terminal) keep the child process // alive after stdin closes, waiting for the next prompt. Each Open // Design run spawns its own agent process per turn, so the child must // terminate for `child.on('close')` to fire and the chat run to // finalize — otherwise the chat stays stuck in the "working" state. // Give the child a short grace period to exit on its own first; if it // doesn't, SIGTERM forces it. This mirrors the pattern in // detectAcpModels() which already kills the child after a clean // model-discovery probe completes (see line ~270 in this file). const cleanExitTimer = setTimeout(() => { if (!child.killed) child.kill('SIGTERM'); }, 500); child.once('close', () => clearTimeout(cleanExitTimer)); return; } if (sessionId && model && model !== 'default' && obj.id === expectedId) { activeModel = currentModelFromSessionResult(result) ?? model; send('agent', { type: 'status', label: 'model', model: activeModel }); sendPrompt(); } }); stdout.on('data', (chunk: string) => parser.feed(chunk)); child.on('close', (code, signal) => { clearStageTimer(); parser.flush(); if (!finished && !aborted && !fatal) { fail(`ACP session exited before completion (code=${code ?? 'null'}, signal=${signal ?? 'none'})`); } }); child.on('error', (err: Error) => fail(err.message)); stdin.on('error', (err: Error) => fail(`stdin error: ${err.message}`)); writeRpc(1, 'initialize', { protocolVersion: ACP_PROTOCOL_VERSION, clientCapabilities: { terminal: false }, clientInfo: { name: clientName, version: clientVersion }, }, 'initialize'); return { hasFatalError() { return fatal; }, completedSuccessfully() { // Returns true when the prompt request resolved without a fatal error // and was not aborted. The chat consumer treats this as a successful // run even if the child process subsequently exited via SIGTERM // (which is expected for agents that don't shut down on stdin.end()). return finished && !fatal && !aborted; }, abort() { if (aborted || finished) return; aborted = true; finished = true; clearStageTimer(); if (!child.stdin || child.stdin.destroyed || child.stdin.writableEnded) return; // Only cancel an established session; before session/new resolves there // is no sessionId to cancel, but we must still close stdin below. if (sessionId) { try { sendRpc(child.stdin, nextId, 'session/cancel', { sessionId }); nextId += 1; } catch { // The caller owns process-signal fallback if the ACP transport is gone. } } // Always close stdin so the agent receives EOF and shuts down its own // runtime — the vela ACP bridge tears down its private OpenCode server on // EOF — instead of lingering (and leaking that server) until the caller's // SIGTERM fallback fires. This also covers aborts during ACP startup, // before session/new returns. Mirrors the clean-completion path above. try { child.stdin.end(); } catch { // Best effort; the caller still owns the SIGTERM/SIGKILL fallback. } }, }; }