diff --git a/apps/daemon/src/acp.ts b/apps/daemon/src/acp.ts index 77df76bf0..705de8c72 100644 --- a/apps/daemon/src/acp.ts +++ b/apps/daemon/src/acp.ts @@ -288,34 +288,155 @@ function currentModelFromSessionResult(result: JsonObject): string | null { export function createJsonLineStream(onMessage: (message: unknown, rawLine: string) => void) { let buffer = ''; + let pendingJson = ''; + let pendingJsonLineCount = 0; + + const emit = (candidate: string): boolean => { + try { + onMessage(JSON.parse(candidate), candidate); + return true; + } catch { + return false; + } + }; + + const startPendingJson = (line: string) => { + pendingJson = line; + pendingJsonLineCount = 1; + }; + + const resetPendingJson = () => { + pendingJson = ''; + pendingJsonLineCount = 0; + }; + + const handleLine = (line: string) => { + const trimmed = line.trim(); + if (!trimmed) return; + if (pendingJson) { + const nextCandidate = `${pendingJson}\n${trimmed}`; + if (emit(nextCandidate)) { + resetPendingJson(); + return; + } + pendingJsonLineCount += 1; + if ( + pendingJsonLineCount === 2 && + pendingJson !== '{' && + pendingJson !== '[' && + emit(trimmed) + ) { + resetPendingJson(); + return; + } + const state = classifyJsonCandidate(nextCandidate); + if ( + state === 'incomplete' && + nextCandidate.length <= 128_000 && + pendingJsonLineCount <= 256 + ) { + pendingJson = nextCandidate; + return; + } + resetPendingJson(); + handleLine(trimmed); + return; + } + if (emit(trimmed)) return; + // ACP is line-delimited JSON-RPC, but a few bridges have emitted + // pretty-printed JSON during startup. Keep a bounded aggregate so an + // otherwise valid multiline initialize response does not get discarded + // line-by-line and leave the session stuck in spawn pending. + if (trimmed.startsWith('{') || trimmed.startsWith('[')) { + startPendingJson(trimmed); + } + }; + return { feed(chunk: string) { buffer += chunk; const lines = buffer.split('\n'); buffer = lines.pop() || ''; for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed) continue; - try { - onMessage(JSON.parse(trimmed), trimmed); - } catch { - // Ignore non-JSON log lines on stdout. - } + handleLine(line); } }, flush() { const trimmed = buffer.trim(); buffer = ''; - if (!trimmed) return; - try { - onMessage(JSON.parse(trimmed), trimmed); - } catch { - // Ignore trailing non-JSON log lines on stdout. + if (trimmed) { + handleLine(trimmed); } + if (pendingJson && emit(pendingJson)) { + pendingJson = ''; + } + // Ignore trailing non-JSON log lines on stdout. }, }; } +function classifyJsonCandidate(value: string): 'complete' | 'incomplete' | 'invalid' { + const stack: string[] = []; + let started = false; + let complete = false; + let inString = false; + let escaping = false; + + for (const char of value) { + if (!started) { + if (/\s/.test(char)) continue; + if (char === '{') { + started = true; + stack.push('}'); + continue; + } + if (char === '[') { + started = true; + stack.push(']'); + continue; + } + return 'invalid'; + } + + if (complete) { + if (/\s/.test(char)) continue; + return 'invalid'; + } + + if (inString) { + if (escaping) { + escaping = false; + } else if (char === '\\') { + escaping = true; + } else if (char === '"') { + inString = false; + } + continue; + } + + if (char === '"') { + inString = true; + continue; + } + if (char === '{') { + stack.push('}'); + continue; + } + if (char === '[') { + stack.push(']'); + continue; + } + if (char === '}' || char === ']') { + if (stack.pop() !== char) return 'invalid'; + if (stack.length === 0) complete = true; + } + } + + if (!started) return 'invalid'; + if (inString || escaping || stack.length > 0) return 'incomplete'; + return complete ? 'complete' : 'invalid'; +} + export async function detectAcpModels({ bin, args, diff --git a/apps/daemon/tests/acp.test.ts b/apps/daemon/tests/acp.test.ts index c528980c7..2359b8a56 100644 --- a/apps/daemon/tests/acp.test.ts +++ b/apps/daemon/tests/acp.test.ts @@ -383,6 +383,58 @@ test('attachAcpSession.abort during startup ends stdin without sending session/c assert.equal(cancelRequests.length, 0); }); +test('attachAcpSession accepts pretty-printed ACP startup responses', () => { + const child = new FakeAcpChild(); + const writes: string[] = []; + const events: Array<{ event: string; payload: unknown }> = []; + child.stdin.on('data', (chunk) => writes.push(String(chunk))); + + attachAcpSession({ + child: child as never, + prompt: 'hello', + cwd: '/tmp/od-project', + model: null, + mcpServers: [], + send: (event, payload) => events.push({ event, payload }), + }); + + child.stdout.write('{\n "id": 1,\n "result":\n {}\n}\n'); + child.stdout.write('{\n "id": 2,\n "result":\n {\n "sessionId": "session-1"\n }\n}\n'); + + const methods = parseRpcWrites(writes) + .map((entry) => entry.method) + .filter(Boolean); + assert.deepEqual(methods, ['initialize', 'session/new', 'session/prompt']); + assert.equal(events.some((entry) => entry.event === 'error'), false); +}); + +test('attachAcpSession recovers when bracket-prefixed logs precede JSON frames', () => { + const child = new FakeAcpChild(); + const writes: string[] = []; + const events: Array<{ event: string; payload: unknown }> = []; + child.stdin.on('data', (chunk) => writes.push(String(chunk))); + + attachAcpSession({ + child: child as never, + prompt: 'hello', + cwd: '/tmp/od-project', + model: null, + mcpServers: [], + send: (event, payload) => events.push({ event, payload }), + }); + + child.stdout.write('[vela] starting OpenCode bridge\n'); + child.stdout.write(`${JSON.stringify({ id: 1, result: {} })}\n`); + child.stdout.write('{not json but looks like an object log\n'); + child.stdout.write(`${JSON.stringify({ id: 2, result: { sessionId: 'session-1' } })}\n`); + + const methods = parseRpcWrites(writes) + .map((entry) => entry.method) + .filter(Boolean); + assert.deepEqual(methods, ['initialize', 'session/new', 'session/prompt']); + assert.equal(events.some((entry) => entry.event === 'error'), false); +}); + function parseRpcWrites(writes: string[]): Array> { return writes .join('')