mirror of
https://github.com/zed-industries/zed.git
synced 2026-06-01 03:14:56 +07:00
fs: Poll until watched path is created instead of watching parent (#57152)
Self-Review Checklist: - [x] I've reviewed my own diff for quality, security, and reliability - [x] Unsafe blocks (if any) have justifying comments - [x] The content is consistent with the [UI/UX checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist) - [ ] Tests cover the new/changed behavior - [x] Performance impact has been considered and is acceptable Closes #ISSUE Part of FR-9. Release Notes: - N/A or Added/Fixed/Improved ...
This commit is contained in:
parent
57a64fc824
commit
a0aa3e842f
2 changed files with 275 additions and 186 deletions
|
|
@ -1,5 +1,7 @@
|
|||
pub mod fs_watcher;
|
||||
|
||||
pub use fs_watcher::requires_poll_watcher;
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use std::ffi::OsString;
|
||||
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
|
||||
|
|
@ -71,133 +73,6 @@ pub trait Watcher: Send + Sync {
|
|||
fn remove(&self, path: &Path) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Detect whether a path requires polling instead of native file watching.
|
||||
///
|
||||
/// Returns `true` for filesystem types where inotify/FSEvents/ReadDirectoryChanges
|
||||
/// silently fail to deliver events: 9P (WSL drvfs), NFS, CIFS/SMB, FUSE (sshfs), etc.
|
||||
///
|
||||
/// Can be overridden with the `ZED_FILE_WATCHER_MODE` environment variable:
|
||||
/// - `native` — always use native OS watcher
|
||||
/// - `poll` — always use polling
|
||||
/// - `auto` (default) — auto-detect based on filesystem type
|
||||
pub fn requires_poll_watcher(path: &Path) -> bool {
|
||||
match std::env::var("ZED_FILE_WATCHER_MODE")
|
||||
.as_deref()
|
||||
.unwrap_or("auto")
|
||||
{
|
||||
"native" => return false,
|
||||
"poll" => return true,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let path = effective_watch_path(path);
|
||||
return detect_requires_poll_watcher_linux(&path);
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
let _ = path;
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn effective_watch_path(path: &Path) -> PathBuf {
|
||||
if path.exists() {
|
||||
return path.to_path_buf();
|
||||
}
|
||||
|
||||
for ancestor in path.ancestors() {
|
||||
if ancestor.exists() {
|
||||
return ancestor.to_path_buf();
|
||||
}
|
||||
}
|
||||
|
||||
path.to_path_buf()
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn detect_requires_poll_watcher_linux(path: &Path) -> bool {
|
||||
use std::ffi::CString;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
|
||||
// Check filesystem type via statfs
|
||||
let c_path = match CString::new(path.as_os_str().as_bytes()) {
|
||||
Ok(p) => p,
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
let mut stat: libc::statfs = unsafe { std::mem::zeroed() };
|
||||
if unsafe { libc::statfs(c_path.as_ptr(), &mut stat) } != 0 {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Filesystem magic numbers where inotify does not deliver events.
|
||||
// These are defined in linux/magic.h and statfs(2).
|
||||
const V9FS_MAGIC: u64 = 0x0102_1997; // Plan 9 / WSL2 interop (drvfs)
|
||||
const NFS_SUPER_MAGIC: u64 = 0x0000_6969;
|
||||
const CIFS_MAGIC: u64 = 0xFF53_4D42; // CIFS/SMB
|
||||
const SMB_SUPER_MAGIC: u64 = 0x0000_517B;
|
||||
const SMB2_MAGIC: u64 = 0xFE53_4D42;
|
||||
const FUSE_SUPER_MAGIC: u64 = 0x6573_5546; // FUSE (includes sshfs)
|
||||
|
||||
let fs_type = (stat.f_type as u64) & 0xFFFF_FFFF;
|
||||
if fs_type == V9FS_MAGIC
|
||||
|| fs_type == NFS_SUPER_MAGIC
|
||||
|| fs_type == CIFS_MAGIC
|
||||
|| fs_type == SMB_SUPER_MAGIC
|
||||
|| fs_type == SMB2_MAGIC
|
||||
|| fs_type == FUSE_SUPER_MAGIC
|
||||
{
|
||||
log::info!(
|
||||
"Detected network/virtual filesystem (type 0x{:x}) at {}, using poll watcher",
|
||||
fs_type,
|
||||
path.display()
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Also check for WSL + /mnt/<drive>/ pattern as a fallback
|
||||
// in case statfs returns an unexpected type for drvfs
|
||||
if is_wsl_drvfs_path(path) {
|
||||
log::info!(
|
||||
"Detected WSL drvfs mount at {}, using poll watcher",
|
||||
path.display()
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn is_wsl_drvfs_path(path: &Path) -> bool {
|
||||
// Only relevant inside WSL
|
||||
if std::env::var_os("WSL_DISTRO_NAME").is_none() {
|
||||
if let Ok(version) = std::fs::read_to_string("/proc/version") {
|
||||
let v = version.to_lowercase();
|
||||
if !v.contains("microsoft") && !v.contains("wsl") {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Windows drives are mounted at /mnt/c, /mnt/d, etc.
|
||||
let path_str = match path.to_str() {
|
||||
Some(s) => s,
|
||||
None => return false,
|
||||
};
|
||||
if !path_str.starts_with("/mnt/") || path_str.len() < 6 {
|
||||
return false;
|
||||
}
|
||||
let after_mnt = &path_str[5..];
|
||||
after_mnt.starts_with(|c: char| c.is_ascii_alphabetic())
|
||||
&& (after_mnt.len() == 1 || after_mnt.as_bytes()[1] == b'/')
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
|
||||
pub enum PathEventKind {
|
||||
Removed,
|
||||
|
|
@ -1197,35 +1072,17 @@ impl Fs for RealFs {
|
|||
use util::{ResultExt as _, paths::SanitizedPath};
|
||||
let executor = self.executor.clone();
|
||||
|
||||
let use_poll = requires_poll_watcher(path);
|
||||
let watch_path = effective_watch_path(path);
|
||||
|
||||
let (tx, rx) = async_channel::unbounded();
|
||||
let pending_paths: Arc<Mutex<Vec<PathEvent>>> = Default::default();
|
||||
|
||||
let mode = if use_poll {
|
||||
log::info!(
|
||||
"Using poll watcher ({}ms interval) for {}",
|
||||
fs_watcher::poll_interval().as_millis(),
|
||||
path.display()
|
||||
);
|
||||
telemetry::event!("fs_watcher_poll", path = path.display().to_string());
|
||||
fs_watcher::WatcherMode::Poll
|
||||
} else {
|
||||
fs_watcher::WatcherMode::Native
|
||||
};
|
||||
let watcher: Arc<dyn Watcher> = Arc::new(fs_watcher::FsWatcher::new(
|
||||
executor.clone(),
|
||||
tx.clone(),
|
||||
pending_paths.clone(),
|
||||
mode,
|
||||
));
|
||||
|
||||
if let Err(e) = watcher.add(&watch_path) {
|
||||
log::warn!(
|
||||
"Failed to watch {} using {}:\n{e}",
|
||||
path.display(),
|
||||
watch_path.display()
|
||||
);
|
||||
if let Err(e) = watcher.add(path) {
|
||||
log::warn!("Failed to watch {}:\n{e}", path.display());
|
||||
}
|
||||
|
||||
// Check if path is a symlink and follow the target parent
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
use gpui::{BackgroundExecutor, Task};
|
||||
use notify::{Event, EventKind};
|
||||
use parking_lot::Mutex;
|
||||
use std::{
|
||||
|
|
@ -19,29 +20,66 @@ pub enum WatcherMode {
|
|||
}
|
||||
|
||||
pub struct FsWatcher {
|
||||
executor: BackgroundExecutor,
|
||||
tx: async_channel::Sender<()>,
|
||||
pending_path_events: Arc<Mutex<Vec<PathEvent>>>,
|
||||
registrations: Mutex<BTreeMap<Arc<std::path::Path>, WatcherRegistrationId>>,
|
||||
registrations: Arc<Mutex<BTreeMap<Arc<std::path::Path>, FsWatcherRegistration>>>,
|
||||
pending_registrations: Arc<Mutex<HashMap<Arc<std::path::Path>, Task<()>>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct FsWatcherRegistration {
|
||||
id: WatcherRegistrationId,
|
||||
mode: WatcherMode,
|
||||
}
|
||||
|
||||
impl FsWatcher {
|
||||
pub fn new(
|
||||
executor: BackgroundExecutor,
|
||||
tx: async_channel::Sender<()>,
|
||||
pending_path_events: Arc<Mutex<Vec<PathEvent>>>,
|
||||
mode: WatcherMode,
|
||||
) -> Self {
|
||||
Self {
|
||||
executor,
|
||||
tx,
|
||||
pending_path_events,
|
||||
registrations: Default::default(),
|
||||
mode,
|
||||
pending_registrations: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_existing_path(&self, path: Arc<Path>) -> anyhow::Result<()> {
|
||||
let registration_path = path.clone();
|
||||
let registration =
|
||||
register_existing_path(path, self.tx.clone(), self.pending_path_events.clone())?;
|
||||
self.registrations
|
||||
.lock()
|
||||
.insert(registration_path, registration);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_pending_path(&self, path: Arc<Path>) {
|
||||
let mut pending_registrations = self.pending_registrations.lock();
|
||||
if pending_registrations.contains_key(path.as_ref()) {
|
||||
return;
|
||||
}
|
||||
|
||||
let task = self.executor.spawn(poll_path_until_created(
|
||||
self.executor.clone(),
|
||||
path.clone(),
|
||||
self.tx.clone(),
|
||||
self.pending_path_events.clone(),
|
||||
self.registrations.clone(),
|
||||
self.pending_registrations.clone(),
|
||||
));
|
||||
pending_registrations.insert(path, task);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for FsWatcher {
|
||||
fn drop(&mut self) {
|
||||
self.pending_registrations.lock().clear();
|
||||
|
||||
let mut registrations = BTreeMap::new();
|
||||
{
|
||||
let old = &mut self.registrations.lock();
|
||||
|
|
@ -50,7 +88,7 @@ impl Drop for FsWatcher {
|
|||
|
||||
let global_watcher = global_watcher();
|
||||
for (_, registration) in registrations {
|
||||
global_watcher.remove(registration);
|
||||
global_watcher.remove(registration.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -58,59 +96,252 @@ impl Drop for FsWatcher {
|
|||
impl Watcher for FsWatcher {
|
||||
fn add(&self, path: &std::path::Path) -> anyhow::Result<()> {
|
||||
log::trace!("watcher add: {path:?}");
|
||||
let tx = self.tx.clone();
|
||||
let pending_path_events = self.pending_path_events.clone();
|
||||
|
||||
if (self.mode == WatcherMode::Poll || cfg!(any(target_os = "windows", target_os = "macos")))
|
||||
&& let Some((watched_path, _)) = self
|
||||
.registrations
|
||||
.lock()
|
||||
.range::<std::path::Path, _>((
|
||||
std::ops::Bound::Unbounded,
|
||||
std::ops::Bound::Included(path),
|
||||
))
|
||||
.next_back()
|
||||
&& path.starts_with(watched_path.as_ref())
|
||||
{
|
||||
log::trace!(
|
||||
"path to watch is covered by existing registration: {path:?}, {watched_path:?}"
|
||||
);
|
||||
let (path_is_covered_by_recursive_registration, path_is_already_watched) = {
|
||||
let registrations = self.registrations.lock();
|
||||
(
|
||||
path.ancestors().skip(1).any(|ancestor| {
|
||||
registrations.get(ancestor).is_some_and(|registration| {
|
||||
registration.mode == WatcherMode::Poll
|
||||
|| cfg!(any(target_os = "windows", target_os = "macos"))
|
||||
})
|
||||
}),
|
||||
registrations.contains_key(path),
|
||||
)
|
||||
};
|
||||
|
||||
if path_is_covered_by_recursive_registration {
|
||||
log::trace!("path to watch is covered by existing registration: {path:?}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.registrations.lock().contains_key(path) {
|
||||
if path_is_already_watched {
|
||||
log::trace!("path to watch is already watched: {path:?}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let root_path = SanitizedPath::new_arc(path);
|
||||
if self.pending_registrations.lock().contains_key(path) {
|
||||
log::trace!("path to watch is already pending: {path:?}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let path: Arc<std::path::Path> = path.into();
|
||||
if std::fs::symlink_metadata(path.as_ref()).is_err() {
|
||||
self.add_pending_path(path);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let registration_path = path.clone();
|
||||
let registration_id =
|
||||
global_watcher().add(path.clone(), self.mode, move |event: ¬ify::Event| {
|
||||
log::trace!("watcher received event: {event:?}");
|
||||
push_notify_event(&tx, &pending_path_events, &root_path, path.as_ref(), event);
|
||||
})?;
|
||||
|
||||
self.registrations
|
||||
.lock()
|
||||
.insert(registration_path, registration_id);
|
||||
|
||||
Ok(())
|
||||
self.add_existing_path(path)
|
||||
}
|
||||
|
||||
fn remove(&self, path: &std::path::Path) -> anyhow::Result<()> {
|
||||
log::trace!("remove watched path: {path:?}");
|
||||
self.pending_registrations.lock().remove(path);
|
||||
|
||||
let Some(registration) = self.registrations.lock().remove(path) else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
global_watcher().remove(registration);
|
||||
global_watcher().remove(registration.id);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Detect whether a path requires polling instead of native file watching.
|
||||
///
|
||||
/// Returns `true` for filesystem types where inotify/FSEvents/ReadDirectoryChanges
|
||||
/// silently fail to deliver events: 9P (WSL drvfs), NFS, CIFS/SMB, FUSE (sshfs), etc.
|
||||
///
|
||||
/// Can be overridden with the `ZED_FILE_WATCHER_MODE` environment variable:
|
||||
/// - `native` — always use native OS watcher
|
||||
/// - `poll` — always use polling
|
||||
/// - `auto` (default) — auto-detect based on filesystem type
|
||||
pub fn requires_poll_watcher(path: &Path) -> bool {
|
||||
match std::env::var("ZED_FILE_WATCHER_MODE")
|
||||
.as_deref()
|
||||
.unwrap_or("auto")
|
||||
{
|
||||
"native" => return false,
|
||||
"poll" => return true,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
return detect_requires_poll_watcher_linux(path);
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
let _ = path;
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn register_existing_path(
|
||||
path: Arc<Path>,
|
||||
tx: async_channel::Sender<()>,
|
||||
pending_path_events: Arc<Mutex<Vec<PathEvent>>>,
|
||||
) -> anyhow::Result<FsWatcherRegistration> {
|
||||
let mode = if requires_poll_watcher(path.as_ref()) {
|
||||
log::info!(
|
||||
"Using poll watcher ({}ms interval) for {}",
|
||||
poll_interval().as_millis(),
|
||||
path.display()
|
||||
);
|
||||
telemetry::event!("fs_watcher_poll", path = path.display().to_string());
|
||||
WatcherMode::Poll
|
||||
} else {
|
||||
WatcherMode::Native
|
||||
};
|
||||
let root_path = SanitizedPath::new_arc(path.as_ref());
|
||||
let path_for_callback = path.clone();
|
||||
let registration_id = global_watcher().add(path, mode, move |event: ¬ify::Event| {
|
||||
log::trace!("watcher received event: {event:?}");
|
||||
push_notify_event(
|
||||
&tx,
|
||||
&pending_path_events,
|
||||
&root_path,
|
||||
path_for_callback.as_ref(),
|
||||
event,
|
||||
);
|
||||
})?;
|
||||
Ok(FsWatcherRegistration {
|
||||
id: registration_id,
|
||||
mode,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn detect_requires_poll_watcher_linux(path: &Path) -> bool {
|
||||
use std::ffi::CString;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
|
||||
let c_path = match CString::new(path.as_os_str().as_bytes()) {
|
||||
Ok(p) => p,
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
let mut stat: libc::statfs = unsafe { std::mem::zeroed() };
|
||||
if unsafe { libc::statfs(c_path.as_ptr(), &mut stat) } != 0 {
|
||||
return false;
|
||||
}
|
||||
|
||||
const V9FS_MAGIC: u64 = 0x0102_1997;
|
||||
const NFS_SUPER_MAGIC: u64 = 0x0000_6969;
|
||||
const CIFS_MAGIC: u64 = 0xFF53_4D42;
|
||||
const SMB_SUPER_MAGIC: u64 = 0x0000_517B;
|
||||
const SMB2_MAGIC: u64 = 0xFE53_4D42;
|
||||
const FUSE_SUPER_MAGIC: u64 = 0x6573_5546;
|
||||
|
||||
let fs_type = (stat.f_type as u64) & 0xFFFF_FFFF;
|
||||
if fs_type == V9FS_MAGIC
|
||||
|| fs_type == NFS_SUPER_MAGIC
|
||||
|| fs_type == CIFS_MAGIC
|
||||
|| fs_type == SMB_SUPER_MAGIC
|
||||
|| fs_type == SMB2_MAGIC
|
||||
|| fs_type == FUSE_SUPER_MAGIC
|
||||
{
|
||||
log::info!(
|
||||
"Detected network/virtual filesystem (type 0x{:x}) at {}, using poll watcher",
|
||||
fs_type,
|
||||
path.display()
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
if is_wsl_drvfs_path(path) {
|
||||
log::info!(
|
||||
"Detected WSL drvfs mount at {}, using poll watcher",
|
||||
path.display()
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn is_wsl_drvfs_path(path: &Path) -> bool {
|
||||
if std::env::var_os("WSL_DISTRO_NAME").is_none() {
|
||||
if let Ok(version) = std::fs::read_to_string("/proc/version") {
|
||||
let version = version.to_lowercase();
|
||||
if !version.contains("microsoft") && !version.contains("wsl") {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
let Some(path) = path.to_str() else {
|
||||
return false;
|
||||
};
|
||||
if !path.starts_with("/mnt/") || path.len() < 6 {
|
||||
return false;
|
||||
}
|
||||
let after_mnt = &path[5..];
|
||||
after_mnt.starts_with(|c: char| c.is_ascii_alphabetic())
|
||||
&& (after_mnt.len() == 1 || after_mnt.as_bytes()[1] == b'/')
|
||||
}
|
||||
|
||||
async fn poll_path_until_created(
|
||||
executor: BackgroundExecutor,
|
||||
path: Arc<Path>,
|
||||
tx: async_channel::Sender<()>,
|
||||
pending_path_events: Arc<Mutex<Vec<PathEvent>>>,
|
||||
registrations: Arc<Mutex<BTreeMap<Arc<Path>, FsWatcherRegistration>>>,
|
||||
pending_registrations: Arc<Mutex<HashMap<Arc<Path>, Task<()>>>>,
|
||||
) {
|
||||
loop {
|
||||
executor.timer(poll_interval()).await;
|
||||
|
||||
if !pending_registrations.lock().contains_key(path.as_ref()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if smol::fs::symlink_metadata(path.as_ref()).await.is_err() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if registrations.lock().contains_key(path.as_ref()) {
|
||||
pending_registrations.lock().remove(path.as_ref());
|
||||
return;
|
||||
}
|
||||
|
||||
match register_existing_path(path.clone(), tx.clone(), pending_path_events.clone()) {
|
||||
Ok(registration) => {
|
||||
{
|
||||
let mut pending_registrations = pending_registrations.lock();
|
||||
if pending_registrations.remove(path.as_ref()).is_none() {
|
||||
global_watcher().remove(registration.id);
|
||||
return;
|
||||
}
|
||||
registrations.lock().insert(path.clone(), registration);
|
||||
}
|
||||
enqueue_path_events(
|
||||
&tx,
|
||||
&pending_path_events,
|
||||
vec![
|
||||
PathEvent {
|
||||
path: path.to_path_buf(),
|
||||
kind: Some(PathEventKind::Created),
|
||||
},
|
||||
PathEvent {
|
||||
path: path.to_path_buf(),
|
||||
kind: Some(PathEventKind::Rescan),
|
||||
},
|
||||
],
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(error) => {
|
||||
log::warn!("failed to watch newly-created path {path:?}: {error}; retrying");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn enqueue_path_events(
|
||||
tx: &smol::channel::Sender<()>,
|
||||
pending_path_events: &Arc<Mutex<Vec<PathEvent>>>,
|
||||
|
|
@ -417,9 +648,10 @@ fn path_already_covered(
|
|||
mode: WatcherMode,
|
||||
) -> bool {
|
||||
(mode == WatcherMode::Poll || cfg!(any(target_os = "windows", target_os = "macos")))
|
||||
&& path_registrations
|
||||
.keys()
|
||||
.any(|existing| path.starts_with(existing.as_ref()) && path != existing.as_ref())
|
||||
&& path
|
||||
.ancestors()
|
||||
.skip(1)
|
||||
.any(|ancestor| path_registrations.contains_key(ancestor))
|
||||
}
|
||||
|
||||
static POLL_INTERVAL: LazyLock<Duration> = LazyLock::new(|| {
|
||||
|
|
|
|||
Loading…
Reference in a new issue