From aa3234e41d4bd579f987aacff6ee8b8878aa11ef Mon Sep 17 00:00:00 2001 From: Richard Feldman Date: Wed, 27 May 2026 11:47:25 -0400 Subject: [PATCH] Harden archived worktree restore cleanup --- crates/agent_ui/src/thread_metadata_store.rs | 169 +++++++++++------- .../agent_ui/src/thread_worktree_archive.rs | 126 ++++++++++--- crates/sidebar/src/sidebar.rs | 61 ++++++- 3 files changed, 255 insertions(+), 101 deletions(-) diff --git a/crates/agent_ui/src/thread_metadata_store.rs b/crates/agent_ui/src/thread_metadata_store.rs index ad1078700b8..99793ae4730 100644 --- a/crates/agent_ui/src/thread_metadata_store.rs +++ b/crates/agent_ui/src/thread_metadata_store.rs @@ -971,6 +971,13 @@ impl ThreadMetadataStore { } fn save_internal(&mut self, metadata: ThreadMetadata) { + self.cache_thread_metadata_update(metadata.clone()); + self.pending_thread_ops_tx + .try_send(DbOperation::Upsert(metadata)) + .log_err(); + } + + fn cache_thread_metadata_update(&mut self, metadata: ThreadMetadata) { if let Some(thread) = self.threads.get(&metadata.thread_id) { if thread.folder_paths() != metadata.folder_paths() { if let Some(thread_ids) = self.threads_by_paths.get_mut(thread.folder_paths()) { @@ -989,10 +996,7 @@ impl ThreadMetadataStore { } } - self.cache_thread_metadata(metadata.clone()); - self.pending_thread_ops_tx - .try_send(DbOperation::Upsert(metadata)) - .log_err(); + self.cache_thread_metadata(metadata); } fn cache_thread_metadata(&mut self, metadata: ThreadMetadata) { @@ -1171,9 +1175,6 @@ impl ThreadMetadataStore { archived: false, ..thread }; - self.save_internal(metadata.clone()); - cx.notify(); - let (completion_tx, completion_rx) = async_channel::bounded(1); if let Err(error) = self .pending_thread_ops_tx @@ -1197,6 +1198,15 @@ impl ThreadMetadataStore { })) } + pub fn apply_completed_worktree_restore( + &mut self, + metadata: ThreadMetadata, + cx: &mut Context, + ) { + self.cache_thread_metadata_update(metadata); + cx.notify(); + } + /// Apply a mutation to the worktree paths of all threads whose current /// `folder_paths` matches `current_folder_paths`, then re-index. /// When `remote_connection` is provided, only threads with a matching @@ -1460,14 +1470,8 @@ impl ThreadMetadataStore { archived_worktree_ids, completion_tx, } => { - let result = async { - db.save(metadata).await?; - for archived_worktree_id in archived_worktree_ids { - db.delete_archived_worktree(archived_worktree_id).await?; - } - anyhow::Ok(()) - } - .await; + let result = + db.complete_restore(metadata, archived_worktree_ids).await; completion_tx.send(result).await.log_err(); } } @@ -1747,6 +1751,13 @@ impl ThreadMetadataDb { /// session_id on promotion (when the first message is sent) and /// then flow through this same upsert path. pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> { + self.write(move |conn| Self::save_sync(conn, row)).await + } + + fn save_sync( + conn: &db::sqlez::connection::Connection, + row: ThreadMetadata, + ) -> anyhow::Result<()> { let session_id = row.session_id.as_ref().map(|s| s.0.clone()); let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() { None @@ -1784,39 +1795,53 @@ impl ThreadMetadataDb { let thread_id = row.thread_id; let archived = row.archived; + let sql = "INSERT INTO sidebar_threads(thread_id, session_id, agent_id, title, updated_at, created_at, interacted_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order, remote_connection, title_override) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14) \ + ON CONFLICT(thread_id) DO UPDATE SET \ + session_id = excluded.session_id, \ + agent_id = excluded.agent_id, \ + title = excluded.title, \ + updated_at = excluded.updated_at, \ + created_at = excluded.created_at, \ + interacted_at = excluded.interacted_at, \ + folder_paths = excluded.folder_paths, \ + folder_paths_order = excluded.folder_paths_order, \ + archived = excluded.archived, \ + main_worktree_paths = excluded.main_worktree_paths, \ + main_worktree_paths_order = excluded.main_worktree_paths_order, \ + remote_connection = excluded.remote_connection, \ + title_override = excluded.title_override"; + let mut stmt = Statement::prepare(conn, sql)?; + let mut i = stmt.bind(&thread_id, 1)?; + i = stmt.bind(&session_id, i)?; + i = stmt.bind(&agent_id, i)?; + i = stmt.bind(&title, i)?; + i = stmt.bind(&updated_at, i)?; + i = stmt.bind(&created_at, i)?; + i = stmt.bind(&interacted_at, i)?; + i = stmt.bind(&folder_paths, i)?; + i = stmt.bind(&folder_paths_order, i)?; + i = stmt.bind(&archived, i)?; + i = stmt.bind(&main_worktree_paths, i)?; + i = stmt.bind(&main_worktree_paths_order, i)?; + i = stmt.bind(&remote_connection, i)?; + stmt.bind(&title_override, i)?; + stmt.exec() + } + + pub async fn complete_restore( + &self, + metadata: ThreadMetadata, + archived_worktree_ids: Vec, + ) -> anyhow::Result<()> { self.write(move |conn| { - let sql = "INSERT INTO sidebar_threads(thread_id, session_id, agent_id, title, updated_at, created_at, interacted_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order, remote_connection, title_override) \ - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14) \ - ON CONFLICT(thread_id) DO UPDATE SET \ - session_id = excluded.session_id, \ - agent_id = excluded.agent_id, \ - title = excluded.title, \ - updated_at = excluded.updated_at, \ - created_at = excluded.created_at, \ - interacted_at = excluded.interacted_at, \ - folder_paths = excluded.folder_paths, \ - folder_paths_order = excluded.folder_paths_order, \ - archived = excluded.archived, \ - main_worktree_paths = excluded.main_worktree_paths, \ - main_worktree_paths_order = excluded.main_worktree_paths_order, \ - remote_connection = excluded.remote_connection, \ - title_override = excluded.title_override"; - let mut stmt = Statement::prepare(conn, sql)?; - let mut i = stmt.bind(&thread_id, 1)?; - i = stmt.bind(&session_id, i)?; - i = stmt.bind(&agent_id, i)?; - i = stmt.bind(&title, i)?; - i = stmt.bind(&updated_at, i)?; - i = stmt.bind(&created_at, i)?; - i = stmt.bind(&interacted_at, i)?; - i = stmt.bind(&folder_paths, i)?; - i = stmt.bind(&folder_paths_order, i)?; - i = stmt.bind(&archived, i)?; - i = stmt.bind(&main_worktree_paths, i)?; - i = stmt.bind(&main_worktree_paths_order, i)?; - i = stmt.bind(&remote_connection, i)?; - stmt.bind(&title_override, i)?; - stmt.exec() + conn.with_savepoint("complete_thread_restore", || { + Self::save_sync(conn, metadata)?; + for archived_worktree_id in archived_worktree_ids { + Self::delete_archived_worktree_sync(conn, archived_worktree_id)?; + } + Ok(()) + }) }) .await } @@ -1890,20 +1915,24 @@ impl ThreadMetadataDb { } pub async fn delete_archived_worktree(&self, id: i64) -> anyhow::Result<()> { - self.write(move |conn| { - let mut stmt = Statement::prepare( - conn, - "DELETE FROM thread_archived_worktrees WHERE archived_worktree_id = ?", - )?; - stmt.bind(&id, 1)?; - stmt.exec()?; + self.write(move |conn| Self::delete_archived_worktree_sync(conn, id)) + .await + } - let mut stmt = - Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?; - stmt.bind(&id, 1)?; - stmt.exec() - }) - .await + fn delete_archived_worktree_sync( + conn: &db::sqlez::connection::Connection, + id: i64, + ) -> anyhow::Result<()> { + let mut stmt = Statement::prepare( + conn, + "DELETE FROM thread_archived_worktrees WHERE archived_worktree_id = ?", + )?; + stmt.bind(&id, 1)?; + stmt.exec()?; + + let mut stmt = Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?; + stmt.bind(&id, 1)?; + stmt.exec() } pub async fn unlink_thread_from_all_archived_worktrees( @@ -3765,9 +3794,12 @@ mod tests { store.complete_worktree_restore(thread_id, &replacements, Vec::new(), cx) }) .expect("thread should exist"); - persist_task + let metadata = persist_task .await .expect("restore completion should persist"); + store.update(cx, |store, cx| { + store.apply_completed_worktree_restore(metadata, cx); + }); let entry = store.read_with(cx, |store, _cx| store.entry(thread_id).cloned()); let entry = entry.unwrap(); @@ -3808,9 +3840,12 @@ mod tests { store.complete_worktree_restore(thread_id, &replacements, Vec::new(), cx) }) .expect("thread should exist"); - persist_task + let metadata = persist_task .await .expect("restore completion should persist"); + store.update(cx, |store, cx| { + store.apply_completed_worktree_restore(metadata, cx); + }); let entry = store.read_with(cx, |store, _cx| store.entry(thread_id).cloned()); let entry = entry.unwrap(); @@ -3854,9 +3889,12 @@ mod tests { store.complete_worktree_restore(thread_id, &replacements, Vec::new(), cx) }) .expect("thread should exist"); - persist_task + let metadata = persist_task .await .expect("restore completion should persist"); + store.update(cx, |store, cx| { + store.apply_completed_worktree_restore(metadata, cx); + }); let entry = store.read_with(cx, |store, _cx| store.entry(thread_id).cloned()); let entry = entry.unwrap(); @@ -3899,9 +3937,12 @@ mod tests { store.complete_worktree_restore(thread_id, &replacements, Vec::new(), cx) }) .expect("thread should exist"); - persist_task + let metadata = persist_task .await .expect("restore completion should persist"); + store.update(cx, |store, cx| { + store.apply_completed_worktree_restore(metadata, cx); + }); let entry = store.read_with(cx, |store, _cx| store.entry(thread_id).cloned()); let entry = entry.unwrap(); diff --git a/crates/agent_ui/src/thread_worktree_archive.rs b/crates/agent_ui/src/thread_worktree_archive.rs index 65060457998..567a886777f 100644 --- a/crates/agent_ui/src/thread_worktree_archive.rs +++ b/crates/agent_ui/src/thread_worktree_archive.rs @@ -479,13 +479,8 @@ pub async fn persist_worktree_state(root: &RootPlan, cx: &mut AsyncApp) -> Resul let thread_ids: Vec = store.read_with(cx, |store, _cx| { store .entries() - .filter(|thread| { - thread - .folder_paths() - .paths() - .iter() - .any(|p| p.as_path() == root.root_path) - }) + .filter(|thread| thread.matches_remote_connection(root.remote_connection.as_ref())) + .filter(|thread| thread.references_folder_path(&root.root_path)) .map(|thread| thread.thread_id) .collect() }); @@ -574,14 +569,13 @@ pub async fn rollback_persist(archived_worktree_id: i64, root: &RootPlan, cx: &m /// **Destructive**: the final step (`restore_archive_checkpoint`) clobbers the /// working directory unconditionally via `git read-tree --reset -u`. If the /// path has any pre-existing content (a non-empty directory, a file, or a -/// symlink) it is moved aside into a `zed-restore-backup-` directory -/// before the rest of the destructive work runs. We try to place the backup -/// next to `worktree_path` so the rename stays on the same filesystem -/// (atomic and fast), and fall back to the system temp directory if a -/// sibling cannot be created. If a later step fails, the backup is moved -/// back over `worktree_path` so the user does not lose their content. On -/// success the backup directory is deleted asynchronously so a multi-GB -/// cleanup does not block the caller. +/// symlink) it is moved aside into a `zed-restore-backup-` sibling +/// directory before the rest of the destructive work runs. If a later step +/// fails, the backup is moved back over `worktree_path` so the user does not +/// lose their content. On success the backup directory is deleted +/// asynchronously so a multi-GB cleanup does not block the caller. If the +/// sibling backup directory cannot be created, restore aborts before any +/// destructive step runs. /// /// Callers MUST first call [`restore_would_overwrite`] and confirm with the /// user before invoking this function — there is no preflight or refusal @@ -593,6 +587,24 @@ pub async fn restore_worktree_via_git( confirmed_overwrite_paths: &HashSet, cx: &mut AsyncApp, ) -> Result { + let restored = restore_worktree_via_git_deferred_cleanup( + row, + remote_connection, + confirmed_overwrite_paths, + cx, + ) + .await?; + let path = restored.path.clone(); + restored.cleanup_backup(cx); + Ok(path) +} + +pub async fn restore_worktree_via_git_deferred_cleanup( + row: &ArchivedGitWorktree, + remote_connection: Option<&RemoteConnectionOptions>, + confirmed_overwrite_paths: &HashSet, + cx: &mut AsyncApp, +) -> Result { if remote_connection.is_some() { anyhow::bail!("restoring archived worktrees on remote machines is not yet supported"); } @@ -693,9 +705,10 @@ pub async fn restore_worktree_via_git( return Err(session.rollback_with_annotation(error).await); } - session.commit_async(app_state.fs.clone(), cx); - - Ok(worktree_path.clone()) + Ok(RestoredWorktree { + path: worktree_path.clone(), + backup: session.into_backup(), + }) } /// Fixed leaf filename inside a backup directory where the original @@ -722,6 +735,68 @@ impl Backup { } } +pub struct RestoredWorktree { + pub path: PathBuf, + backup: Option, +} + +impl RestoredWorktree { + pub fn cleanup_backup(self, cx: &mut AsyncApp) { + let Some(backup) = self.backup else { + return; + }; + let Some(app_state) = current_app_state(cx) else { + log::warn!( + "could not clean up backup directory '{}' after successful restore: no app state available", + backup.dir.display() + ); + return; + }; + schedule_backup_cleanup(app_state.fs.clone(), Some(backup), cx); + } + + pub async fn rollback( + self, + row: &ArchivedGitWorktree, + remote_connection: Option<&RemoteConnectionOptions>, + cx: &mut AsyncApp, + ) { + let Some(app_state) = current_app_state(cx) else { + if let Some(backup) = self.backup { + log::error!( + "could not roll back restored worktree at '{}': no app state available; pre-existing files remain at '{}'", + row.worktree_path.display(), + backup.target().display() + ); + } + return; + }; + let fs = app_state.fs.as_ref(); + match find_or_create_repository(&row.main_repo_path, remote_connection, cx).await { + Ok((main_repo, _temp_project)) => { + remove_new_worktree_on_error(fs, &main_repo, &row.worktree_path, cx).await; + } + Err(error) => { + log::warn!( + "failed to open main repo while rolling back restored worktree '{}': {error:#}", + row.worktree_path.display() + ); + } + } + + let restore_error = anyhow!("restore did not complete"); + if let Some(stranded_path) = + rollback_backup(fs, self.backup.as_ref(), &row.worktree_path, &restore_error).await + { + log::error!( + "rollback left pre-existing files for '{}' at '{}'", + row.worktree_path.display(), + stranded_path.display() + ); + } + } +} + /// Owns the entire "move aside any existing content, undo on failure, /// clean up on success" lifecycle for a single call to /// [`restore_worktree_via_git`]. Bundles the `fs` / `worktree_path` / @@ -735,10 +810,9 @@ impl Backup { /// that doesn't need its own access to the borrow held by the step. /// 3. For steps that share a `cx` borrow with their cleanup, call /// [`Self::rollback_with_annotation`] directly on the error path. -/// 4. On success, `session.commit_async(fs_owned, cx)` schedules -/// background cleanup of the backup directory. Dropping the session -/// without `commit_async` will leak the backup directory — on -/// purpose, since reaching that code path means something panicked. +/// 4. On success, `session.into_backup()` transfers cleanup ownership to +/// the caller, which can delete the backup only after the higher-level +/// restore transaction commits. struct BackupSession<'a> { fs: &'a dyn Fs, worktree_path: &'a Path, @@ -787,12 +861,8 @@ impl<'a> BackupSession<'a> { } } - /// Consumes the session and schedules cleanup of the (now-unused) - /// backup directory on a background task. Failures are logged but - /// not surfaced; the `zed-restore-backup-` naming makes any - /// orphans easy to spot manually. - fn commit_async(self, fs: Arc, cx: &mut AsyncApp) { - schedule_backup_cleanup(fs, self.backup, cx); + fn into_backup(self) -> Option { + self.backup } } diff --git a/crates/sidebar/src/sidebar.rs b/crates/sidebar/src/sidebar.rs index b43820551cc..d7c78a5e932 100644 --- a/crates/sidebar/src/sidebar.rs +++ b/crates/sidebar/src/sidebar.rs @@ -770,7 +770,7 @@ async fn prompt_overwrite_if_needed( /// post-restore plumbing. struct RestoreOutcome { archived: ArchivedGitWorktree, - restored_path: PathBuf, + restored: thread_worktree_archive::RestoredWorktree, } /// Runs the destructive `restore_worktree_via_git` for every archived @@ -795,22 +795,42 @@ async fn run_destructive_restore_pass( ) -> anyhow::Result> { let mut outcomes = Vec::with_capacity(archived_worktrees.len()); for row in archived_worktrees { - let restored_path = thread_worktree_archive::restore_worktree_via_git( + let restored = match thread_worktree_archive::restore_worktree_via_git_deferred_cleanup( row, remote_connection, confirmed_overwrite_paths, cx, ) .await - .with_context(|| format!("restoring worktree at {}", row.worktree_path.display()))?; + .with_context(|| format!("restoring worktree at {}", row.worktree_path.display())) + { + Ok(restored) => restored, + Err(error) => { + rollback_restore_outcomes(outcomes, remote_connection, cx).await; + return Err(error); + } + }; outcomes.push(RestoreOutcome { archived: row.clone(), - restored_path, + restored, }); } Ok(outcomes) } +async fn rollback_restore_outcomes( + outcomes: Vec, + remote_connection: Option<&RemoteConnectionOptions>, + cx: &mut AsyncApp, +) { + for outcome in outcomes.into_iter().rev() { + outcome + .restored + .rollback(&outcome.archived, remote_connection, cx) + .await; + } +} + /// Commits the DB-visible state changes for a successful restore: /// rewrites worktree paths on the thread metadata, unarchives the /// thread, and returns the updated metadata. @@ -845,7 +865,13 @@ async fn commit_db_state_after_restore( let Some(persist_task) = persist_task else { return Ok(None); }; - persist_task.await.map(Some) + let metadata = persist_task.await?; + cx.update(|cx| { + store.update(cx, |store, cx| { + store.apply_completed_worktree_restore(metadata.clone(), cx); + }); + }); + Ok(Some(metadata)) } #[derive(Clone)] @@ -4107,7 +4133,7 @@ impl Sidebar { .map(|outcome| { ( outcome.archived.worktree_path.clone(), - outcome.restored_path.clone(), + outcome.restored.path.clone(), ) }) .collect(); @@ -4120,14 +4146,26 @@ impl Sidebar { // commit above. let archived_worktree_ids = outcomes.iter().map(|outcome| outcome.archived.id).collect(); - let updated_metadata = commit_db_state_after_restore( + let updated_metadata = match commit_db_state_after_restore( &store, thread_id, &path_replacements, archived_worktree_ids, cx, ) - .await?; + .await + { + Ok(updated_metadata) => updated_metadata, + Err(error) => { + rollback_restore_outcomes( + outcomes, + metadata.remote_connection.as_ref(), + cx, + ) + .await; + return Err(error); + } + }; match updated_metadata { Some(updated_metadata) => { @@ -4170,7 +4208,7 @@ impl Sidebar { } let remote_connection = metadata.remote_connection.as_ref(); - let cleanups = outcomes.iter().map(|outcome| { + let cleanups = outcomes.into_iter().map(|outcome| { let mut cx = cx.clone(); async move { thread_worktree_archive::cleanup_archived_worktree_ref( @@ -4179,6 +4217,7 @@ impl Sidebar { &mut cx, ) .await; + outcome.restored.cleanup_backup(&mut cx); } }); futures::future::join_all(cleanups).await; @@ -4652,6 +4691,10 @@ impl Sidebar { except_terminal_id: Option, cx: &App, ) -> Vec { + if remote_connection.is_some() { + return Vec::new(); + } + let workspaces = self.archive_workspaces(cx); folder_paths .ordered_paths()