diff --git a/crates/agent/src/tools/diagnostics_tool.rs b/crates/agent/src/tools/diagnostics_tool.rs index 1d6528007d0..89d4ef54677 100644 --- a/crates/agent/src/tools/diagnostics_tool.rs +++ b/crates/agent/src/tools/diagnostics_tool.rs @@ -1,16 +1,18 @@ use crate::{AgentTool, ToolCallEventStream, ToolInput}; use agent_client_protocol::schema as acp; -use anyhow::Result; -use futures::FutureExt as _; -use gpui::{App, Entity, Task}; +use futures::{Future, FutureExt as _}; +use gpui::{App, AsyncApp, Entity, Task}; use language::{DiagnosticSeverity, OffsetRangeExt}; use project::Project; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::path::Path; use std::{fmt::Write, sync::Arc}; use ui::SharedString; use util::markdown::MarkdownInlineCode; +type Result = core::result::Result; + /// Get errors and warnings for the project or a specific file. /// /// This tool can be invoked after a series of edits to determine if further edits are necessary, or if the user asks to fix errors or warnings in their codebase. @@ -18,6 +20,11 @@ use util::markdown::MarkdownInlineCode; /// When a path is provided, shows all diagnostics for that specific file. /// When no path is provided, shows a summary of error and warning counts for all files in the project. /// +/// This tool attempts to refresh diagnostics before returning. +/// If refreshing diagnostics fails (for example, if the language server does not support pull-based diagnostics), it will return any diagnostics already present. +/// Note that, in this case, the results may be out-of-date, and may or may not reflect the most recent edits. +/// If this happens, do not attempt to re-run this tool in the hope that refreshing will later succeed. Failures are typically persistent. +/// /// /// To get diagnostics for a specific file: /// { @@ -60,6 +67,71 @@ impl DiagnosticsTool { } } +async fn with_cancellation(f: impl Future, s: &ToolCallEventStream) -> Result { + futures::select! { + result = f.fuse() => Ok(result), + _ = s.cancelled_by_user().fuse() => { + Err("Diagnostics cancelled by user".to_string()) + } + } +} + +fn freshness_message(refreshed: bool) -> &'static str { + if refreshed { + "Diagnostics successfully refreshed." + } else { + "Failed to refresh diagnostics. Diagnostics may be stale." + } +} + +/// Attempt to pull fresh diagnostics from the LSP before reading them. +/// +/// Returns `Ok(true)` if diagnostics were successfully refreshed, +/// `Ok(false)` if the pull failed (callers should fall through to +/// read cached diagnostics), or `Err` if cancelled by the user. +async fn pull_diagnostics( + project: &Entity, + path: Option<&Path>, + event_stream: &ToolCallEventStream, + cx: &mut AsyncApp, +) -> Result { + match path { + Some(path) => { + let open_buffer_task = project.update(cx, |project, cx| { + let Some(project_path) = project.find_project_path(path, cx) else { + return Err(format!("Could not find path {} in project", path.display())); + }; + Ok(project.open_buffer(project_path, cx)) + })?; + + let buffer = with_cancellation(open_buffer_task, event_stream) + .await? + .map_err(|e| e.to_string())?; + + let lsp_store = project.read_with(cx, |project, _cx| project.lsp_store()); + let pull_task = lsp_store.update(cx, |lsp_store, cx| { + lsp_store.pull_diagnostics_for_buffer(buffer, cx) + }); + let pull_result = with_cancellation(pull_task, event_stream).await?; + if let Err(error) = &pull_result { + log::warn!("Failed to pull diagnostics, using cached: {error:#}"); + } + Ok(pull_result.is_ok()) + } + None => { + let lsp_store = project.read_with(cx, |project, _cx| project.lsp_store()); + let pull_task = lsp_store.update(cx, |lsp_store, cx| { + lsp_store.pull_workspace_diagnostics_once(cx) + }); + let succeeded = with_cancellation(pull_task, event_stream).await?; + if !succeeded { + log::warn!("Failed to pull workspace diagnostics, using cached"); + } + Ok(succeeded) + } + } +} + impl AgentTool for DiagnosticsTool { type Input = DiagnosticsToolInput; type Output = String; @@ -96,21 +168,22 @@ impl AgentTool for DiagnosticsTool { let input = input.recv().await.map_err(|e| e.to_string())?; match input.path { - Some(path) if !path.is_empty() => { - let (_project_path, open_buffer_task) = project.update(cx, |project, cx| { - let Some(project_path) = project.find_project_path(&path, cx) else { + Some(ref path) if !path.is_empty() => { + let refreshed = + pull_diagnostics(&project, Some(Path::new(path)), &event_stream, cx) + .await?; + + let open_buffer_task = project.update(cx, |project, cx| { + let Some(project_path) = project.find_project_path(path, cx) else { return Err(format!("Could not find path {path} in project")); }; - let task = project.open_buffer(project_path.clone(), cx); - Ok((project_path, task)) + Ok(project.open_buffer(project_path, cx)) })?; - let buffer = futures::select! { - result = open_buffer_task.fuse() => result.map_err(|e| e.to_string())?, - _ = event_stream.cancelled_by_user().fuse() => { - return Err("Diagnostics cancelled by user".to_string()); - } - }; + let buffer = with_cancellation(open_buffer_task, &event_stream) + .await? + .map_err(|e| e.to_string())?; + let mut output = String::new(); let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot()); @@ -133,13 +206,18 @@ impl AgentTool for DiagnosticsTool { .ok(); } + let freshness = freshness_message(refreshed); if output.is_empty() { - Ok("File doesn't have errors or warnings!".to_string()) + Ok(format!( + "{freshness}\n\nFile doesn't have errors or warnings!" + )) } else { - Ok(output) + Ok(format!("{freshness}\n\n{output}")) } } _ => { + let refreshed = pull_diagnostics(&project, None, &event_stream, cx).await?; + let (output, has_diagnostics) = project.read_with(cx, |project, cx| { let mut output = String::new(); let mut has_diagnostics = false; @@ -165,10 +243,13 @@ impl AgentTool for DiagnosticsTool { (output, has_diagnostics) }); + let freshness = freshness_message(refreshed); if has_diagnostics { - Ok(output) + Ok(format!("{freshness}\n\n{output}")) } else { - Ok("No errors or warnings found in the project.".into()) + Ok(format!( + "{freshness}\n\nNo errors or warnings found in the project." + )) } } } diff --git a/crates/project/src/lsp_store.rs b/crates/project/src/lsp_store.rs index c943498817d..fd738b0f7ad 100644 --- a/crates/project/src/lsp_store.rs +++ b/crates/project/src/lsp_store.rs @@ -58,6 +58,7 @@ use clock::Global; use collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map}; use futures::{ AsyncWriteExt, Future, FutureExt, StreamExt, + channel::oneshot, future::{Either, Shared, join_all, pending, select}, select, select_biased, stream::FuturesUnordered, @@ -12423,11 +12424,45 @@ impl LspStore { .and_then(|local| local.language_servers.get_mut(&server_id)) { for diagnostics in workspace_diagnostics_refresh_tasks.values_mut() { - diagnostics.refresh_tx.try_send(()).ok(); + diagnostics.refresh_tx.try_send(None).ok(); } } } + /// Triggers a workspace diagnostics pull on all running language servers + /// and returns a [`Task`] that resolves once the requests have completed. + /// + /// This reuses the same background refresh loops as + /// [`Self::pull_workspace_diagnostics`], but provides a completion signal + /// so callers can wait for fresh diagnostics before reading them. + pub fn pull_workspace_diagnostics_once(&mut self, cx: &mut Context) -> Task { + let Some(local) = self.as_local_mut() else { + return Task::ready(true); + }; + + let mut receivers = Vec::new(); + for state in local.language_servers.values_mut() { + let LanguageServerState::Running { + workspace_diagnostics_refresh_tasks, + .. + } = state + else { + continue; + }; + for task in workspace_diagnostics_refresh_tasks.values_mut() { + let (tx, rx) = oneshot::channel(); + task.refresh_tx.try_send(Some(tx)).ok(); + receivers.push(rx); + } + } + + cx.background_spawn(async { + FuturesUnordered::from_iter(receivers) + .all(async |result| result.unwrap_or(false)) + .await + }) + } + /// Refreshes `textDocument/diagnostic` for all open buffers associated with the given server. /// This is called in response to `workspace/diagnostic/refresh` to comply with the LSP spec, /// which requires refreshing both workspace and document diagnostics. @@ -13517,8 +13552,8 @@ fn lsp_workspace_diagnostics_refresh( let registration_id_shared = registration_id.as_ref().map(SharedString::from); let (progress_tx, mut progress_rx) = mpsc::channel(1); - let (mut refresh_tx, mut refresh_rx) = mpsc::channel(1); - refresh_tx.try_send(()).ok(); + let (mut refresh_tx, mut refresh_rx) = mpsc::channel::>>(1); + refresh_tx.try_send(None).ok(); let request_timeout = ProjectSettings::get_global(cx) .global_lsp_settings @@ -13538,7 +13573,7 @@ fn lsp_workspace_diagnostics_refresh( let mut requests = 0; loop { - let Some(()) = refresh_rx.recv().await else { + let Some(mut completion_tx) = refresh_rx.recv().await else { return; }; @@ -13614,6 +13649,9 @@ fn lsp_workspace_diagnostics_refresh( } ConnectionResult::Result(Err(e)) => { log::error!("Error during workspace diagnostics pull: {e:#}"); + if let Some(tx) = completion_tx.take() { + tx.send(false).ok(); + } break 'request; } ConnectionResult::Result(Ok(pulled_diagnostics)) => { @@ -13631,6 +13669,9 @@ fn lsp_workspace_diagnostics_refresh( { return; } + if let Some(tx) = completion_tx.take() { + tx.send(true).ok(); + } break 'request; } } @@ -14109,7 +14150,7 @@ impl LanguageServerLogType { } pub struct WorkspaceRefreshTask { - refresh_tx: mpsc::Sender<()>, + refresh_tx: mpsc::Sender>>, progress_tx: mpsc::Sender<()>, #[allow(dead_code)] task: Task<()>,