feat(ai): add op-acp crate — Agent Client Protocol client

Port of the TS packages/pen-acp: ndJSON transport, a hand-rolled
JSON-RPC engine (request-id correlation, session/update notification
routing, session/request_permission auto-approval, and a pending-request
drain on EOF so a dead agent fails fast instead of timing out), and an
AcpConnection driving initialize / session/new / session/prompt over
local stdio or a remote WebSocket. The event adapter maps ACP session
updates onto the chat panel's ChatDelta vocabulary.
This commit is contained in:
Kayshen-X 2026-05-17 22:44:10 +08:00
parent 55c05d85d2
commit 177f4ab6d4
8 changed files with 1392 additions and 0 deletions

35
crates/op-acp/Cargo.toml Normal file
View file

@ -0,0 +1,35 @@
[package]
name = "op-acp"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
description = "OpenPencil ACP client — Agent Client Protocol (ndJSON JSON-RPC) transport, ported from the TS pen-acp package"
[features]
default = ["remote"]
# Remote ACP agents over a WebSocket endpoint. Local (stdio) agents
# work without this feature; `remote` adds the `tokio-tungstenite`
# dependency for the WebSocket transport.
remote = ["dep:tokio-tungstenite", "dep:futures-util"]
[dependencies]
# Desktop-only crate: spawns child processes + drives async IO, so it
# pulls tokio. Never depended on by a wasm crate.
tokio = { version = "1", features = [
"process",
"io-util",
"rt",
"rt-multi-thread",
"macros",
"sync",
"time",
] }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
# Chat-delta vocabulary the ACP event adapter maps onto.
op-ai = { path = "../op-ai" }
tokio-tungstenite = { version = "0.24", optional = true }
futures-util = { version = "0.3", optional = true }

381
crates/op-acp/src/client.rs Normal file
View file

@ -0,0 +1,381 @@
//! ACP connection — connect to a local (stdio) or remote (WebSocket)
//! agent and drive the initialize / session / prompt handshake.
//! Port of `pen-acp/src/client.ts`.
use std::process::Stdio;
use std::time::Duration;
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::jsonrpc::{dispatch_inbound, JsonRpcEngine};
use crate::protocol::{
InitializeResult, NewSessionResult, SessionNotification, METHOD_INITIALIZE,
METHOD_SESSION_NEW, METHOD_SESSION_PROMPT, PROTOCOL_VERSION,
};
use crate::transport::{read_frame, write_frame};
use crate::types::{AcpAgentConfig, AcpAgentInfo, AcpError, ConnectionType};
/// Per-request timeout for the handshake calls.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
/// A prompt turn can run a long while — generous ceiling.
const PROMPT_TIMEOUT: Duration = Duration::from_secs(600);
/// A live ACP connection to one agent.
pub struct AcpConnection {
engine: JsonRpcEngine,
notifications: Option<mpsc::UnboundedReceiver<SessionNotification>>,
child: Option<Child>,
tasks: Vec<JoinHandle<()>>,
agent_info: AcpAgentInfo,
}
impl AcpConnection {
/// Build a connection over an arbitrary async byte stream pair
/// (stdio of a child, a test duplex, …). Spawns the reader +
/// writer tasks; does NOT run the `initialize` handshake.
pub fn new<R, W>(read: R, write: W, child: Option<Child>) -> AcpConnection
where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<Value>();
let (notif_tx, notif_rx) = mpsc::unbounded_channel::<SessionNotification>();
let engine = JsonRpcEngine::new(out_tx);
let pending = engine.pending();
let reply_tx = engine.out_tx();
// Writer task — drain outbound frames onto the byte stream.
let mut write = write;
let writer = tokio::spawn(async move {
while let Some(frame) = out_rx.recv().await {
if write_frame(&mut write, &frame).await.is_err() {
break;
}
}
});
// Reader task — classify + dispatch every inbound frame.
let reader = tokio::spawn(async move {
let mut buf = BufReader::new(read);
loop {
match read_frame(&mut buf).await {
Ok(Some(value)) => {
dispatch_inbound(value, &pending, &notif_tx, &reply_tx)
}
// EOF or transport failure — stop reading.
Ok(None) | Err(_) => break,
}
}
// Connection closed: fail every in-flight request now so
// callers get `Closed` immediately instead of stalling
// until the request timeout.
let waiters: Vec<_> = pending.lock().unwrap().drain().collect();
for (_, waiter) in waiters {
let _ = waiter.send(Err(AcpError::Closed));
}
});
AcpConnection {
engine,
notifications: Some(notif_rx),
child,
tasks: vec![writer, reader],
agent_info: AcpAgentInfo::default(),
}
}
/// Run the `initialize` handshake, recording the agent's identity.
/// `fallback_name` is used when the agent reports none.
pub async fn initialize(&mut self, fallback_name: &str) -> Result<(), AcpError> {
let params = serde_json::json!({
"protocolVersion": PROTOCOL_VERSION,
"clientCapabilities": {},
"clientInfo": { "name": "openpencil", "version": env!("CARGO_PKG_VERSION") }
});
let result = self
.engine
.call(METHOD_INITIALIZE, params, HANDSHAKE_TIMEOUT)
.await?;
let parsed: InitializeResult =
serde_json::from_value(result).map_err(|e| AcpError::Protocol(e.to_string()))?;
let info = parsed.agent_info.unwrap_or_default();
self.agent_info = AcpAgentInfo {
name: info.name.unwrap_or_else(|| fallback_name.to_string()),
title: info.title,
version: info.version,
};
Ok(())
}
/// Open a new session, returning its id.
pub async fn new_session(&self) -> Result<String, AcpError> {
let cwd = std::env::current_dir()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| ".".to_string());
let params = serde_json::json!({ "cwd": cwd, "mcpServers": [] });
let result = self
.engine
.call(METHOD_SESSION_NEW, params, HANDSHAKE_TIMEOUT)
.await?;
let parsed: NewSessionResult =
serde_json::from_value(result).map_err(|e| AcpError::Protocol(e.to_string()))?;
Ok(parsed.session_id)
}
/// Drive one prompt turn. Resolves when the agent finishes the
/// turn; streamed output arrives on the notification channel.
pub async fn prompt(&self, session_id: &str, text: &str) -> Result<(), AcpError> {
let params = serde_json::json!({
"sessionId": session_id,
"prompt": [ { "type": "text", "text": text } ]
});
self.engine
.call(METHOD_SESSION_PROMPT, params, PROMPT_TIMEOUT)
.await?;
Ok(())
}
/// Take the `session/update` notification receiver — callable
/// once; subsequent calls return `None`.
pub fn take_notifications(&mut self) -> Option<mpsc::UnboundedReceiver<SessionNotification>> {
self.notifications.take()
}
/// The agent's identity from the `initialize` handshake.
pub fn agent_info(&self) -> &AcpAgentInfo {
&self.agent_info
}
/// Kill the local process (if any) and stop the IO tasks.
pub fn disconnect(&mut self) {
if let Some(mut child) = self.child.take() {
let _ = child.start_kill();
}
for task in self.tasks.drain(..) {
task.abort();
}
}
}
impl Drop for AcpConnection {
fn drop(&mut self) {
self.disconnect();
}
}
/// Connect to the agent described by `config`, running the
/// `initialize` handshake. Routes to the local or remote transport.
pub async fn connect_acp_agent(config: &AcpAgentConfig) -> Result<AcpConnection, AcpError> {
match config.connection_type {
ConnectionType::Local => connect_local(config).await,
ConnectionType::Remote => connect_remote(config).await,
}
}
/// Spawn a local agent process and connect over its stdio.
async fn connect_local(config: &AcpAgentConfig) -> Result<AcpConnection, AcpError> {
let command = config
.command
.as_ref()
.ok_or_else(|| AcpError::Config("local ACP agent requires a command".into()))?;
let mut cmd = Command::new(command);
cmd.args(&config.args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for (key, value) in &config.env {
cmd.env(key, value);
}
let mut child = cmd.spawn().map_err(|e| AcpError::Spawn(e.to_string()))?;
let stdin = child
.stdin
.take()
.ok_or_else(|| AcpError::Spawn("child has no stdin".into()))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| AcpError::Spawn("child has no stdout".into()))?;
// Drain stderr so the child never blocks on a full pipe.
if let Some(stderr) = child.stderr.take() {
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(_)) = lines.next_line().await {}
});
}
let mut conn = AcpConnection::new(stdout, stdin, Some(child));
conn.initialize(&config.display_name).await?;
Ok(conn)
}
#[cfg(not(feature = "remote"))]
async fn connect_remote(_config: &AcpAgentConfig) -> Result<AcpConnection, AcpError> {
Err(AcpError::Config(
"remote ACP agents need the `remote` feature".into(),
))
}
/// Connect to a remote agent over a WebSocket endpoint.
#[cfg(feature = "remote")]
async fn connect_remote(config: &AcpAgentConfig) -> Result<AcpConnection, AcpError> {
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::tungstenite::Message;
let url = config
.url
.as_ref()
.ok_or_else(|| AcpError::Config("remote ACP agent requires a url".into()))?;
let (ws, _resp) = tokio_tungstenite::connect_async(url)
.await
.map_err(|e| AcpError::Transport(e.to_string()))?;
let (mut sink, mut stream) = ws.split();
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<Value>();
let (notif_tx, notif_rx) = mpsc::unbounded_channel::<SessionNotification>();
let engine = JsonRpcEngine::new(out_tx);
let pending = engine.pending();
let reply_tx = engine.out_tx();
// Writer task — each outbound frame is one WebSocket text message.
let writer = tokio::spawn(async move {
while let Some(frame) = out_rx.recv().await {
let Ok(text) = serde_json::to_string(&frame) else {
continue;
};
if sink.send(Message::Text(text)).await.is_err() {
break;
}
}
});
// Reader task — each text message is one inbound frame.
let reader = tokio::spawn(async move {
while let Some(msg) = stream.next().await {
let Ok(msg) = msg else { break };
let text = match msg {
Message::Text(t) => t,
Message::Binary(b) => String::from_utf8_lossy(&b).into_owned(),
Message::Close(_) => break,
_ => continue,
};
if let Ok(value) = serde_json::from_str::<Value>(text.trim()) {
dispatch_inbound(value, &pending, &notif_tx, &reply_tx);
}
}
// Socket closed: fail in-flight requests immediately.
let waiters: Vec<_> = pending.lock().unwrap().drain().collect();
for (_, waiter) in waiters {
let _ = waiter.send(Err(AcpError::Closed));
}
});
let mut conn = AcpConnection {
engine,
notifications: Some(notif_rx),
child: None,
tasks: vec![writer, reader],
agent_info: AcpAgentInfo::default(),
};
conn.initialize(&config.display_name).await?;
Ok(conn)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::read_frame;
/// A mock ACP agent: answers `initialize` / `session/new`, then on
/// `session/prompt` streams one message chunk and returns.
async fn mock_agent(
read: impl AsyncRead + Unpin,
mut write: impl AsyncWrite + Unpin,
) {
let mut buf = BufReader::new(read);
while let Ok(Some(frame)) = read_frame(&mut buf).await {
let id = frame.get("id").cloned().unwrap_or(Value::Null);
let method = frame.get("method").and_then(|m| m.as_str()).unwrap_or("");
match method {
"initialize" => {
let resp = serde_json::json!({
"jsonrpc": "2.0", "id": id,
"result": { "protocolVersion": 1,
"agentInfo": { "name": "Mock Agent", "version": "9.9" } }
});
write_frame(&mut write, &resp).await.unwrap();
}
"session/new" => {
let resp = serde_json::json!({
"jsonrpc": "2.0", "id": id,
"result": { "sessionId": "sess-1" }
});
write_frame(&mut write, &resp).await.unwrap();
}
"session/prompt" => {
// Stream one chunk, then close the turn.
let note = serde_json::json!({
"jsonrpc": "2.0", "method": "session/update",
"params": { "sessionId": "sess-1",
"update": { "sessionUpdate": "agent_message_chunk",
"content": { "type": "text", "text": "hi there" } } }
});
write_frame(&mut write, &note).await.unwrap();
let resp = serde_json::json!({
"jsonrpc": "2.0", "id": id, "result": { "stopReason": "end_turn" }
});
write_frame(&mut write, &resp).await.unwrap();
}
_ => break,
}
}
}
#[tokio::test]
async fn handshake_and_prompt_against_a_mock_agent() {
// Two duplex pipes: client writes → agent reads, agent writes
// → client reads.
let (client_w, agent_r) = tokio::io::duplex(8192);
let (agent_w, client_r) = tokio::io::duplex(8192);
tokio::spawn(mock_agent(agent_r, agent_w));
let mut conn = AcpConnection::new(client_r, client_w, None);
let mut notes = conn.take_notifications().expect("notifications");
conn.initialize("fallback").await.expect("initialize");
assert_eq!(conn.agent_info().name, "Mock Agent");
assert_eq!(conn.agent_info().version.as_deref(), Some("9.9"));
let session = conn.new_session().await.expect("new_session");
assert_eq!(session, "sess-1");
conn.prompt(&session, "design a button").await.expect("prompt");
// The streamed chunk reached the notification channel.
let note = notes.recv().await.expect("a session/update");
assert_eq!(note.session_id.as_deref(), Some("sess-1"));
}
#[tokio::test]
async fn in_flight_call_fails_fast_when_agent_exits() {
// Agent end is dropped immediately — no response will come.
let (client_w, agent_r) = tokio::io::duplex(1024);
let (agent_w, client_r) = tokio::io::duplex(1024);
drop(agent_r);
drop(agent_w);
let mut conn = AcpConnection::new(client_r, client_w, None);
let started = std::time::Instant::now();
let err = conn.initialize("fallback").await.unwrap_err();
// The reader drains pending requests on EOF, so the call
// resolves to `Closed` at once rather than after the 30s
// handshake timeout.
assert!(matches!(err, AcpError::Closed), "expected Closed, got {err:?}");
assert!(
started.elapsed() < Duration::from_secs(5),
"must fail fast, not wait out the timeout"
);
}
}

View file

@ -0,0 +1,147 @@
//! ACP `session/update` → [`ChatDelta`] adapter — the Rust analogue
//! of `pen-acp/src/event-adapter.ts` (`acpUpdateToSSE`), retargeted
//! from SSE strings to the OP chat panel's delta vocabulary.
//!
//! The turn terminator is the `session/prompt` *result*, not a
//! notification — so this adapter never emits `ChatDelta::Done`; the
//! `AcpProvider` emits it once `prompt()` returns.
use op_ai::chat_provider::ChatDelta;
use serde_json::Value;
use crate::protocol::{ContentBlock, SessionNotification, SessionUpdate};
/// Pull an error message out of a failed tool call's `content` blocks
/// — ACP places the text under `content[].content.text`.
fn extract_tool_error(content: &Value) -> Option<String> {
let blocks = content.as_array()?;
let mut found: Option<String> = None;
for block in blocks {
if let Some(text) = block
.get("content")
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
{
found = Some(text.to_string());
}
}
found
}
/// Map one ACP session-update notification to a [`ChatDelta`], or
/// `None` for updates the chat panel does not surface.
pub fn session_update_to_delta(note: &SessionNotification) -> Option<ChatDelta> {
match &note.update {
SessionUpdate::AgentMessageChunk {
content: ContentBlock::Text { text },
} => Some(ChatDelta::TextDelta(text.clone())),
SessionUpdate::AgentThoughtChunk {
content: ContentBlock::Text { text },
} => Some(ChatDelta::Thinking(text.clone())),
SessionUpdate::ToolCall {
tool_call_id,
title,
raw_input,
} => Some(ChatDelta::ToolUse {
name: title.clone().unwrap_or_else(|| tool_call_id.clone()),
args: raw_input.to_string(),
}),
SessionUpdate::ToolCallUpdate {
status,
content,
raw_output,
..
} => match status.as_deref() {
Some("failed") => {
let msg = extract_tool_error(content)
.unwrap_or_else(|| raw_output.to_string());
Some(ChatDelta::Error(msg))
}
// `completed` is not a stream terminator — the turn ends
// when `session/prompt` returns.
_ => None,
},
// Non-text content / other update kinds carry nothing to show.
SessionUpdate::AgentMessageChunk { .. }
| SessionUpdate::AgentThoughtChunk { .. }
| SessionUpdate::Other => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
fn note(update: Value) -> SessionNotification {
serde_json::from_value(serde_json::json!({
"sessionId": "s1",
"update": update,
}))
.unwrap()
}
#[test]
fn message_chunk_maps_to_text_delta() {
let n = note(serde_json::json!({
"sessionUpdate": "agent_message_chunk",
"content": { "type": "text", "text": "hello world" }
}));
assert_eq!(
session_update_to_delta(&n),
Some(ChatDelta::TextDelta("hello world".into()))
);
}
#[test]
fn thought_chunk_maps_to_thinking() {
let n = note(serde_json::json!({
"sessionUpdate": "agent_thought_chunk",
"content": { "type": "text", "text": "hmm" }
}));
assert_eq!(
session_update_to_delta(&n),
Some(ChatDelta::Thinking("hmm".into()))
);
}
#[test]
fn tool_call_maps_to_tool_use() {
let n = note(serde_json::json!({
"sessionUpdate": "tool_call",
"toolCallId": "tc1",
"title": "snapshot_layout",
"rawInput": { "page": 0 }
}));
match session_update_to_delta(&n) {
Some(ChatDelta::ToolUse { name, .. }) => assert_eq!(name, "snapshot_layout"),
other => panic!("unexpected: {other:?}"),
}
}
#[test]
fn failed_tool_update_maps_to_error() {
let n = note(serde_json::json!({
"sessionUpdate": "tool_call_update",
"toolCallId": "tc1",
"status": "failed",
"content": [ { "content": { "type": "text", "text": "permission denied" } } ]
}));
assert_eq!(
session_update_to_delta(&n),
Some(ChatDelta::Error("permission denied".into()))
);
}
#[test]
fn completed_tool_update_and_unknown_yield_nothing() {
let completed = note(serde_json::json!({
"sessionUpdate": "tool_call_update",
"toolCallId": "tc1",
"status": "completed",
"rawOutput": { "ok": true }
}));
assert_eq!(session_update_to_delta(&completed), None);
let plan = note(serde_json::json!({ "sessionUpdate": "plan", "entries": [] }));
assert_eq!(session_update_to_delta(&plan), None);
}
}

View file

@ -0,0 +1,268 @@
//! Minimal JSON-RPC 2.0 engine over the ndJSON transport.
//!
//! [`JsonRpcEngine`] allocates request ids, correlates responses
//! through per-id oneshot channels, and — via [`dispatch_inbound`] —
//! routes inbound frames: responses to their waiter, `session/update`
//! notifications to a channel, and `session/request_permission`
//! requests to an auto-approval reply (TS parity — the user already
//! trusted the agent by configuring it).
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use serde_json::Value;
use tokio::sync::{mpsc, oneshot};
use crate::protocol::{
classify_inbound, Inbound, JsonRpcError, JsonRpcRequest, JsonRpcResponse,
RequestPermissionParams, SessionNotification, METHOD_REQUEST_PERMISSION, METHOD_SESSION_UPDATE,
};
use crate::types::AcpError;
/// Map of in-flight request id → response waiter.
type Pending = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Value, AcpError>>>>>;
/// Shared JSON-RPC engine — cloned between the connection handle and
/// the background reader task.
#[derive(Clone)]
pub struct JsonRpcEngine {
out_tx: mpsc::UnboundedSender<Value>,
pending: Pending,
next_id: Arc<AtomicU64>,
}
impl JsonRpcEngine {
/// Build an engine that writes outbound frames to `out_tx`.
pub fn new(out_tx: mpsc::UnboundedSender<Value>) -> Self {
Self {
out_tx,
pending: Arc::new(Mutex::new(HashMap::new())),
next_id: Arc::new(AtomicU64::new(1)),
}
}
/// The shared pending-request map (the reader task resolves it).
pub fn pending(&self) -> Pending {
self.pending.clone()
}
/// A clone of the outbound-frame sender.
pub fn out_tx(&self) -> mpsc::UnboundedSender<Value> {
self.out_tx.clone()
}
/// Send a request and await its correlated response, up to
/// `timeout`.
pub async fn call(
&self,
method: &str,
params: Value,
timeout: Duration,
) -> Result<Value, AcpError> {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = oneshot::channel();
self.pending.lock().unwrap().insert(id, tx);
let req = JsonRpcRequest::new(id, method, params);
let frame =
serde_json::to_value(&req).map_err(|e| AcpError::Protocol(e.to_string()))?;
if self.out_tx.send(frame).is_err() {
self.pending.lock().unwrap().remove(&id);
return Err(AcpError::Closed);
}
match tokio::time::timeout(timeout, rx).await {
Ok(Ok(result)) => result,
// The reader task dropped the sender — connection died.
Ok(Err(_)) => Err(AcpError::Closed),
Err(_) => {
self.pending.lock().unwrap().remove(&id);
Err(AcpError::Transport(format!(
"request '{method}' timed out"
)))
}
}
}
}
/// Choose an "allow" option from a `session/request_permission`
/// request and build the JSON-RPC result that selects it. Mirrors the
/// TS `requestPermission` handler.
pub fn auto_approve_permission(params: &Value) -> Value {
let parsed: RequestPermissionParams =
serde_json::from_value(params.clone()).unwrap_or(RequestPermissionParams { options: vec![] });
let chosen = parsed
.options
.iter()
.find(|o| {
matches!(o.kind.as_deref(), Some("allow_once") | Some("allow_always"))
|| o.option_id.starts_with("allow")
})
.or_else(|| parsed.options.first());
let option_id = chosen
.map(|o| o.option_id.clone())
.unwrap_or_else(|| "allow".to_string());
serde_json::json!({
"outcome": { "outcome": "selected", "optionId": option_id }
})
}
/// Route one inbound JSON frame: resolve a response waiter, forward a
/// `session/update` notification, or reply to an agent → client
/// request.
pub fn dispatch_inbound(
value: Value,
pending: &Pending,
notif_tx: &mpsc::UnboundedSender<SessionNotification>,
out_tx: &mpsc::UnboundedSender<Value>,
) {
match classify_inbound(&value) {
Inbound::Response { id, result, error } => {
if let Some(tx) = pending.lock().unwrap().remove(&id) {
let resolved = match error {
Some(e) => Err(AcpError::Rpc {
code: e.code,
message: e.message,
}),
None => Ok(result.unwrap_or(Value::Null)),
};
let _ = tx.send(resolved);
}
}
Inbound::Request { id, method, params } => {
let response = if method == METHOD_REQUEST_PERMISSION {
JsonRpcResponse::ok(id, auto_approve_permission(&params))
} else {
// Unsupported agent → client request — reply with a
// JSON-RPC "method not found" so the agent fails fast
// instead of hanging.
JsonRpcResponse {
jsonrpc: "2.0",
id,
result: None,
error: Some(JsonRpcError {
code: -32601,
message: format!("method '{method}' not supported"),
data: None,
}),
}
};
if let Ok(frame) = serde_json::to_value(&response) {
let _ = out_tx.send(frame);
}
}
Inbound::Notification { method, params } => {
if method == METHOD_SESSION_UPDATE {
if let Ok(note) = serde_json::from_value::<SessionNotification>(params) {
let _ = notif_tx.send(note);
}
}
// Other notifications are not surfaced.
}
Inbound::Unknown => {}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn auto_approve_prefers_allow_option() {
let params = serde_json::json!({
"options": [
{ "optionId": "reject-1", "kind": "reject_once" },
{ "optionId": "ok-1", "kind": "allow_always" }
]
});
let out = auto_approve_permission(&params);
assert_eq!(out["outcome"]["outcome"], "selected");
assert_eq!(out["outcome"]["optionId"], "ok-1");
}
#[test]
fn auto_approve_falls_back_to_first_option() {
let params = serde_json::json!({
"options": [{ "optionId": "first", "kind": "custom" }]
});
assert_eq!(
auto_approve_permission(&params)["outcome"]["optionId"],
"first"
);
// No options at all → the generic "allow" sentinel.
let empty = serde_json::json!({ "options": [] });
assert_eq!(
auto_approve_permission(&empty)["outcome"]["optionId"],
"allow"
);
}
#[tokio::test]
async fn call_correlates_a_response() {
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<Value>();
let engine = JsonRpcEngine::new(out_tx);
let (notif_tx, _notif_rx) = mpsc::unbounded_channel();
let pending = engine.pending();
let reply_tx = engine.out_tx();
// Background "agent": read the request, echo a response.
tokio::spawn(async move {
let req = out_rx.recv().await.unwrap();
let id = req["id"].as_u64().unwrap();
let response = serde_json::json!({
"jsonrpc": "2.0", "id": id, "result": { "pong": true }
});
dispatch_inbound(response, &pending, &notif_tx, &reply_tx);
});
let result = engine
.call("ping", serde_json::json!({}), Duration::from_secs(2))
.await
.unwrap();
assert_eq!(result["pong"], true);
}
#[tokio::test]
async fn call_surfaces_rpc_error() {
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<Value>();
let engine = JsonRpcEngine::new(out_tx);
let (notif_tx, _notif_rx) = mpsc::unbounded_channel();
let pending = engine.pending();
let reply_tx = engine.out_tx();
tokio::spawn(async move {
let req = out_rx.recv().await.unwrap();
let id = req["id"].as_u64().unwrap();
let err = serde_json::json!({
"jsonrpc": "2.0", "id": id,
"error": { "code": -32000, "message": "boom" }
});
dispatch_inbound(err, &pending, &notif_tx, &reply_tx);
});
let err = engine
.call("fail", serde_json::json!({}), Duration::from_secs(2))
.await
.unwrap_err();
assert!(matches!(err, AcpError::Rpc { code: -32000, .. }));
}
#[tokio::test]
async fn notification_reaches_the_channel() {
let (out_tx, _out_rx) = mpsc::unbounded_channel::<Value>();
let (notif_tx, mut notif_rx) = mpsc::unbounded_channel();
let pending: Pending = Arc::new(Mutex::new(HashMap::new()));
let note = serde_json::json!({
"jsonrpc": "2.0",
"method": "session/update",
"params": {
"sessionId": "s1",
"update": { "sessionUpdate": "agent_message_chunk",
"content": { "type": "text", "text": "hi" } }
}
});
dispatch_inbound(note, &pending, &notif_tx, &out_tx);
let received = notif_rx.recv().await.unwrap();
assert_eq!(received.session_id.as_deref(), Some("s1"));
}
}

30
crates/op-acp/src/lib.rs Normal file
View file

@ -0,0 +1,30 @@
//! OpenPencil ACP client — a Rust port of the TS `pen-acp` package.
//!
//! ACP (Agent Client Protocol) lets third-party agents plug into
//! OpenPencil over JSON-RPC 2.0 framed as newline-delimited JSON. An
//! agent runs either as a local child process (stdio) or behind a
//! remote WebSocket endpoint.
//!
//! Layers:
//! - [`transport`] — ndJSON frame read / write;
//! - [`jsonrpc`] — request-id correlation, notification routing,
//! `session/request_permission` auto-approval;
//! - [`client`] — [`AcpConnection`]: `initialize` / `session/new` /
//! `session/prompt`;
//! - [`event_adapter`] — maps `session/update` notifications onto the
//! chat panel's `ChatDelta` vocabulary.
//!
//! Desktop-only — it spawns processes + drives async IO via tokio, so
//! no wasm crate depends on it.
pub mod client;
pub mod event_adapter;
pub mod jsonrpc;
pub mod protocol;
pub mod transport;
pub mod types;
pub use client::{connect_acp_agent, AcpConnection};
pub use event_adapter::session_update_to_delta;
pub use protocol::{SessionNotification, SessionUpdate};
pub use types::{AcpAgentConfig, AcpAgentInfo, AcpConnectResult, AcpError, ConnectionType};

View file

@ -0,0 +1,305 @@
//! ACP wire protocol — JSON-RPC 2.0 envelopes plus the subset of
//! Agent Client Protocol message shapes the client drives.
//!
//! Method names + the `protocolVersion` constant mirror the
//! `@agentclientprotocol/sdk` schema (`AGENT_METHODS` / `PROTOCOL_VERSION`).
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// ACP protocol version this client speaks (`PROTOCOL_VERSION` in the SDK).
pub const PROTOCOL_VERSION: u32 = 1;
/// `initialize` — handshake.
pub const METHOD_INITIALIZE: &str = "initialize";
/// `session/new` — open a session.
pub const METHOD_SESSION_NEW: &str = "session/new";
/// `session/prompt` — drive one turn.
pub const METHOD_SESSION_PROMPT: &str = "session/prompt";
/// `session/update` — streaming notification from the agent.
pub const METHOD_SESSION_UPDATE: &str = "session/update";
/// `session/request_permission` — agent asks the client to approve a tool.
pub const METHOD_REQUEST_PERMISSION: &str = "session/request_permission";
/// An outgoing JSON-RPC request.
#[derive(Debug, Serialize)]
pub struct JsonRpcRequest {
pub jsonrpc: &'static str,
pub id: u64,
pub method: String,
pub params: Value,
}
impl JsonRpcRequest {
/// Build a `2.0` request.
pub fn new(id: u64, method: &str, params: Value) -> Self {
Self {
jsonrpc: "2.0",
id,
method: method.to_string(),
params,
}
}
}
/// An outgoing JSON-RPC response (used to answer agent → client
/// requests such as `session/request_permission`).
#[derive(Debug, Serialize)]
pub struct JsonRpcResponse {
pub jsonrpc: &'static str,
pub id: Value,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<JsonRpcError>,
}
impl JsonRpcResponse {
/// A success response carrying `result`.
pub fn ok(id: Value, result: Value) -> Self {
Self {
jsonrpc: "2.0",
id,
result: Some(result),
error: None,
}
}
}
/// A JSON-RPC error object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcError {
pub code: i64,
pub message: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
/// One decoded inbound JSON-RPC message — a response to one of our
/// requests, an incoming request from the agent, a notification, or
/// something unrecognized.
#[derive(Debug)]
pub enum Inbound {
/// Response to a request we sent (`id` correlates).
Response {
id: u64,
result: Option<Value>,
error: Option<JsonRpcError>,
},
/// A request from the agent (carries the raw `id` to echo back).
Request {
id: Value,
method: String,
params: Value,
},
/// A notification (no `id`).
Notification { method: String, params: Value },
/// A line that did not parse as a JSON-RPC message.
Unknown,
}
/// Classify one inbound JSON value into an [`Inbound`].
pub fn classify_inbound(value: &Value) -> Inbound {
let obj = match value.as_object() {
Some(o) => o,
None => return Inbound::Unknown,
};
let has_method = obj.contains_key("method");
let id = obj.get("id");
match (has_method, id) {
// method + id → a request from the agent.
(true, Some(id)) if !id.is_null() => Inbound::Request {
id: id.clone(),
method: obj["method"].as_str().unwrap_or_default().to_string(),
params: obj.get("params").cloned().unwrap_or(Value::Null),
},
// method, no id → a notification.
(true, _) => Inbound::Notification {
method: obj["method"].as_str().unwrap_or_default().to_string(),
params: obj.get("params").cloned().unwrap_or(Value::Null),
},
// id, no method → a response to one of our requests.
(false, Some(id)) => match id.as_u64() {
Some(id) => Inbound::Response {
id,
result: obj.get("result").cloned(),
error: obj
.get("error")
.and_then(|e| serde_json::from_value(e.clone()).ok()),
},
None => Inbound::Unknown,
},
_ => Inbound::Unknown,
}
}
/// `initialize` result — only the fields the client reads.
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeResult {
#[serde(default)]
pub protocol_version: Option<u32>,
#[serde(default)]
pub agent_info: Option<AgentInfoWire>,
}
/// `agentInfo` block of an [`InitializeResult`].
#[derive(Debug, Default, Deserialize)]
pub struct AgentInfoWire {
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub title: Option<String>,
#[serde(default)]
pub version: Option<String>,
}
/// `session/new` result.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NewSessionResult {
pub session_id: String,
}
/// One content block of a prompt / message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentBlock {
/// Plain text.
Text { text: String },
/// Any other content type — preserved opaquely.
#[serde(other)]
Other,
}
/// The discriminated `update` payload of a `session/update`
/// notification (`sessionUpdate` tag).
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "sessionUpdate", rename_all = "snake_case")]
pub enum SessionUpdate {
/// A streamed chunk of the agent's reply.
AgentMessageChunk { content: ContentBlock },
/// A streamed chunk of the agent's private reasoning.
AgentThoughtChunk { content: ContentBlock },
/// The agent announced a tool call (display-only).
ToolCall {
#[serde(rename = "toolCallId", default)]
tool_call_id: String,
#[serde(default)]
title: Option<String>,
#[serde(rename = "rawInput", default)]
raw_input: Value,
},
/// A tool call's status changed.
ToolCallUpdate {
#[serde(rename = "toolCallId", default)]
tool_call_id: String,
#[serde(default)]
status: Option<String>,
#[serde(rename = "rawOutput", default)]
raw_output: Value,
#[serde(default)]
content: Value,
},
/// Any other update kind — ignored by the event adapter.
#[serde(other)]
Other,
}
/// A `session/update` notification's params.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionNotification {
#[serde(default)]
pub session_id: Option<String>,
pub update: SessionUpdate,
}
/// One option offered in a `session/request_permission` request.
#[derive(Debug, Clone, Deserialize)]
pub struct PermissionOption {
#[serde(rename = "optionId")]
pub option_id: String,
#[serde(default)]
pub kind: Option<String>,
}
/// `session/request_permission` request params (only `options`).
#[derive(Debug, Clone, Deserialize)]
pub struct RequestPermissionParams {
#[serde(default)]
pub options: Vec<PermissionOption>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn classify_response_request_notification() {
let resp = serde_json::json!({"jsonrpc":"2.0","id":7,"result":{"ok":true}});
assert!(matches!(
classify_inbound(&resp),
Inbound::Response { id: 7, .. }
));
let req = serde_json::json!({"jsonrpc":"2.0","id":3,"method":"session/request_permission","params":{}});
assert!(matches!(classify_inbound(&req), Inbound::Request { .. }));
let note = serde_json::json!({"jsonrpc":"2.0","method":"session/update","params":{}});
assert!(matches!(
classify_inbound(&note),
Inbound::Notification { .. }
));
assert!(matches!(classify_inbound(&serde_json::json!(5)), Inbound::Unknown));
}
#[test]
fn session_update_deserializes_known_variants() {
let chunk = serde_json::json!({
"sessionUpdate": "agent_message_chunk",
"content": { "type": "text", "text": "hello" }
});
let u: SessionUpdate = serde_json::from_value(chunk).unwrap();
match u {
SessionUpdate::AgentMessageChunk {
content: ContentBlock::Text { text },
} => assert_eq!(text, "hello"),
other => panic!("unexpected: {other:?}"),
}
// An unknown update kind falls through to `Other`.
let weird = serde_json::json!({"sessionUpdate": "plan", "entries": []});
assert!(matches!(
serde_json::from_value::<SessionUpdate>(weird).unwrap(),
SessionUpdate::Other
));
}
#[test]
fn tool_call_update_carries_status() {
let v = serde_json::json!({
"sessionUpdate": "tool_call_update",
"toolCallId": "t1",
"status": "completed",
"rawOutput": { "result": 42 }
});
let u: SessionUpdate = serde_json::from_value(v).unwrap();
match u {
SessionUpdate::ToolCallUpdate {
tool_call_id,
status,
..
} => {
assert_eq!(tool_call_id, "t1");
assert_eq!(status.as_deref(), Some("completed"));
}
other => panic!("unexpected: {other:?}"),
}
}
#[test]
fn request_serializes_with_jsonrpc_envelope() {
let req = JsonRpcRequest::new(1, METHOD_INITIALIZE, serde_json::json!({}));
let json = serde_json::to_string(&req).unwrap();
assert!(json.contains("\"jsonrpc\":\"2.0\""));
assert!(json.contains("\"method\":\"initialize\""));
}
}

View file

@ -0,0 +1,98 @@
//! ndJSON framing — newline-delimited JSON over an async byte stream.
//!
//! ACP speaks JSON-RPC where every message is one JSON object on its
//! own line (the SDK's `ndJsonStream`). These helpers read and write
//! that framing; the JSON-RPC engine ([`crate::jsonrpc`]) sits on top.
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
use crate::types::AcpError;
/// Read the next ndJSON frame from `reader`. Blank lines are skipped;
/// a line that fails to parse as JSON is skipped (ndJSON robustness —
/// a stray non-JSON line must not kill the connection). Returns
/// `Ok(None)` at end of stream.
pub async fn read_frame(
reader: &mut (impl AsyncBufRead + Unpin),
) -> Result<Option<Value>, AcpError> {
let mut line = String::new();
loop {
line.clear();
let n = reader
.read_line(&mut line)
.await
.map_err(|e| AcpError::Transport(e.to_string()))?;
if n == 0 {
return Ok(None);
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
match serde_json::from_str::<Value>(trimmed) {
Ok(v) => return Ok(Some(v)),
// Skip a malformed line rather than tearing down the link.
Err(_) => continue,
}
}
}
/// `AsyncBufRead` is needed for `read_line`; re-exported so callers
/// don't need a separate `tokio::io` import.
pub use tokio::io::AsyncBufRead;
/// Write one ndJSON frame: the compact JSON of `value` followed by a
/// newline, flushed.
pub async fn write_frame(
writer: &mut (impl AsyncWrite + Unpin),
value: &Value,
) -> Result<(), AcpError> {
let mut bytes =
serde_json::to_vec(value).map_err(|e| AcpError::Protocol(e.to_string()))?;
bytes.push(b'\n');
writer
.write_all(&bytes)
.await
.map_err(|e| AcpError::Transport(e.to_string()))?;
writer
.flush()
.await
.map_err(|e| AcpError::Transport(e.to_string()))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[tokio::test]
async fn reads_frames_split_on_newline() {
let data = "{\"a\":1}\n{\"b\":2}\n";
let mut reader = Cursor::new(data.as_bytes());
let f1 = read_frame(&mut reader).await.unwrap().unwrap();
assert_eq!(f1["a"], 1);
let f2 = read_frame(&mut reader).await.unwrap().unwrap();
assert_eq!(f2["b"], 2);
// EOF.
assert!(read_frame(&mut reader).await.unwrap().is_none());
}
#[tokio::test]
async fn skips_blank_and_malformed_lines() {
let data = "\n \nnot json at all\n{\"ok\":true}\n";
let mut reader = Cursor::new(data.as_bytes());
let f = read_frame(&mut reader).await.unwrap().unwrap();
assert_eq!(f["ok"], true);
}
#[tokio::test]
async fn write_frame_appends_newline() {
let mut buf: Vec<u8> = Vec::new();
write_frame(&mut buf, &serde_json::json!({"x":1}))
.await
.unwrap();
assert_eq!(buf, b"{\"x\":1}\n");
}
}

128
crates/op-acp/src/types.rs Normal file
View file

@ -0,0 +1,128 @@
//! Public ACP types — port of `pen-acp/src/types.ts`.
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
/// Transport an [`AcpAgentConfig`] uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ConnectionType {
/// A local child process spoken to over stdio.
Local,
/// A remote agent reached over a WebSocket URL.
Remote,
}
/// Persisted config for one user-configured ACP agent.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AcpAgentConfig {
pub id: String,
pub display_name: String,
pub connection_type: ConnectionType,
/// Local: the binary to spawn.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub command: Option<String>,
/// Local: CLI arguments.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub args: Vec<String>,
/// Local: extra environment variables. A JSON object on the wire
/// (`Record<string, string>`), matching the TS `pen-acp` config.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub env: BTreeMap<String, String>,
/// Remote: the WebSocket endpoint.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub url: Option<String>,
/// Whether the agent shows in the UI.
pub enabled: bool,
}
/// Agent identity returned by the `initialize` handshake.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct AcpAgentInfo {
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
}
/// Result of a connection attempt (TS `AcpConnectResult`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AcpConnectResult {
pub connected: bool,
pub agent_info: Option<AcpAgentInfo>,
pub error: Option<String>,
}
/// Errors raised by the ACP client.
#[derive(Debug, thiserror::Error)]
pub enum AcpError {
/// A local agent config had no `command`, or a remote one no `url`.
#[error("ACP config error: {0}")]
Config(String),
/// Failed to spawn the local agent process.
#[error("failed to spawn ACP agent: {0}")]
Spawn(String),
/// Transport-level IO failure (stdio / socket).
#[error("ACP transport error: {0}")]
Transport(String),
/// The agent returned a JSON-RPC error response.
#[error("ACP agent error {code}: {message}")]
Rpc { code: i64, message: String },
/// A response / notification could not be (de)serialized.
#[error("ACP protocol error: {0}")]
Protocol(String),
/// The connection closed before the request completed.
#[error("ACP connection closed")]
Closed,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn agent_config_round_trips_through_json() {
let mut env = BTreeMap::new();
env.insert("KEY".to_string(), "value".to_string());
let cfg = AcpAgentConfig {
id: "a1".into(),
display_name: "My Agent".into(),
connection_type: ConnectionType::Local,
command: Some("my-agent".into()),
args: vec!["--acp".into()],
env,
url: None,
enabled: true,
};
let json = serde_json::to_string(&cfg).unwrap();
// camelCase on the wire.
assert!(json.contains("\"connectionType\":\"local\""));
assert!(json.contains("\"displayName\""));
// `env` is a JSON object, not an array of pairs (TS parity).
assert!(json.contains("\"env\":{\"KEY\":\"value\"}"));
let back: AcpAgentConfig = serde_json::from_str(&json).unwrap();
assert_eq!(back, cfg);
}
#[test]
fn remote_config_omits_local_only_fields() {
let cfg = AcpAgentConfig {
id: "r1".into(),
display_name: "Remote".into(),
connection_type: ConnectionType::Remote,
command: None,
args: vec![],
env: BTreeMap::new(),
url: Some("ws://localhost:9000".into()),
enabled: false,
};
let json = serde_json::to_string(&cfg).unwrap();
assert!(!json.contains("command"));
assert!(json.contains("\"url\""));
let back: AcpAgentConfig = serde_json::from_str(&json).unwrap();
assert_eq!(back, cfg);
}
}