fix(daemon): emit tool_use from tool_execution_start in pi-rpc (#186)

* fix(daemon): emit tool_use from tool_execution_start in pi-rpc

The pi-rpc adapter emitted tool_use from message_end, which fires
before tool execution starts. The web UI pairs tool results to prior
tool_use events, so receiving tool_result without a preceding tool_use
broke tool card rendering and file auto-open behavior.

Move tool_use emission to tool_execution_start, matching the pattern
in copilot-stream.ts. Remove redundant tool call extraction from
message_end (tool_use is now emitted at execution time, usage is
already emitted from turn_end).

Extract mapPiRpcEvent as a pure exported function so tests exercise
the real event mapping logic instead of an inlined copy that can
diverge from production.

Ref: mrcfps review comments on PR #117

* docs(daemon): clarify mapPiRpcEvent mutability contract

The function mutates ctx.sentFirstToken to track streaming state.
Calling it "pure" is misleading; revised the doc comment to say
no I/O or child process interaction instead.

* fix(pi-rpc): remove redundant status(tool) emission from tool_execution_start

Now that tool_use fires inline from tool_execution_start, the
accompanying status(tool) event is redundant: tool_use already
carries the tool name, and the UI renders running state from the
tool card. The extra status pill breaks consecutive tool_use
grouping in AssistantMessage.buildBlocks. Aligns with
copilot-stream, which emits only tool_use from
tool.execution_start with no status event.
This commit is contained in:
Tom 2026-05-02 15:06:37 +07:00 committed by GitHub
parent d58b07fdad
commit 5a0f954297
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 180 additions and 244 deletions

View file

@ -63,6 +63,139 @@ function replyExtensionUi(writable, raw) {
);
}
/**
* Map a single pi RPC event to zero or more daemon UI events.
*
* No I/O or child process interaction; mutates `ctx.sentFirstToken`
* to track streaming state.
* `send` callback and `ctx` are provided by the caller.
*
* @param {object} raw - parsed JSON from pi's stdout
* @param {function} send - (channel, payload) emitter
* @param {object} ctx - session context
* @param {number} ctx.runStartedAt - Date.now() at session start
* @param {{ value: boolean }} ctx.sentFirstToken - mutable flag
* @returns {string|null} 'agent_end' if the agent is done, null otherwise
*/
export function mapPiRpcEvent(raw, send, ctx) {
if (raw.type === 'agent_start') {
send('agent', { type: 'status', label: 'working' });
return null;
}
if (raw.type === 'agent_end') {
return 'agent_end';
}
if (raw.type === 'turn_start') {
send('agent', { type: 'status', label: 'thinking' });
return null;
}
if (raw.type === 'turn_end') {
if (raw.message?.usage) {
const u = raw.message.usage;
const usage = {};
if (typeof u.input === 'number') usage.input_tokens = u.input;
if (typeof u.output === 'number') usage.output_tokens = u.output;
if (typeof u.cacheRead === 'number') usage.cached_read_tokens = u.cacheRead;
if (typeof u.cacheWrite === 'number') usage.cached_write_tokens = u.cacheWrite;
if (typeof u.totalTokens === 'number') usage.total_tokens = u.totalTokens;
if (Object.keys(usage).length > 0) {
const cost = u.cost;
send('agent', {
type: 'usage',
usage,
costUsd: cost?.total ?? cost?.totalCost ?? null,
durationMs: Date.now() - ctx.runStartedAt,
});
}
}
return null;
}
if (raw.type === 'message_update' && raw.assistantMessageEvent) {
const ev = raw.assistantMessageEvent;
if (ev.type === 'text_delta' && typeof ev.delta === 'string') {
if (!ctx.sentFirstToken.value) {
ctx.sentFirstToken.value = true;
send('agent', {
type: 'status',
label: 'streaming',
ttftMs: Date.now() - ctx.runStartedAt,
});
}
send('agent', { type: 'text_delta', delta: ev.delta });
return null;
}
if (ev.type === 'thinking_delta' && typeof ev.delta === 'string') {
send('agent', { type: 'thinking_delta', delta: ev.delta });
return null;
}
if (ev.type === 'thinking_start') {
send('agent', { type: 'thinking_start' });
return null;
}
if (ev.type === 'thinking_end') {
send('agent', { type: 'thinking_end' });
return null;
}
return null;
}
if (raw.type === 'message_end') {
// message_end carries usage (already emitted from turn_end) and
// tool call blocks (already emitted from tool_execution_start).
// Nothing to extract here.
return null;
}
if (raw.type === 'tool_execution_start') {
send('agent', {
type: 'tool_use',
id: raw.toolCallId ?? null,
name: raw.toolName ?? null,
input: raw.args ?? null,
});
return null;
}
if (raw.type === 'tool_execution_end') {
const content = raw.result?.content;
const text =
Array.isArray(content)
? content
.map((c) => (c?.type === 'text' ? c.text : JSON.stringify(c)))
.join('\n')
: typeof content === 'string'
? content
: '';
send('agent', {
type: 'tool_result',
toolUseId: raw.toolCallId ?? null,
content: text,
isError: raw.isError === true,
});
return null;
}
if (raw.type === 'compaction_start') {
send('agent', { type: 'status', label: 'compacting' });
return null;
}
if (raw.type === 'auto_retry_start') {
send('agent', { type: 'status', label: 'retrying' });
return null;
}
return null;
}
/**
* Attach a pi RPC session to a spawned child process.
*
@ -78,7 +211,7 @@ export function attachPiRpcSession({ child, prompt, cwd, model, send }) {
const runStartedAt = Date.now();
let finished = false;
let fatal = false;
let sentFirstToken = false;
const sentFirstToken = { value: false };
let nextRpcId = 1;
@ -125,14 +258,10 @@ export function attachPiRpcSession({ child, prompt, cwd, model, send }) {
return;
}
// ---- Agent events ----
// Agent events: delegate to the pure mapper.
const result = mapPiRpcEvent(raw, send, { runStartedAt, sentFirstToken });
if (raw.type === 'agent_start') {
send('agent', { type: 'status', label: 'working' });
return;
}
if (raw.type === 'agent_end') {
if (result === 'agent_end') {
finished = true;
// pi's RPC process stays alive after agent_end (designed for
// multi-prompt sessions). The daemon's /api/chat is single-shot,
@ -147,131 +276,6 @@ export function attachPiRpcSession({ child, prompt, cwd, model, send }) {
setTimeout(() => {
if (!child.killed) child.kill('SIGTERM');
}, shutdownMs);
return;
}
if (raw.type === 'turn_start') {
send('agent', { type: 'status', label: 'thinking' });
return;
}
if (raw.type === 'turn_end') {
// Extract usage from the completed message if present.
if (raw.message?.usage) {
const u = raw.message.usage;
const usage = {};
if (typeof u.input === 'number') usage.input_tokens = u.input;
if (typeof u.output === 'number') usage.output_tokens = u.output;
if (typeof u.cacheRead === 'number') usage.cached_read_tokens = u.cacheRead;
if (typeof u.cacheWrite === 'number') usage.cached_write_tokens = u.cacheWrite;
if (typeof u.totalTokens === 'number') usage.total_tokens = u.totalTokens;
if (Object.keys(usage).length > 0) {
const cost = u.cost;
send('agent', {
type: 'usage',
usage,
costUsd: cost?.total ?? cost?.totalCost ?? null,
durationMs: Date.now() - runStartedAt,
});
}
}
return;
}
if (raw.type === 'message_update' && raw.assistantMessageEvent) {
const ev = raw.assistantMessageEvent;
if (ev.type === 'text_delta' && typeof ev.delta === 'string') {
if (!sentFirstToken) {
sentFirstToken = true;
send('agent', {
type: 'status',
label: 'streaming',
ttftMs: Date.now() - runStartedAt,
});
}
send('agent', { type: 'text_delta', delta: ev.delta });
return;
}
if (ev.type === 'thinking_delta' && typeof ev.delta === 'string') {
send('agent', { type: 'thinking_delta', delta: ev.delta });
return;
}
if (ev.type === 'thinking_start') {
send('agent', { type: 'thinking_start' });
return;
}
if (ev.type === 'thinking_end') {
send('agent', { type: 'thinking_end' });
return;
}
return;
}
if (raw.type === 'message_end' && raw.message) {
// Extract tool calls from the completed assistant message content.
// Note: usage is NOT emitted here because `turn_end` already
// emits it. Pi fires both `message_end` and `turn_end` per turn,
// both carrying usage; emitting from both would double-count.
const msg = raw.message;
// Extract tool calls from the completed assistant message content.
if (Array.isArray(msg.content)) {
for (const block of msg.content) {
if (block.type === 'toolCall') {
send('agent', {
type: 'tool_use',
id: block.id,
name: block.name,
input: block.arguments ?? null,
});
}
}
}
return;
}
if (raw.type === 'tool_execution_start') {
send('agent', {
type: 'status',
label: 'tool',
toolName: raw.toolName ?? null,
});
return;
}
if (raw.type === 'tool_execution_end') {
const content = raw.result?.content;
const text =
Array.isArray(content)
? content
.map((c) => (c?.type === 'text' ? c.text : JSON.stringify(c)))
.join('\n')
: typeof content === 'string'
? content
: '';
send('agent', {
type: 'tool_result',
toolUseId: raw.toolCallId ?? null,
content: text,
isError: raw.isError === true,
});
return;
}
// Compaction, retry, queue_update — informational only. Forward as
// status so the UI stays aware, but don't break the main text flow.
if (raw.type === 'compaction_start') {
send('agent', { type: 'status', label: 'compacting' });
return;
}
if (raw.type === 'auto_retry_start') {
send('agent', { type: 'status', label: 'retrying' });
return;
}
});

View file

@ -1,7 +1,7 @@
// @ts-nocheck
import { test } from 'vitest';
import assert from 'node:assert/strict';
import { parsePiModels } from '../src/pi-rpc.js';
import { parsePiModels, mapPiRpcEvent } from '../src/pi-rpc.js';
// ─── parsePiModels ─────────────────────────────────────────────────────────
@ -99,10 +99,10 @@ test('parsePiModels skips duplicate default id', () => {
assert.equal(result[1].id, 'default/some-model');
});
// ─── RPC event translation (attachPiRpcSession) ────────────────────────────
// ─── RPC event translation (mapPiRpcEvent) ────────────────────────────────
//
// We test the event translation by simulating pi's RPC stdout lines
// through the same parser pipeline that attachPiRpcSession uses.
// We test the pure event mapper directly — no child process, no stdin.
// This catches regressions like tool event ordering bugs.
import { createJsonLineStream } from '../src/acp.js';
@ -111,72 +111,14 @@ function simulateRpcSession(rpcLines, options = {}) {
const send = (_channel, payload) => {
events.push(payload);
};
const ctx = { runStartedAt: Date.now(), sentFirstToken: { value: false } };
const parser = createJsonLineStream((raw) => {
// Inline the core event translation from attachPiRpcSession
// to test the mapping logic in isolation (no child process needed).
// Skip non-agent events that mapPiRpcEvent doesn't handle.
if (raw.type === 'extension_ui_request') return;
if (raw.type === 'response') return;
if (raw.type === 'extension_ui_request') return; // skip
if (raw.type === 'response') return; // skip
if (raw.type === 'agent_start') {
send('agent', { type: 'status', label: 'working' });
return;
}
if (raw.type === 'turn_start') {
send('agent', { type: 'status', label: 'thinking' });
return;
}
if (raw.type === 'message_update' && raw.assistantMessageEvent) {
const ev = raw.assistantMessageEvent;
if (ev.type === 'text_delta' && typeof ev.delta === 'string') {
send('agent', { type: 'text_delta', delta: ev.delta });
} else if (ev.type === 'thinking_delta' && typeof ev.delta === 'string') {
send('agent', { type: 'thinking_delta', delta: ev.delta });
} else if (ev.type === 'thinking_start') {
send('agent', { type: 'thinking_start' });
} else if (ev.type === 'thinking_end') {
send('agent', { type: 'thinking_end' });
}
return;
}
if (raw.type === 'turn_end' && raw.message?.usage) {
const u = raw.message.usage;
const usage = {};
if (typeof u.input === 'number') usage.input_tokens = u.input;
if (typeof u.output === 'number') usage.output_tokens = u.output;
if (typeof u.cacheRead === 'number') usage.cached_read_tokens = u.cacheRead;
if (typeof u.cacheWrite === 'number') usage.cached_write_tokens = u.cacheWrite;
if (Object.keys(usage).length > 0) {
send('agent', { type: 'usage', usage, durationMs: 100 });
}
return;
}
if (raw.type === 'tool_execution_start') {
send('agent', { type: 'status', label: 'tool', toolName: raw.toolName ?? null });
return;
}
if (raw.type === 'tool_execution_end') {
const content = raw.result?.content;
const text = Array.isArray(content)
? content.map((c) => (c?.type === 'text' ? c.text : JSON.stringify(c))).join('\n')
: typeof content === 'string' ? content : '';
send('agent', { type: 'tool_result', toolUseId: raw.toolCallId ?? null, content: text, isError: raw.isError === true });
return;
}
if (raw.type === 'compaction_start') {
send('agent', { type: 'status', label: 'compacting' });
return;
}
if (raw.type === 'auto_retry_start') {
send('agent', { type: 'status', label: 'retrying' });
return;
}
mapPiRpcEvent(raw, send, ctx);
});
const input = rpcLines.map((l) => JSON.stringify(l)).join('\n') + '\n';
@ -202,6 +144,7 @@ test('pi RPC: text streaming from message_update events', () => {
assert.deepEqual(events, [
{ type: 'status', label: 'working' },
{ type: 'status', label: 'thinking' },
{ type: 'status', label: 'streaming', ttftMs: events[2].ttftMs },
{ type: 'text_delta', delta: 'Hello ' },
{ type: 'text_delta', delta: 'world' },
]);
@ -254,6 +197,7 @@ test('pi RPC: usage extracted from turn_end', () => {
output_tokens: 50,
cached_read_tokens: 20,
cached_write_tokens: 5,
total_tokens: 175,
});
});
@ -270,7 +214,7 @@ test('pi RPC: tool execution events mapped correctly', () => {
]);
assert.deepEqual(events, [
{ type: 'status', label: 'tool', toolName: 'read' },
{ type: 'tool_use', id: 'tc-1', name: 'read', input: { path: 'foo.txt' } },
{ type: 'tool_result', toolUseId: 'tc-1', content: 'file contents here', isError: false },
]);
});
@ -362,8 +306,9 @@ test('pi RPC: full multi-turn session with tools and usage', () => {
},
]);
// 2 turns × (status + text/tool/usage)
// 2 turns with text, tool_use/tool_result, and usage
assert.ok(events.some((e) => e.type === 'text_delta' && e.delta === 'Let me check.'));
assert.ok(events.some((e) => e.type === 'tool_use' && e.id === 'tc-1' && e.name === 'bash'));
assert.ok(events.some((e) => e.type === 'tool_result' && e.toolUseId === 'tc-1'));
assert.ok(events.some((e) => e.type === 'text_delta' && e.delta === 'Done!'));
// Usage from both turns
@ -373,6 +318,23 @@ test('pi RPC: full multi-turn session with tools and usage', () => {
assert.equal(usageEvents[1].usage.cached_read_tokens, 100);
});
test('pi RPC: tool_use arrives before tool_result in event order', () => {
// Regression: tool_use must be emitted from tool_execution_start,
// not message_end, so the UI can pair it with the later tool_result.
const events = simulateRpcSession([
{ type: 'agent_start' },
{ type: 'turn_start' },
{ type: 'tool_execution_start', toolCallId: 'tc-1', toolName: 'read', args: { path: 'a.txt' } },
{ type: 'tool_execution_end', toolCallId: 'tc-1', toolName: 'read', result: { content: [{ type: 'text', text: 'ok' }] }, isError: false },
]);
const toolUseIdx = events.findIndex((e) => e.type === 'tool_use');
const toolResultIdx = events.findIndex((e) => e.type === 'tool_result');
assert.ok(toolUseIdx !== -1, 'tool_use event should exist');
assert.ok(toolResultIdx !== -1, 'tool_result event should exist');
assert.ok(toolUseIdx < toolResultIdx, 'tool_use must arrive before tool_result');
});
// ─── sendCommand format ─────────────────────────────────────────────────────
test('pi RPC: sendCommand writes well-formed pi command JSON', async () => {

View file

@ -1,7 +1,7 @@
import { describe, expect, it } from 'vitest';
import { createClaudeStreamHandler } from '../../apps/daemon/src/claude-stream.js';
import { createCopilotStreamHandler } from '../../apps/daemon/src/copilot-stream.js';
import { createJsonLineStream } from '../../apps/daemon/src/acp.js';
import { mapPiRpcEvent } from '../../apps/daemon/src/pi-rpc.js';
describe('structured agent stream fixtures', () => {
it('emits TodoWrite tool_use from Claude Code stream JSON', () => {
@ -37,49 +37,19 @@ describe('structured agent stream fixtures', () => {
it('emits TodoWrite tool_use from Pi RPC tool_execution events', () => {
const events: unknown[] = [];
// Simulate pi's RPC stdout through the same JSONL parser that
// attachPiRpcSession uses, then apply the event mapping.
const parser = createJsonLineStream((raw: any) => {
if (raw.type === 'tool_execution_start') {
events.push({
type: 'tool_use',
id: raw.toolCallId,
name: raw.toolName,
input: raw.args ?? null,
});
}
if (raw.type === 'tool_execution_end') {
const content = raw.result?.content;
const text = Array.isArray(content)
? content.map((c: any) => (c?.type === 'text' ? c.text : JSON.stringify(c))).join('\n')
: '';
events.push({
type: 'tool_result',
toolUseId: raw.toolCallId,
content: text,
isError: raw.isError === true,
});
}
});
const send = (_channel: string, payload: unknown) => { events.push(payload); };
const ctx = { runStartedAt: Date.now(), sentFirstToken: { value: false } };
parser.feed(
JSON.stringify({
type: 'tool_execution_start',
toolCallId: 'pi-call-1',
toolName: 'TodoWrite',
args: { todos: [{ content: 'Run QA', status: 'pending' }] },
}) + '\n',
mapPiRpcEvent(
{ type: 'tool_execution_start', toolCallId: 'pi-call-1', toolName: 'TodoWrite', args: { todos: [{ content: 'Run QA', status: 'pending' }] } },
send,
ctx,
);
parser.feed(
JSON.stringify({
type: 'tool_execution_end',
toolCallId: 'pi-call-1',
toolName: 'TodoWrite',
result: { content: [{ type: 'text', text: 'written' }] },
isError: false,
}) + '\n',
mapPiRpcEvent(
{ type: 'tool_execution_end', toolCallId: 'pi-call-1', toolName: 'TodoWrite', result: { content: [{ type: 'text', text: 'written' }] }, isError: false },
send,
ctx,
);
parser.flush();
expect(events).toContainEqual({
type: 'tool_use',