From 5a0f954297242ee8a9c468e5882a25cb38cb5b34 Mon Sep 17 00:00:00 2001 From: Tom Date: Sat, 2 May 2026 15:06:37 +0700 Subject: [PATCH] fix(daemon): emit tool_use from tool_execution_start in pi-rpc (#186) * fix(daemon): emit tool_use from tool_execution_start in pi-rpc The pi-rpc adapter emitted tool_use from message_end, which fires before tool execution starts. The web UI pairs tool results to prior tool_use events, so receiving tool_result without a preceding tool_use broke tool card rendering and file auto-open behavior. Move tool_use emission to tool_execution_start, matching the pattern in copilot-stream.ts. Remove redundant tool call extraction from message_end (tool_use is now emitted at execution time, usage is already emitted from turn_end). Extract mapPiRpcEvent as a pure exported function so tests exercise the real event mapping logic instead of an inlined copy that can diverge from production. Ref: mrcfps review comments on PR #117 * docs(daemon): clarify mapPiRpcEvent mutability contract The function mutates ctx.sentFirstToken to track streaming state. Calling it "pure" is misleading; revised the doc comment to say no I/O or child process interaction instead. * fix(pi-rpc): remove redundant status(tool) emission from tool_execution_start Now that tool_use fires inline from tool_execution_start, the accompanying status(tool) event is redundant: tool_use already carries the tool name, and the UI renders running state from the tool card. The extra status pill breaks consecutive tool_use grouping in AssistantMessage.buildBlocks. Aligns with copilot-stream, which emits only tool_use from tool.execution_start with no status event. --- apps/daemon/src/pi-rpc.ts | 270 ++++++++++++++------------- apps/daemon/tests/pi-rpc.test.ts | 102 ++++------ e2e/tests/structured-streams.test.ts | 52 ++---- 3 files changed, 180 insertions(+), 244 deletions(-) diff --git a/apps/daemon/src/pi-rpc.ts b/apps/daemon/src/pi-rpc.ts index cbafa4391..c0919ac8e 100644 --- a/apps/daemon/src/pi-rpc.ts +++ b/apps/daemon/src/pi-rpc.ts @@ -63,6 +63,139 @@ function replyExtensionUi(writable, raw) { ); } +/** + * Map a single pi RPC event to zero or more daemon UI events. + * + * No I/O or child process interaction; mutates `ctx.sentFirstToken` + * to track streaming state. + * `send` callback and `ctx` are provided by the caller. + * + * @param {object} raw - parsed JSON from pi's stdout + * @param {function} send - (channel, payload) emitter + * @param {object} ctx - session context + * @param {number} ctx.runStartedAt - Date.now() at session start + * @param {{ value: boolean }} ctx.sentFirstToken - mutable flag + * @returns {string|null} 'agent_end' if the agent is done, null otherwise + */ +export function mapPiRpcEvent(raw, send, ctx) { + if (raw.type === 'agent_start') { + send('agent', { type: 'status', label: 'working' }); + return null; + } + + if (raw.type === 'agent_end') { + return 'agent_end'; + } + + if (raw.type === 'turn_start') { + send('agent', { type: 'status', label: 'thinking' }); + return null; + } + + if (raw.type === 'turn_end') { + if (raw.message?.usage) { + const u = raw.message.usage; + const usage = {}; + if (typeof u.input === 'number') usage.input_tokens = u.input; + if (typeof u.output === 'number') usage.output_tokens = u.output; + if (typeof u.cacheRead === 'number') usage.cached_read_tokens = u.cacheRead; + if (typeof u.cacheWrite === 'number') usage.cached_write_tokens = u.cacheWrite; + if (typeof u.totalTokens === 'number') usage.total_tokens = u.totalTokens; + if (Object.keys(usage).length > 0) { + const cost = u.cost; + send('agent', { + type: 'usage', + usage, + costUsd: cost?.total ?? cost?.totalCost ?? null, + durationMs: Date.now() - ctx.runStartedAt, + }); + } + } + return null; + } + + if (raw.type === 'message_update' && raw.assistantMessageEvent) { + const ev = raw.assistantMessageEvent; + + if (ev.type === 'text_delta' && typeof ev.delta === 'string') { + if (!ctx.sentFirstToken.value) { + ctx.sentFirstToken.value = true; + send('agent', { + type: 'status', + label: 'streaming', + ttftMs: Date.now() - ctx.runStartedAt, + }); + } + send('agent', { type: 'text_delta', delta: ev.delta }); + return null; + } + + if (ev.type === 'thinking_delta' && typeof ev.delta === 'string') { + send('agent', { type: 'thinking_delta', delta: ev.delta }); + return null; + } + + if (ev.type === 'thinking_start') { + send('agent', { type: 'thinking_start' }); + return null; + } + + if (ev.type === 'thinking_end') { + send('agent', { type: 'thinking_end' }); + return null; + } + + return null; + } + + if (raw.type === 'message_end') { + // message_end carries usage (already emitted from turn_end) and + // tool call blocks (already emitted from tool_execution_start). + // Nothing to extract here. + return null; + } + + if (raw.type === 'tool_execution_start') { + send('agent', { + type: 'tool_use', + id: raw.toolCallId ?? null, + name: raw.toolName ?? null, + input: raw.args ?? null, + }); + return null; + } + + if (raw.type === 'tool_execution_end') { + const content = raw.result?.content; + const text = + Array.isArray(content) + ? content + .map((c) => (c?.type === 'text' ? c.text : JSON.stringify(c))) + .join('\n') + : typeof content === 'string' + ? content + : ''; + send('agent', { + type: 'tool_result', + toolUseId: raw.toolCallId ?? null, + content: text, + isError: raw.isError === true, + }); + return null; + } + + if (raw.type === 'compaction_start') { + send('agent', { type: 'status', label: 'compacting' }); + return null; + } + if (raw.type === 'auto_retry_start') { + send('agent', { type: 'status', label: 'retrying' }); + return null; + } + + return null; +} + /** * Attach a pi RPC session to a spawned child process. * @@ -78,7 +211,7 @@ export function attachPiRpcSession({ child, prompt, cwd, model, send }) { const runStartedAt = Date.now(); let finished = false; let fatal = false; - let sentFirstToken = false; + const sentFirstToken = { value: false }; let nextRpcId = 1; @@ -125,14 +258,10 @@ export function attachPiRpcSession({ child, prompt, cwd, model, send }) { return; } - // ---- Agent events ---- + // Agent events: delegate to the pure mapper. + const result = mapPiRpcEvent(raw, send, { runStartedAt, sentFirstToken }); - if (raw.type === 'agent_start') { - send('agent', { type: 'status', label: 'working' }); - return; - } - - if (raw.type === 'agent_end') { + if (result === 'agent_end') { finished = true; // pi's RPC process stays alive after agent_end (designed for // multi-prompt sessions). The daemon's /api/chat is single-shot, @@ -147,131 +276,6 @@ export function attachPiRpcSession({ child, prompt, cwd, model, send }) { setTimeout(() => { if (!child.killed) child.kill('SIGTERM'); }, shutdownMs); - return; - } - - if (raw.type === 'turn_start') { - send('agent', { type: 'status', label: 'thinking' }); - return; - } - - if (raw.type === 'turn_end') { - // Extract usage from the completed message if present. - if (raw.message?.usage) { - const u = raw.message.usage; - const usage = {}; - if (typeof u.input === 'number') usage.input_tokens = u.input; - if (typeof u.output === 'number') usage.output_tokens = u.output; - if (typeof u.cacheRead === 'number') usage.cached_read_tokens = u.cacheRead; - if (typeof u.cacheWrite === 'number') usage.cached_write_tokens = u.cacheWrite; - if (typeof u.totalTokens === 'number') usage.total_tokens = u.totalTokens; - if (Object.keys(usage).length > 0) { - const cost = u.cost; - send('agent', { - type: 'usage', - usage, - costUsd: cost?.total ?? cost?.totalCost ?? null, - durationMs: Date.now() - runStartedAt, - }); - } - } - return; - } - - if (raw.type === 'message_update' && raw.assistantMessageEvent) { - const ev = raw.assistantMessageEvent; - - if (ev.type === 'text_delta' && typeof ev.delta === 'string') { - if (!sentFirstToken) { - sentFirstToken = true; - send('agent', { - type: 'status', - label: 'streaming', - ttftMs: Date.now() - runStartedAt, - }); - } - send('agent', { type: 'text_delta', delta: ev.delta }); - return; - } - - if (ev.type === 'thinking_delta' && typeof ev.delta === 'string') { - send('agent', { type: 'thinking_delta', delta: ev.delta }); - return; - } - - if (ev.type === 'thinking_start') { - send('agent', { type: 'thinking_start' }); - return; - } - - if (ev.type === 'thinking_end') { - send('agent', { type: 'thinking_end' }); - return; - } - - return; - } - - if (raw.type === 'message_end' && raw.message) { - // Extract tool calls from the completed assistant message content. - // Note: usage is NOT emitted here because `turn_end` already - // emits it. Pi fires both `message_end` and `turn_end` per turn, - // both carrying usage; emitting from both would double-count. - const msg = raw.message; - - // Extract tool calls from the completed assistant message content. - if (Array.isArray(msg.content)) { - for (const block of msg.content) { - if (block.type === 'toolCall') { - send('agent', { - type: 'tool_use', - id: block.id, - name: block.name, - input: block.arguments ?? null, - }); - } - } - } - return; - } - - if (raw.type === 'tool_execution_start') { - send('agent', { - type: 'status', - label: 'tool', - toolName: raw.toolName ?? null, - }); - return; - } - - if (raw.type === 'tool_execution_end') { - const content = raw.result?.content; - const text = - Array.isArray(content) - ? content - .map((c) => (c?.type === 'text' ? c.text : JSON.stringify(c))) - .join('\n') - : typeof content === 'string' - ? content - : ''; - send('agent', { - type: 'tool_result', - toolUseId: raw.toolCallId ?? null, - content: text, - isError: raw.isError === true, - }); - return; - } - - // Compaction, retry, queue_update — informational only. Forward as - // status so the UI stays aware, but don't break the main text flow. - if (raw.type === 'compaction_start') { - send('agent', { type: 'status', label: 'compacting' }); - return; - } - if (raw.type === 'auto_retry_start') { - send('agent', { type: 'status', label: 'retrying' }); - return; } }); diff --git a/apps/daemon/tests/pi-rpc.test.ts b/apps/daemon/tests/pi-rpc.test.ts index 65ab23b63..aab7dfde6 100644 --- a/apps/daemon/tests/pi-rpc.test.ts +++ b/apps/daemon/tests/pi-rpc.test.ts @@ -1,7 +1,7 @@ // @ts-nocheck import { test } from 'vitest'; import assert from 'node:assert/strict'; -import { parsePiModels } from '../src/pi-rpc.js'; +import { parsePiModels, mapPiRpcEvent } from '../src/pi-rpc.js'; // ─── parsePiModels ───────────────────────────────────────────────────────── @@ -99,10 +99,10 @@ test('parsePiModels skips duplicate default id', () => { assert.equal(result[1].id, 'default/some-model'); }); -// ─── RPC event translation (attachPiRpcSession) ──────────────────────────── +// ─── RPC event translation (mapPiRpcEvent) ──────────────────────────────── // -// We test the event translation by simulating pi's RPC stdout lines -// through the same parser pipeline that attachPiRpcSession uses. +// We test the pure event mapper directly — no child process, no stdin. +// This catches regressions like tool event ordering bugs. import { createJsonLineStream } from '../src/acp.js'; @@ -111,72 +111,14 @@ function simulateRpcSession(rpcLines, options = {}) { const send = (_channel, payload) => { events.push(payload); }; + const ctx = { runStartedAt: Date.now(), sentFirstToken: { value: false } }; + const parser = createJsonLineStream((raw) => { - // Inline the core event translation from attachPiRpcSession - // to test the mapping logic in isolation (no child process needed). + // Skip non-agent events that mapPiRpcEvent doesn't handle. + if (raw.type === 'extension_ui_request') return; + if (raw.type === 'response') return; - if (raw.type === 'extension_ui_request') return; // skip - if (raw.type === 'response') return; // skip - - if (raw.type === 'agent_start') { - send('agent', { type: 'status', label: 'working' }); - return; - } - - if (raw.type === 'turn_start') { - send('agent', { type: 'status', label: 'thinking' }); - return; - } - - if (raw.type === 'message_update' && raw.assistantMessageEvent) { - const ev = raw.assistantMessageEvent; - if (ev.type === 'text_delta' && typeof ev.delta === 'string') { - send('agent', { type: 'text_delta', delta: ev.delta }); - } else if (ev.type === 'thinking_delta' && typeof ev.delta === 'string') { - send('agent', { type: 'thinking_delta', delta: ev.delta }); - } else if (ev.type === 'thinking_start') { - send('agent', { type: 'thinking_start' }); - } else if (ev.type === 'thinking_end') { - send('agent', { type: 'thinking_end' }); - } - return; - } - - if (raw.type === 'turn_end' && raw.message?.usage) { - const u = raw.message.usage; - const usage = {}; - if (typeof u.input === 'number') usage.input_tokens = u.input; - if (typeof u.output === 'number') usage.output_tokens = u.output; - if (typeof u.cacheRead === 'number') usage.cached_read_tokens = u.cacheRead; - if (typeof u.cacheWrite === 'number') usage.cached_write_tokens = u.cacheWrite; - if (Object.keys(usage).length > 0) { - send('agent', { type: 'usage', usage, durationMs: 100 }); - } - return; - } - - if (raw.type === 'tool_execution_start') { - send('agent', { type: 'status', label: 'tool', toolName: raw.toolName ?? null }); - return; - } - - if (raw.type === 'tool_execution_end') { - const content = raw.result?.content; - const text = Array.isArray(content) - ? content.map((c) => (c?.type === 'text' ? c.text : JSON.stringify(c))).join('\n') - : typeof content === 'string' ? content : ''; - send('agent', { type: 'tool_result', toolUseId: raw.toolCallId ?? null, content: text, isError: raw.isError === true }); - return; - } - - if (raw.type === 'compaction_start') { - send('agent', { type: 'status', label: 'compacting' }); - return; - } - if (raw.type === 'auto_retry_start') { - send('agent', { type: 'status', label: 'retrying' }); - return; - } + mapPiRpcEvent(raw, send, ctx); }); const input = rpcLines.map((l) => JSON.stringify(l)).join('\n') + '\n'; @@ -202,6 +144,7 @@ test('pi RPC: text streaming from message_update events', () => { assert.deepEqual(events, [ { type: 'status', label: 'working' }, { type: 'status', label: 'thinking' }, + { type: 'status', label: 'streaming', ttftMs: events[2].ttftMs }, { type: 'text_delta', delta: 'Hello ' }, { type: 'text_delta', delta: 'world' }, ]); @@ -254,6 +197,7 @@ test('pi RPC: usage extracted from turn_end', () => { output_tokens: 50, cached_read_tokens: 20, cached_write_tokens: 5, + total_tokens: 175, }); }); @@ -270,7 +214,7 @@ test('pi RPC: tool execution events mapped correctly', () => { ]); assert.deepEqual(events, [ - { type: 'status', label: 'tool', toolName: 'read' }, + { type: 'tool_use', id: 'tc-1', name: 'read', input: { path: 'foo.txt' } }, { type: 'tool_result', toolUseId: 'tc-1', content: 'file contents here', isError: false }, ]); }); @@ -362,8 +306,9 @@ test('pi RPC: full multi-turn session with tools and usage', () => { }, ]); - // 2 turns × (status + text/tool/usage) + // 2 turns with text, tool_use/tool_result, and usage assert.ok(events.some((e) => e.type === 'text_delta' && e.delta === 'Let me check.')); + assert.ok(events.some((e) => e.type === 'tool_use' && e.id === 'tc-1' && e.name === 'bash')); assert.ok(events.some((e) => e.type === 'tool_result' && e.toolUseId === 'tc-1')); assert.ok(events.some((e) => e.type === 'text_delta' && e.delta === 'Done!')); // Usage from both turns @@ -373,6 +318,23 @@ test('pi RPC: full multi-turn session with tools and usage', () => { assert.equal(usageEvents[1].usage.cached_read_tokens, 100); }); +test('pi RPC: tool_use arrives before tool_result in event order', () => { + // Regression: tool_use must be emitted from tool_execution_start, + // not message_end, so the UI can pair it with the later tool_result. + const events = simulateRpcSession([ + { type: 'agent_start' }, + { type: 'turn_start' }, + { type: 'tool_execution_start', toolCallId: 'tc-1', toolName: 'read', args: { path: 'a.txt' } }, + { type: 'tool_execution_end', toolCallId: 'tc-1', toolName: 'read', result: { content: [{ type: 'text', text: 'ok' }] }, isError: false }, + ]); + + const toolUseIdx = events.findIndex((e) => e.type === 'tool_use'); + const toolResultIdx = events.findIndex((e) => e.type === 'tool_result'); + assert.ok(toolUseIdx !== -1, 'tool_use event should exist'); + assert.ok(toolResultIdx !== -1, 'tool_result event should exist'); + assert.ok(toolUseIdx < toolResultIdx, 'tool_use must arrive before tool_result'); +}); + // ─── sendCommand format ───────────────────────────────────────────────────── test('pi RPC: sendCommand writes well-formed pi command JSON', async () => { diff --git a/e2e/tests/structured-streams.test.ts b/e2e/tests/structured-streams.test.ts index b10132ca1..9dfca7cef 100644 --- a/e2e/tests/structured-streams.test.ts +++ b/e2e/tests/structured-streams.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it } from 'vitest'; import { createClaudeStreamHandler } from '../../apps/daemon/src/claude-stream.js'; import { createCopilotStreamHandler } from '../../apps/daemon/src/copilot-stream.js'; -import { createJsonLineStream } from '../../apps/daemon/src/acp.js'; +import { mapPiRpcEvent } from '../../apps/daemon/src/pi-rpc.js'; describe('structured agent stream fixtures', () => { it('emits TodoWrite tool_use from Claude Code stream JSON', () => { @@ -37,49 +37,19 @@ describe('structured agent stream fixtures', () => { it('emits TodoWrite tool_use from Pi RPC tool_execution events', () => { const events: unknown[] = []; - // Simulate pi's RPC stdout through the same JSONL parser that - // attachPiRpcSession uses, then apply the event mapping. - const parser = createJsonLineStream((raw: any) => { - if (raw.type === 'tool_execution_start') { - events.push({ - type: 'tool_use', - id: raw.toolCallId, - name: raw.toolName, - input: raw.args ?? null, - }); - } - if (raw.type === 'tool_execution_end') { - const content = raw.result?.content; - const text = Array.isArray(content) - ? content.map((c: any) => (c?.type === 'text' ? c.text : JSON.stringify(c))).join('\n') - : ''; - events.push({ - type: 'tool_result', - toolUseId: raw.toolCallId, - content: text, - isError: raw.isError === true, - }); - } - }); + const send = (_channel: string, payload: unknown) => { events.push(payload); }; + const ctx = { runStartedAt: Date.now(), sentFirstToken: { value: false } }; - parser.feed( - JSON.stringify({ - type: 'tool_execution_start', - toolCallId: 'pi-call-1', - toolName: 'TodoWrite', - args: { todos: [{ content: 'Run QA', status: 'pending' }] }, - }) + '\n', + mapPiRpcEvent( + { type: 'tool_execution_start', toolCallId: 'pi-call-1', toolName: 'TodoWrite', args: { todos: [{ content: 'Run QA', status: 'pending' }] } }, + send, + ctx, ); - parser.feed( - JSON.stringify({ - type: 'tool_execution_end', - toolCallId: 'pi-call-1', - toolName: 'TodoWrite', - result: { content: [{ type: 'text', text: 'written' }] }, - isError: false, - }) + '\n', + mapPiRpcEvent( + { type: 'tool_execution_end', toolCallId: 'pi-call-1', toolName: 'TodoWrite', result: { content: [{ type: 'text', text: 'written' }] }, isError: false }, + send, + ctx, ); - parser.flush(); expect(events).toContainEqual({ type: 'tool_use',