Harden archived worktree restore cleanup

This commit is contained in:
Richard Feldman 2026-05-27 11:47:25 -04:00
parent 4856ffefcd
commit aa3234e41d
No known key found for this signature in database
3 changed files with 255 additions and 101 deletions

View file

@ -971,6 +971,13 @@ impl ThreadMetadataStore {
}
fn save_internal(&mut self, metadata: ThreadMetadata) {
self.cache_thread_metadata_update(metadata.clone());
self.pending_thread_ops_tx
.try_send(DbOperation::Upsert(metadata))
.log_err();
}
fn cache_thread_metadata_update(&mut self, metadata: ThreadMetadata) {
if let Some(thread) = self.threads.get(&metadata.thread_id) {
if thread.folder_paths() != metadata.folder_paths() {
if let Some(thread_ids) = self.threads_by_paths.get_mut(thread.folder_paths()) {
@ -989,10 +996,7 @@ impl ThreadMetadataStore {
}
}
self.cache_thread_metadata(metadata.clone());
self.pending_thread_ops_tx
.try_send(DbOperation::Upsert(metadata))
.log_err();
self.cache_thread_metadata(metadata);
}
fn cache_thread_metadata(&mut self, metadata: ThreadMetadata) {
@ -1171,9 +1175,6 @@ impl ThreadMetadataStore {
archived: false,
..thread
};
self.save_internal(metadata.clone());
cx.notify();
let (completion_tx, completion_rx) = async_channel::bounded(1);
if let Err(error) = self
.pending_thread_ops_tx
@ -1197,6 +1198,15 @@ impl ThreadMetadataStore {
}))
}
pub fn apply_completed_worktree_restore(
&mut self,
metadata: ThreadMetadata,
cx: &mut Context<Self>,
) {
self.cache_thread_metadata_update(metadata);
cx.notify();
}
/// Apply a mutation to the worktree paths of all threads whose current
/// `folder_paths` matches `current_folder_paths`, then re-index.
/// When `remote_connection` is provided, only threads with a matching
@ -1460,14 +1470,8 @@ impl ThreadMetadataStore {
archived_worktree_ids,
completion_tx,
} => {
let result = async {
db.save(metadata).await?;
for archived_worktree_id in archived_worktree_ids {
db.delete_archived_worktree(archived_worktree_id).await?;
}
anyhow::Ok(())
}
.await;
let result =
db.complete_restore(metadata, archived_worktree_ids).await;
completion_tx.send(result).await.log_err();
}
}
@ -1747,6 +1751,13 @@ impl ThreadMetadataDb {
/// session_id on promotion (when the first message is sent) and
/// then flow through this same upsert path.
pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
self.write(move |conn| Self::save_sync(conn, row)).await
}
fn save_sync(
conn: &db::sqlez::connection::Connection,
row: ThreadMetadata,
) -> anyhow::Result<()> {
let session_id = row.session_id.as_ref().map(|s| s.0.clone());
let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
None
@ -1784,39 +1795,53 @@ impl ThreadMetadataDb {
let thread_id = row.thread_id;
let archived = row.archived;
let sql = "INSERT INTO sidebar_threads(thread_id, session_id, agent_id, title, updated_at, created_at, interacted_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order, remote_connection, title_override) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14) \
ON CONFLICT(thread_id) DO UPDATE SET \
session_id = excluded.session_id, \
agent_id = excluded.agent_id, \
title = excluded.title, \
updated_at = excluded.updated_at, \
created_at = excluded.created_at, \
interacted_at = excluded.interacted_at, \
folder_paths = excluded.folder_paths, \
folder_paths_order = excluded.folder_paths_order, \
archived = excluded.archived, \
main_worktree_paths = excluded.main_worktree_paths, \
main_worktree_paths_order = excluded.main_worktree_paths_order, \
remote_connection = excluded.remote_connection, \
title_override = excluded.title_override";
let mut stmt = Statement::prepare(conn, sql)?;
let mut i = stmt.bind(&thread_id, 1)?;
i = stmt.bind(&session_id, i)?;
i = stmt.bind(&agent_id, i)?;
i = stmt.bind(&title, i)?;
i = stmt.bind(&updated_at, i)?;
i = stmt.bind(&created_at, i)?;
i = stmt.bind(&interacted_at, i)?;
i = stmt.bind(&folder_paths, i)?;
i = stmt.bind(&folder_paths_order, i)?;
i = stmt.bind(&archived, i)?;
i = stmt.bind(&main_worktree_paths, i)?;
i = stmt.bind(&main_worktree_paths_order, i)?;
i = stmt.bind(&remote_connection, i)?;
stmt.bind(&title_override, i)?;
stmt.exec()
}
pub async fn complete_restore(
&self,
metadata: ThreadMetadata,
archived_worktree_ids: Vec<i64>,
) -> anyhow::Result<()> {
self.write(move |conn| {
let sql = "INSERT INTO sidebar_threads(thread_id, session_id, agent_id, title, updated_at, created_at, interacted_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order, remote_connection, title_override) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14) \
ON CONFLICT(thread_id) DO UPDATE SET \
session_id = excluded.session_id, \
agent_id = excluded.agent_id, \
title = excluded.title, \
updated_at = excluded.updated_at, \
created_at = excluded.created_at, \
interacted_at = excluded.interacted_at, \
folder_paths = excluded.folder_paths, \
folder_paths_order = excluded.folder_paths_order, \
archived = excluded.archived, \
main_worktree_paths = excluded.main_worktree_paths, \
main_worktree_paths_order = excluded.main_worktree_paths_order, \
remote_connection = excluded.remote_connection, \
title_override = excluded.title_override";
let mut stmt = Statement::prepare(conn, sql)?;
let mut i = stmt.bind(&thread_id, 1)?;
i = stmt.bind(&session_id, i)?;
i = stmt.bind(&agent_id, i)?;
i = stmt.bind(&title, i)?;
i = stmt.bind(&updated_at, i)?;
i = stmt.bind(&created_at, i)?;
i = stmt.bind(&interacted_at, i)?;
i = stmt.bind(&folder_paths, i)?;
i = stmt.bind(&folder_paths_order, i)?;
i = stmt.bind(&archived, i)?;
i = stmt.bind(&main_worktree_paths, i)?;
i = stmt.bind(&main_worktree_paths_order, i)?;
i = stmt.bind(&remote_connection, i)?;
stmt.bind(&title_override, i)?;
stmt.exec()
conn.with_savepoint("complete_thread_restore", || {
Self::save_sync(conn, metadata)?;
for archived_worktree_id in archived_worktree_ids {
Self::delete_archived_worktree_sync(conn, archived_worktree_id)?;
}
Ok(())
})
})
.await
}
@ -1890,20 +1915,24 @@ impl ThreadMetadataDb {
}
pub async fn delete_archived_worktree(&self, id: i64) -> anyhow::Result<()> {
self.write(move |conn| {
let mut stmt = Statement::prepare(
conn,
"DELETE FROM thread_archived_worktrees WHERE archived_worktree_id = ?",
)?;
stmt.bind(&id, 1)?;
stmt.exec()?;
self.write(move |conn| Self::delete_archived_worktree_sync(conn, id))
.await
}
let mut stmt =
Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?;
stmt.bind(&id, 1)?;
stmt.exec()
})
.await
fn delete_archived_worktree_sync(
conn: &db::sqlez::connection::Connection,
id: i64,
) -> anyhow::Result<()> {
let mut stmt = Statement::prepare(
conn,
"DELETE FROM thread_archived_worktrees WHERE archived_worktree_id = ?",
)?;
stmt.bind(&id, 1)?;
stmt.exec()?;
let mut stmt = Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?;
stmt.bind(&id, 1)?;
stmt.exec()
}
pub async fn unlink_thread_from_all_archived_worktrees(
@ -3765,9 +3794,12 @@ mod tests {
store.complete_worktree_restore(thread_id, &replacements, Vec::new(), cx)
})
.expect("thread should exist");
persist_task
let metadata = persist_task
.await
.expect("restore completion should persist");
store.update(cx, |store, cx| {
store.apply_completed_worktree_restore(metadata, cx);
});
let entry = store.read_with(cx, |store, _cx| store.entry(thread_id).cloned());
let entry = entry.unwrap();
@ -3808,9 +3840,12 @@ mod tests {
store.complete_worktree_restore(thread_id, &replacements, Vec::new(), cx)
})
.expect("thread should exist");
persist_task
let metadata = persist_task
.await
.expect("restore completion should persist");
store.update(cx, |store, cx| {
store.apply_completed_worktree_restore(metadata, cx);
});
let entry = store.read_with(cx, |store, _cx| store.entry(thread_id).cloned());
let entry = entry.unwrap();
@ -3854,9 +3889,12 @@ mod tests {
store.complete_worktree_restore(thread_id, &replacements, Vec::new(), cx)
})
.expect("thread should exist");
persist_task
let metadata = persist_task
.await
.expect("restore completion should persist");
store.update(cx, |store, cx| {
store.apply_completed_worktree_restore(metadata, cx);
});
let entry = store.read_with(cx, |store, _cx| store.entry(thread_id).cloned());
let entry = entry.unwrap();
@ -3899,9 +3937,12 @@ mod tests {
store.complete_worktree_restore(thread_id, &replacements, Vec::new(), cx)
})
.expect("thread should exist");
persist_task
let metadata = persist_task
.await
.expect("restore completion should persist");
store.update(cx, |store, cx| {
store.apply_completed_worktree_restore(metadata, cx);
});
let entry = store.read_with(cx, |store, _cx| store.entry(thread_id).cloned());
let entry = entry.unwrap();

View file

@ -479,13 +479,8 @@ pub async fn persist_worktree_state(root: &RootPlan, cx: &mut AsyncApp) -> Resul
let thread_ids: Vec<ThreadId> = store.read_with(cx, |store, _cx| {
store
.entries()
.filter(|thread| {
thread
.folder_paths()
.paths()
.iter()
.any(|p| p.as_path() == root.root_path)
})
.filter(|thread| thread.matches_remote_connection(root.remote_connection.as_ref()))
.filter(|thread| thread.references_folder_path(&root.root_path))
.map(|thread| thread.thread_id)
.collect()
});
@ -574,14 +569,13 @@ pub async fn rollback_persist(archived_worktree_id: i64, root: &RootPlan, cx: &m
/// **Destructive**: the final step (`restore_archive_checkpoint`) clobbers the
/// working directory unconditionally via `git read-tree --reset -u`. If the
/// path has any pre-existing content (a non-empty directory, a file, or a
/// symlink) it is moved aside into a `zed-restore-backup-<uuid>` directory
/// before the rest of the destructive work runs. We try to place the backup
/// next to `worktree_path` so the rename stays on the same filesystem
/// (atomic and fast), and fall back to the system temp directory if a
/// sibling cannot be created. If a later step fails, the backup is moved
/// back over `worktree_path` so the user does not lose their content. On
/// success the backup directory is deleted asynchronously so a multi-GB
/// cleanup does not block the caller.
/// symlink) it is moved aside into a `zed-restore-backup-<uuid>` sibling
/// directory before the rest of the destructive work runs. If a later step
/// fails, the backup is moved back over `worktree_path` so the user does not
/// lose their content. On success the backup directory is deleted
/// asynchronously so a multi-GB cleanup does not block the caller. If the
/// sibling backup directory cannot be created, restore aborts before any
/// destructive step runs.
///
/// Callers MUST first call [`restore_would_overwrite`] and confirm with the
/// user before invoking this function — there is no preflight or refusal
@ -593,6 +587,24 @@ pub async fn restore_worktree_via_git(
confirmed_overwrite_paths: &HashSet<PathBuf>,
cx: &mut AsyncApp,
) -> Result<PathBuf> {
let restored = restore_worktree_via_git_deferred_cleanup(
row,
remote_connection,
confirmed_overwrite_paths,
cx,
)
.await?;
let path = restored.path.clone();
restored.cleanup_backup(cx);
Ok(path)
}
pub async fn restore_worktree_via_git_deferred_cleanup(
row: &ArchivedGitWorktree,
remote_connection: Option<&RemoteConnectionOptions>,
confirmed_overwrite_paths: &HashSet<PathBuf>,
cx: &mut AsyncApp,
) -> Result<RestoredWorktree> {
if remote_connection.is_some() {
anyhow::bail!("restoring archived worktrees on remote machines is not yet supported");
}
@ -693,9 +705,10 @@ pub async fn restore_worktree_via_git(
return Err(session.rollback_with_annotation(error).await);
}
session.commit_async(app_state.fs.clone(), cx);
Ok(worktree_path.clone())
Ok(RestoredWorktree {
path: worktree_path.clone(),
backup: session.into_backup(),
})
}
/// Fixed leaf filename inside a backup directory where the original
@ -722,6 +735,68 @@ impl Backup {
}
}
pub struct RestoredWorktree {
pub path: PathBuf,
backup: Option<Backup>,
}
impl RestoredWorktree {
pub fn cleanup_backup(self, cx: &mut AsyncApp) {
let Some(backup) = self.backup else {
return;
};
let Some(app_state) = current_app_state(cx) else {
log::warn!(
"could not clean up backup directory '{}' after successful restore: no app state available",
backup.dir.display()
);
return;
};
schedule_backup_cleanup(app_state.fs.clone(), Some(backup), cx);
}
pub async fn rollback(
self,
row: &ArchivedGitWorktree,
remote_connection: Option<&RemoteConnectionOptions>,
cx: &mut AsyncApp,
) {
let Some(app_state) = current_app_state(cx) else {
if let Some(backup) = self.backup {
log::error!(
"could not roll back restored worktree at '{}': no app state available; pre-existing files remain at '{}'",
row.worktree_path.display(),
backup.target().display()
);
}
return;
};
let fs = app_state.fs.as_ref();
match find_or_create_repository(&row.main_repo_path, remote_connection, cx).await {
Ok((main_repo, _temp_project)) => {
remove_new_worktree_on_error(fs, &main_repo, &row.worktree_path, cx).await;
}
Err(error) => {
log::warn!(
"failed to open main repo while rolling back restored worktree '{}': {error:#}",
row.worktree_path.display()
);
}
}
let restore_error = anyhow!("restore did not complete");
if let Some(stranded_path) =
rollback_backup(fs, self.backup.as_ref(), &row.worktree_path, &restore_error).await
{
log::error!(
"rollback left pre-existing files for '{}' at '{}'",
row.worktree_path.display(),
stranded_path.display()
);
}
}
}
/// Owns the entire "move aside any existing content, undo on failure,
/// clean up on success" lifecycle for a single call to
/// [`restore_worktree_via_git`]. Bundles the `fs` / `worktree_path` /
@ -735,10 +810,9 @@ impl Backup {
/// that doesn't need its own access to the borrow held by the step.
/// 3. For steps that share a `cx` borrow with their cleanup, call
/// [`Self::rollback_with_annotation`] directly on the error path.
/// 4. On success, `session.commit_async(fs_owned, cx)` schedules
/// background cleanup of the backup directory. Dropping the session
/// without `commit_async` will leak the backup directory — on
/// purpose, since reaching that code path means something panicked.
/// 4. On success, `session.into_backup()` transfers cleanup ownership to
/// the caller, which can delete the backup only after the higher-level
/// restore transaction commits.
struct BackupSession<'a> {
fs: &'a dyn Fs,
worktree_path: &'a Path,
@ -787,12 +861,8 @@ impl<'a> BackupSession<'a> {
}
}
/// Consumes the session and schedules cleanup of the (now-unused)
/// backup directory on a background task. Failures are logged but
/// not surfaced; the `zed-restore-backup-<uuid>` naming makes any
/// orphans easy to spot manually.
fn commit_async(self, fs: Arc<dyn Fs>, cx: &mut AsyncApp) {
schedule_backup_cleanup(fs, self.backup, cx);
fn into_backup(self) -> Option<Backup> {
self.backup
}
}

View file

@ -770,7 +770,7 @@ async fn prompt_overwrite_if_needed(
/// post-restore plumbing.
struct RestoreOutcome {
archived: ArchivedGitWorktree,
restored_path: PathBuf,
restored: thread_worktree_archive::RestoredWorktree,
}
/// Runs the destructive `restore_worktree_via_git` for every archived
@ -795,22 +795,42 @@ async fn run_destructive_restore_pass(
) -> anyhow::Result<Vec<RestoreOutcome>> {
let mut outcomes = Vec::with_capacity(archived_worktrees.len());
for row in archived_worktrees {
let restored_path = thread_worktree_archive::restore_worktree_via_git(
let restored = match thread_worktree_archive::restore_worktree_via_git_deferred_cleanup(
row,
remote_connection,
confirmed_overwrite_paths,
cx,
)
.await
.with_context(|| format!("restoring worktree at {}", row.worktree_path.display()))?;
.with_context(|| format!("restoring worktree at {}", row.worktree_path.display()))
{
Ok(restored) => restored,
Err(error) => {
rollback_restore_outcomes(outcomes, remote_connection, cx).await;
return Err(error);
}
};
outcomes.push(RestoreOutcome {
archived: row.clone(),
restored_path,
restored,
});
}
Ok(outcomes)
}
async fn rollback_restore_outcomes(
outcomes: Vec<RestoreOutcome>,
remote_connection: Option<&RemoteConnectionOptions>,
cx: &mut AsyncApp,
) {
for outcome in outcomes.into_iter().rev() {
outcome
.restored
.rollback(&outcome.archived, remote_connection, cx)
.await;
}
}
/// Commits the DB-visible state changes for a successful restore:
/// rewrites worktree paths on the thread metadata, unarchives the
/// thread, and returns the updated metadata.
@ -845,7 +865,13 @@ async fn commit_db_state_after_restore(
let Some(persist_task) = persist_task else {
return Ok(None);
};
persist_task.await.map(Some)
let metadata = persist_task.await?;
cx.update(|cx| {
store.update(cx, |store, cx| {
store.apply_completed_worktree_restore(metadata.clone(), cx);
});
});
Ok(Some(metadata))
}
#[derive(Clone)]
@ -4107,7 +4133,7 @@ impl Sidebar {
.map(|outcome| {
(
outcome.archived.worktree_path.clone(),
outcome.restored_path.clone(),
outcome.restored.path.clone(),
)
})
.collect();
@ -4120,14 +4146,26 @@ impl Sidebar {
// commit above.
let archived_worktree_ids =
outcomes.iter().map(|outcome| outcome.archived.id).collect();
let updated_metadata = commit_db_state_after_restore(
let updated_metadata = match commit_db_state_after_restore(
&store,
thread_id,
&path_replacements,
archived_worktree_ids,
cx,
)
.await?;
.await
{
Ok(updated_metadata) => updated_metadata,
Err(error) => {
rollback_restore_outcomes(
outcomes,
metadata.remote_connection.as_ref(),
cx,
)
.await;
return Err(error);
}
};
match updated_metadata {
Some(updated_metadata) => {
@ -4170,7 +4208,7 @@ impl Sidebar {
}
let remote_connection = metadata.remote_connection.as_ref();
let cleanups = outcomes.iter().map(|outcome| {
let cleanups = outcomes.into_iter().map(|outcome| {
let mut cx = cx.clone();
async move {
thread_worktree_archive::cleanup_archived_worktree_ref(
@ -4179,6 +4217,7 @@ impl Sidebar {
&mut cx,
)
.await;
outcome.restored.cleanup_backup(&mut cx);
}
});
futures::future::join_all(cleanups).await;
@ -4652,6 +4691,10 @@ impl Sidebar {
except_terminal_id: Option<TerminalId>,
cx: &App,
) -> Vec<thread_worktree_archive::RootPlan> {
if remote_connection.is_some() {
return Vec::new();
}
let workspaces = self.archive_workspaces(cx);
folder_paths
.ordered_paths()