collab: Add request stream support (#56455)

Adds streaming RPC forwarding to collab so guests can call
`GetInitialGraphData` and `SearchCommits` against a remote host project.
Previously these requests had no forwarder registered on the server and
would fail when invoked by a guest.

This mirrors the existing single-response forwarding pattern with new
analogues:
- `StreamResponse<R>` + `MessageContext::forward_request_stream`
- `Server::add_request_stream_handler`
- `forward_read_only_project_stream_request`, registered for both
messages

Also hardens both the unary and stream handlers to send
`respond_with_error` when a handler returns `Ok` without sending/ending
a response, so the client doesn't hang waiting for a reply that will
never arrive.

I added git graph collab integration tests for this as well. 


Self-Review Checklist:

- [x] I've reviewed my own diff for quality, security, and reliability
- [x] Unsafe blocks (if any) have justifying comments
- [x] The content is consistent with the [UI/UX
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)
- [x] Tests cover the new/changed behavior
- [x] Performance impact has been considered and is acceptable

Closes #55954

Release Notes:

- N/A
This commit is contained in:
Anthony Eid 2026-05-12 13:43:06 -04:00 committed by GitHub
parent 917c698483
commit 592727b892
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 425 additions and 109 deletions

1
Cargo.lock generated
View file

@ -3220,6 +3220,7 @@ dependencies = [
"fs",
"futures 0.3.32",
"git",
"git_graph",
"git_hosting_providers",
"git_ui",
"gpui",

View file

@ -97,6 +97,7 @@ extension.workspace = true
file_finder.workspace = true
fs = { workspace = true, features = ["test-support"] }
git = { workspace = true, features = ["test-support"] }
git_graph = { workspace = true, features = ["test-support"] }
git_hosting_providers.workspace = true
git_ui = { workspace = true, features = ["test-support"] }
gpui = { workspace = true, features = ["test-support"] }

View file

@ -39,8 +39,10 @@ use tracing::Span;
use util::paths::PathStyle;
use futures::{
FutureExt, SinkExt, StreamExt, TryStreamExt, channel::oneshot, future::BoxFuture,
stream::FuturesUnordered,
FutureExt, SinkExt, StreamExt, TryStreamExt,
channel::oneshot,
future::BoxFuture,
stream::{BoxStream, FuturesUnordered},
};
use prometheus::{IntGauge, register_int_gauge};
use rpc::{
@ -128,6 +130,30 @@ impl<R: RequestMessage> Response<R> {
}
}
struct StreamResponse<R> {
peer: Arc<Peer>,
receipt: Receipt<R>,
ended: Arc<AtomicBool>,
}
impl<R: RequestMessage> StreamResponse<R> {
fn send(&self, payload: R::Response) -> Result<()> {
self.peer.respond(self.receipt, payload)?;
Ok(())
}
fn end(self) -> Result<()> {
// Always mark `ended` even if sending `EndStream` on the wire fails, so that
// `ended` reflects "the handler intended to end the stream". The caller still
// gets the underlying error and routes through the Err arm of the handler,
// which sends `respond_with_error` to terminate the client-side stream.
let result = self.peer.end_stream(self.receipt);
self.ended.store(true, SeqCst);
result?;
Ok(())
}
}
#[derive(Clone, Debug)]
pub enum Principal {
User(User),
@ -178,6 +204,36 @@ impl MessageContext {
.inspect_err(|_| tracing::error!("error forwarding request"))
.inspect_ok(|_| tracing::info!("finished forwarding request"))
}
pub fn forward_request_stream<T: RequestMessage>(
&self,
receiver_id: ConnectionId,
request: T,
) -> impl Future<Output = anyhow::Result<BoxStream<'static, anyhow::Result<T::Response>>>> {
let request_start_time = Instant::now();
let span = self.span.clone();
let peer = self.peer.clone();
let envelope = request.into_envelope(0, None, Some(self.connection_id.into()));
async move {
tracing::info!("start forwarding stream request");
let stream = peer
.request_stream_dynamic(receiver_id, envelope, T::NAME)
.await;
span.record(
HOST_WAITING_MS,
request_start_time.elapsed().as_micros() as f64 / 1000.0,
);
let stream = stream
.inspect_err(|_| tracing::error!("error forwarding stream request"))?
.map(|response| {
T::Response::from_envelope(response?)
.context("received response of the wrong type")
})
.boxed();
tracing::info!("finished opening forwarded stream request");
Ok(stream)
}
}
}
#[derive(Clone)]
@ -438,6 +494,12 @@ impl Server {
.add_request_handler(forward_read_only_project_request::<proto::GitGetWorktrees>)
.add_request_handler(forward_read_only_project_request::<proto::GitGetHeadSha>)
.add_request_handler(forward_read_only_project_request::<proto::GetCommitData>)
.add_request_stream_handler(
forward_read_only_project_stream_request::<proto::GetInitialGraphData>,
)
.add_request_stream_handler(
forward_read_only_project_stream_request::<proto::SearchCommits>,
)
.add_request_handler(forward_mutating_project_request::<proto::GitCreateWorktree>)
.add_request_handler(disallow_guest_request::<proto::GitRemoveWorktree>)
.add_request_handler(disallow_guest_request::<proto::GitRenameWorktree>)
@ -722,7 +784,54 @@ impl Server {
if responded.load(std::sync::atomic::Ordering::SeqCst) {
Ok(())
} else {
Err(anyhow!("handler did not send a response"))?
let error = anyhow!("handler did not send a response");
let proto_err =
ErrorCode::Internal.message(format!("{error}")).to_proto();
peer.respond_with_error(receipt, proto_err)?;
Err(error)?
}
}
Err(error) => {
let proto_err = match &error {
Error::Internal(err) => err.to_proto(),
_ => ErrorCode::Internal.message(format!("{error}")).to_proto(),
};
peer.respond_with_error(receipt, proto_err)?;
Err(error)
}
}
}
})
}
fn add_request_stream_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
where
F: 'static + Send + Sync + Fn(M, StreamResponse<M>, MessageContext) -> Fut,
Fut: Send + Future<Output = Result<()>>,
M: RequestMessage,
{
let handler = Arc::new(handler);
self.add_handler(move |envelope, session| {
let receipt = envelope.receipt();
let handler = handler.clone();
async move {
let peer = session.peer.clone();
let ended = Arc::new(AtomicBool::default());
let response = StreamResponse {
peer: peer.clone(),
ended: ended.clone(),
receipt,
};
match (handler)(envelope.payload, response, session).await {
Ok(()) => {
if ended.load(std::sync::atomic::Ordering::SeqCst) {
Ok(())
} else {
let error = anyhow!("handler did not end a response stream");
let proto_err =
ErrorCode::Internal.message(format!("{error}")).to_proto();
peer.respond_with_error(receipt, proto_err)?;
Err(error)?
}
}
Err(error) => {
@ -2256,6 +2365,32 @@ where
Ok(())
}
/// forward a project stream request to the host. These requests should be read only
/// as guests are allowed to send them.
async fn forward_read_only_project_stream_request<T>(
request: T,
response: StreamResponse<T>,
session: MessageContext,
) -> Result<()>
where
T: EntityMessage + RequestMessage,
{
let project_id = ProjectId::from_proto(request.remote_entity_id());
let host_connection_id = session
.db()
.await
.host_for_read_only_project_request(project_id, session.connection_id)
.await?;
let mut stream = session
.forward_request_stream(host_connection_id, request)
.await?;
while let Some(payload) = stream.next().await {
response.send(payload?)?;
}
response.end()?;
Ok(())
}
/// forward a project request to the host. These requests are disallowed
/// for guests.
async fn forward_mutating_project_request<T>(

View file

@ -1,19 +1,27 @@
use std::path::{self, Path, PathBuf};
use std::{
path::{self, Path, PathBuf},
sync::Arc,
};
use call::ActiveCall;
use client::RECEIVE_TIMEOUT;
use collections::HashMap;
use git::{
Oid,
repository::{CommitData, RepoPath, Worktree as GitWorktree},
repository::{CommitData, InitialGraphCommitData, RepoPath, Worktree as GitWorktree},
status::{DiffStat, FileStatus, StatusCode, TrackedStatus},
};
use git_graph::GitGraph;
use git_ui::{git_panel::GitPanel, project_diff::ProjectDiff};
use gpui::{AppContext as _, BackgroundExecutor, SharedString, TestAppContext, VisualTestContext};
use gpui::{
AppContext as _, BackgroundExecutor, Entity, IntoElement as _, SharedString, TestAppContext,
VisualContext as _, VisualTestContext, point, px, size,
};
use project::{
ProjectPath,
git_store::{CommitDataState, Repository},
};
use rand::{SeedableRng, rngs::StdRng};
use serde_json::json;
use util::{path, rel_path::rel_path};
@ -154,6 +162,52 @@ fn branch_list_snapshot(
})
}
fn build_git_graph(
project: &Entity<project::Project>,
workspace: &Entity<Workspace>,
cx: &mut VisualTestContext,
) -> Entity<GitGraph> {
let (repository_id, git_store) = project.read_with(cx, |project, cx| {
let repository = project
.active_repository(cx)
.expect("project should have an active repository");
(repository.read(cx).id, project.git_store().clone())
});
let workspace = workspace.downgrade();
cx.new_window_entity(|window, cx| {
GitGraph::new(repository_id, git_store, workspace, None, window, cx)
})
}
fn render_git_graph(graph: &Entity<GitGraph>, cx: &mut VisualTestContext) {
cx.draw(point(px(0.), px(0.)), size(px(1200.), px(800.)), |_, _| {
graph.clone().into_any_element()
});
cx.run_until_parked();
}
fn assert_initial_graph_commits_eq(
actual: &[Arc<InitialGraphCommitData>],
expected: &[Arc<InitialGraphCommitData>],
) {
assert_eq!(actual.len(), expected.len(), "commit count should match");
for (index, (actual, expected)) in actual.iter().zip(expected).enumerate() {
assert_eq!(
actual.sha, expected.sha,
"sha should match at index {index}"
);
assert_eq!(
actual.parents, expected.parents,
"parents should match at index {index}"
);
assert_eq!(
actual.ref_names, expected.ref_names,
"ref names should match at index {index}"
);
}
}
fn assert_remote_cache_matches_local_cache(
local_repository: &gpui::Entity<Repository>,
remote_repository: &gpui::Entity<Repository>,
@ -695,6 +749,104 @@ async fn test_remote_git_commit_data_batches(
assert_remote_cache_matches_local_cache(&repo_a, &repo_b, cx_a, cx_b);
}
#[gpui::test]
async fn test_remote_git_graph_data_and_search(
executor: BackgroundExecutor,
cx_a: &mut TestAppContext,
cx_b: &mut TestAppContext,
) {
let mut server = TestServer::start(executor.clone()).await;
let client_a = server.create_client(cx_a, "user_a").await;
let client_b = server.create_client(cx_b, "user_b").await;
server
.create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
.await;
cx_a.update(|cx| {
git_ui::init(cx);
git_graph::init(cx);
});
cx_b.update(|cx| {
git_ui::init(cx);
git_graph::init(cx);
});
let active_call_a = cx_a.read(ActiveCall::global);
client_a
.fs()
.insert_tree(
path!("/project"),
json!({ ".git": {}, "file.txt": "content" }),
)
.await;
let search_query = "graph search match";
let mut rng = StdRng::seed_from_u64(7);
let commits = git_graph::generate_random_commit_dag(&mut rng, 12, true);
let dot_git = Path::new(path!("/project/.git"));
client_a.fs().set_graph_commits(dot_git, commits.clone());
client_a.fs().set_commit_data(
dot_git,
commits.iter().enumerate().map(|(index, commit)| {
(
CommitData {
sha: commit.sha,
parents: commit.parents.clone(),
author_name: SharedString::from(format!("Author {index}")),
author_email: SharedString::from(format!("author{index}@example.com")),
commit_timestamp: 1_700_000_000 + index as i64,
subject: SharedString::from(format!("Subject {index}")),
message: SharedString::from(if index % 2 == 0 {
format!("Subject {index}\n\n{search_query} {index}")
} else {
format!("Subject {index}\n\nPlain message {index}")
}),
},
false,
)
}),
);
let (project_a, _) = client_a.build_local_project(path!("/project"), cx_a).await;
executor.run_until_parked();
let project_id = active_call_a
.update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
.await
.unwrap();
let project_b = client_b.join_remote_project(project_id, cx_b).await;
executor.run_until_parked();
let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
let remote_graph = build_git_graph(&project_b, &workspace_b, cx_b);
render_git_graph(&remote_graph, cx_b);
let remote_initial_graph_data =
remote_graph.read_with(cx_b, |graph, _| graph.initial_commit_data_for_test());
remote_graph.update(cx_b, |graph, cx| {
graph.search_for_test(SharedString::from(search_query), cx);
});
cx_b.run_until_parked();
let remote_search_results =
remote_graph.read_with(cx_b, |graph, _| graph.search_matches_for_test());
let (workspace_a, cx_a) = client_a.build_workspace(&project_a, cx_a);
let local_graph = build_git_graph(&project_a, &workspace_a, cx_a);
render_git_graph(&local_graph, cx_a);
let local_initial_graph_data =
local_graph.read_with(cx_a, |graph, _| graph.initial_commit_data_for_test());
local_graph.update(cx_a, |graph, cx| {
graph.search_for_test(SharedString::from(search_query), cx);
});
cx_a.run_until_parked();
let local_search_results =
local_graph.read_with(cx_a, |graph, _| graph.search_matches_for_test());
assert_initial_graph_commits_eq(&local_initial_graph_data, &commits);
assert_initial_graph_commits_eq(&remote_initial_graph_data, &local_initial_graph_data);
assert!(!local_search_results.is_empty());
assert_eq!(remote_search_results, local_search_results);
}
#[gpui::test]
async fn test_branch_list_sync(
executor: BackgroundExecutor,

View file

@ -14,6 +14,7 @@ path = "src/git_graph.rs"
[features]
default = []
test-support = [
"dep:rand",
"project/test-support",
"gpui/test-support",
"remote_connection/test-support",
@ -33,6 +34,7 @@ menu.workspace = true
picker.workspace = true
project.workspace = true
project_panel.workspace = true
rand = { workspace = true, optional = true }
search.workspace = true
settings.workspace = true
smallvec.workspace = true

View file

@ -3978,6 +3978,134 @@ mod persistence {
}
}
#[cfg(any(test, feature = "test-support"))]
impl GitGraph {
pub fn search_for_test(&mut self, query: SharedString, cx: &mut Context<Self>) {
self.search(query, cx);
}
pub fn search_matches_for_test(&self) -> Vec<Oid> {
self.search_state.matches.iter().copied().collect()
}
pub fn initial_commit_data_for_test(&self) -> Vec<Arc<InitialGraphCommitData>> {
self.graph_data
.commits
.iter()
.map(|commit| commit.data.clone())
.collect()
}
}
/// Generates a random commit DAG suitable for testing git graph rendering.
///
/// The commits are ordered newest-first (like git log output), so:
/// - Index 0 = most recent commit (HEAD)
/// - Last index = oldest commit (root, has no parents)
/// - Parents of commit at index I must have index > I
///
/// When `adversarial` is true, generates complex topologies with many branches
/// and octopus merges. Otherwise generates more realistic linear histories
/// with occasional branches.
#[cfg(any(test, feature = "test-support"))]
pub fn generate_random_commit_dag(
rng: &mut rand::rngs::StdRng,
num_commits: usize,
adversarial: bool,
) -> Vec<Arc<InitialGraphCommitData>> {
use rand::Rng as _;
if num_commits == 0 {
return Vec::new();
}
let mut commits: Vec<Arc<InitialGraphCommitData>> = Vec::with_capacity(num_commits);
let oids: Vec<Oid> = (0..num_commits).map(|_| Oid::random(rng)).collect();
for i in 0..num_commits {
let sha = oids[i];
let parents = if i == num_commits - 1 {
smallvec![]
} else {
generate_parents_from_oids(rng, &oids, i, num_commits, adversarial)
};
let ref_names = if i == 0 {
vec!["HEAD".into(), "main".into()]
} else if adversarial && rng.random_bool(0.1) {
vec![format!("branch-{i}").into()]
} else {
Vec::new()
};
commits.push(Arc::new(InitialGraphCommitData {
sha,
parents,
ref_names,
}));
}
commits
}
#[cfg(any(test, feature = "test-support"))]
fn generate_parents_from_oids(
rng: &mut rand::rngs::StdRng,
oids: &[Oid],
current_idx: usize,
num_commits: usize,
adversarial: bool,
) -> SmallVec<[Oid; 1]> {
use rand::{Rng as _, seq::SliceRandom as _};
let remaining = num_commits - current_idx - 1;
if remaining == 0 {
return smallvec![];
}
if adversarial {
let merge_chance = 0.4;
let octopus_chance = 0.15;
if remaining >= 3 && rng.random_bool(octopus_chance) {
let num_parents = rng.random_range(3..=remaining.min(5));
let mut parent_indices: Vec<usize> = (current_idx + 1..num_commits).collect();
parent_indices.shuffle(rng);
parent_indices
.into_iter()
.take(num_parents)
.map(|idx| oids[idx])
.collect()
} else if remaining >= 2 && rng.random_bool(merge_chance) {
let mut parent_indices: Vec<usize> = (current_idx + 1..num_commits).collect();
parent_indices.shuffle(rng);
parent_indices
.into_iter()
.take(2)
.map(|idx| oids[idx])
.collect()
} else {
let parent_idx = rng.random_range(current_idx + 1..num_commits);
smallvec![oids[parent_idx]]
}
} else {
let merge_chance = 0.15;
let skip_chance = 0.1;
if remaining >= 2 && rng.random_bool(merge_chance) {
let first_parent = current_idx + 1;
let second_parent = rng.random_range(current_idx + 2..num_commits);
smallvec![oids[first_parent], oids[second_parent]]
} else if rng.random_bool(skip_chance) && remaining >= 2 {
let skip = rng.random_range(1..remaining.min(3));
smallvec![oids[current_idx + 1 + skip]]
} else {
smallvec![oids[current_idx + 1]]
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -4008,109 +4136,6 @@ mod tests {
});
}
/// Generates a random commit DAG suitable for testing git graph rendering.
///
/// The commits are ordered newest-first (like git log output), so:
/// - Index 0 = most recent commit (HEAD)
/// - Last index = oldest commit (root, has no parents)
/// - Parents of commit at index I must have index > I
///
/// When `adversarial` is true, generates complex topologies with many branches
/// and octopus merges. Otherwise generates more realistic linear histories
/// with occasional branches.
fn generate_random_commit_dag(
rng: &mut StdRng,
num_commits: usize,
adversarial: bool,
) -> Vec<Arc<InitialGraphCommitData>> {
if num_commits == 0 {
return Vec::new();
}
let mut commits: Vec<Arc<InitialGraphCommitData>> = Vec::with_capacity(num_commits);
let oids: Vec<Oid> = (0..num_commits).map(|_| Oid::random(rng)).collect();
for i in 0..num_commits {
let sha = oids[i];
let parents = if i == num_commits - 1 {
smallvec![]
} else {
generate_parents_from_oids(rng, &oids, i, num_commits, adversarial)
};
let ref_names = if i == 0 {
vec!["HEAD".into(), "main".into()]
} else if adversarial && rng.random_bool(0.1) {
vec![format!("branch-{}", i).into()]
} else {
Vec::new()
};
commits.push(Arc::new(InitialGraphCommitData {
sha,
parents,
ref_names,
}));
}
commits
}
fn generate_parents_from_oids(
rng: &mut StdRng,
oids: &[Oid],
current_idx: usize,
num_commits: usize,
adversarial: bool,
) -> SmallVec<[Oid; 1]> {
let remaining = num_commits - current_idx - 1;
if remaining == 0 {
return smallvec![];
}
if adversarial {
let merge_chance = 0.4;
let octopus_chance = 0.15;
if remaining >= 3 && rng.random_bool(octopus_chance) {
let num_parents = rng.random_range(3..=remaining.min(5));
let mut parent_indices: Vec<usize> = (current_idx + 1..num_commits).collect();
parent_indices.shuffle(rng);
parent_indices
.into_iter()
.take(num_parents)
.map(|idx| oids[idx])
.collect()
} else if remaining >= 2 && rng.random_bool(merge_chance) {
let mut parent_indices: Vec<usize> = (current_idx + 1..num_commits).collect();
parent_indices.shuffle(rng);
parent_indices
.into_iter()
.take(2)
.map(|idx| oids[idx])
.collect()
} else {
let parent_idx = rng.random_range(current_idx + 1..num_commits);
smallvec![oids[parent_idx]]
}
} else {
let merge_chance = 0.15;
let skip_chance = 0.1;
if remaining >= 2 && rng.random_bool(merge_chance) {
let first_parent = current_idx + 1;
let second_parent = rng.random_range(current_idx + 2..num_commits);
smallvec![oids[first_parent], oids[second_parent]]
} else if rng.random_bool(skip_chance) && remaining >= 2 {
let skip = rng.random_range(1..remaining.min(3));
smallvec![oids[current_idx + 1 + skip]]
} else {
smallvec![oids[current_idx + 1]]
}
}
}
fn build_oid_to_row_map(graph: &GraphData) -> HashMap<Oid, usize> {
graph
.commits