mirror of
https://github.com/zed-industries/zed.git
synced 2026-06-01 03:14:56 +07:00
Add cooldown when file watch limit is reached (#57720)
Stop trying to add new watches for 5 seconds after receiving the "OS file watch limit reached" error. Previously, the repeated attempts were flooding the logs and issuing many pointless syscalls. Related to #57422, #57042, FR-18 Release Notes: - Improved file watcher behavior when the OS file watch limit is reached.
This commit is contained in:
parent
80a4042ff5
commit
32d0737318
1 changed files with 135 additions and 31 deletions
|
|
@ -6,7 +6,7 @@ use std::{
|
|||
ops::DerefMut,
|
||||
path::Path,
|
||||
sync::{Arc, LazyLock, OnceLock},
|
||||
time::Duration,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use util::{ResultExt, paths::SanitizedPath};
|
||||
|
||||
|
|
@ -50,11 +50,13 @@ impl FsWatcher {
|
|||
|
||||
fn add_existing_path(&self, path: Arc<Path>) -> anyhow::Result<()> {
|
||||
let registration_path = path.clone();
|
||||
let registration =
|
||||
register_existing_path(path, self.tx.clone(), self.pending_path_events.clone())?;
|
||||
self.registrations
|
||||
.lock()
|
||||
.insert(registration_path, registration);
|
||||
if let Some(registration) =
|
||||
register_existing_path(path, self.tx.clone(), self.pending_path_events.clone())?
|
||||
{
|
||||
self.registrations
|
||||
.lock()
|
||||
.insert(registration_path, registration);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -182,7 +184,7 @@ fn register_existing_path(
|
|||
path: Arc<Path>,
|
||||
tx: async_channel::Sender<()>,
|
||||
pending_path_events: Arc<Mutex<Vec<PathEvent>>>,
|
||||
) -> anyhow::Result<FsWatcherRegistration> {
|
||||
) -> anyhow::Result<Option<FsWatcherRegistration>> {
|
||||
let mode = if requires_poll_watcher(path.as_ref()) {
|
||||
log::info!(
|
||||
"Using poll watcher ({}ms interval) for {}",
|
||||
|
|
@ -196,20 +198,24 @@ fn register_existing_path(
|
|||
};
|
||||
let root_path = SanitizedPath::new_arc(path.as_ref());
|
||||
let path_for_callback = path.clone();
|
||||
let registration_id = global_watcher().add(path, mode, move |event: &notify::Event| {
|
||||
log::trace!("watcher received event: {event:?}");
|
||||
push_notify_event(
|
||||
&tx,
|
||||
&pending_path_events,
|
||||
&root_path,
|
||||
path_for_callback.as_ref(),
|
||||
event,
|
||||
);
|
||||
})?;
|
||||
Ok(FsWatcherRegistration {
|
||||
let Some(registration_id) =
|
||||
global_watcher().add(path, mode, move |event: &notify::Event| {
|
||||
log::trace!("watcher received event: {event:?}");
|
||||
push_notify_event(
|
||||
&tx,
|
||||
&pending_path_events,
|
||||
&root_path,
|
||||
path_for_callback.as_ref(),
|
||||
event,
|
||||
);
|
||||
})?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
Ok(Some(FsWatcherRegistration {
|
||||
id: registration_id,
|
||||
mode,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
|
|
@ -345,7 +351,7 @@ async fn poll_path_until_created(
|
|||
}
|
||||
|
||||
match register_existing_path(path.clone(), tx.clone(), pending_path_events.clone()) {
|
||||
Ok(registration) => {
|
||||
Ok(Some(registration)) => {
|
||||
{
|
||||
let mut pending_registrations = pending_registrations.lock();
|
||||
if pending_registrations.remove(path.as_ref()).is_none() {
|
||||
|
|
@ -370,6 +376,7 @@ async fn poll_path_until_created(
|
|||
);
|
||||
return;
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(error) => {
|
||||
log::warn!("failed to watch newly-created path {path:?}: {error}; retrying");
|
||||
}
|
||||
|
|
@ -501,10 +508,16 @@ struct WatcherState {
|
|||
watchers: HashMap<WatcherRegistrationId, WatcherRegistrationState>,
|
||||
native_path_registrations: HashMap<Arc<std::path::Path>, PathRegistrationState>,
|
||||
poll_path_registrations: HashMap<Arc<std::path::Path>, PathRegistrationState>,
|
||||
cooldown_until: Option<Instant>,
|
||||
last_registration: WatcherRegistrationId,
|
||||
}
|
||||
|
||||
impl WatcherState {
|
||||
fn is_native_watch_limit_cooldown_active(&self) -> bool {
|
||||
self.cooldown_until
|
||||
.is_some_and(|cooldown_until| cooldown_until > Instant::now())
|
||||
}
|
||||
|
||||
fn path_registrations(
|
||||
&mut self,
|
||||
mode: WatcherMode,
|
||||
|
|
@ -565,15 +578,30 @@ impl GlobalWatcher {
|
|||
path: Arc<std::path::Path>,
|
||||
mode: WatcherMode,
|
||||
cb: impl Fn(&notify::Event) + Send + Sync + 'static,
|
||||
) -> anyhow::Result<WatcherRegistrationId> {
|
||||
) -> anyhow::Result<Option<WatcherRegistrationId>> {
|
||||
let mut state = self.state.lock();
|
||||
let registrations_for_mode = state.path_registrations(mode);
|
||||
let path_already_covered =
|
||||
path_already_covered(path.as_ref(), registrations_for_mode, mode);
|
||||
let (path_already_covered, path_already_registered) = {
|
||||
let registrations_for_mode = state.path_registrations(mode);
|
||||
(
|
||||
path_already_covered(path.as_ref(), registrations_for_mode, mode),
|
||||
registrations_for_mode.contains_key(&path),
|
||||
)
|
||||
};
|
||||
|
||||
if !path_already_covered && !path_already_registered {
|
||||
if mode == WatcherMode::Native && state.is_native_watch_limit_cooldown_active() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if !path_already_covered && !registrations_for_mode.contains_key(&path) {
|
||||
drop(state);
|
||||
self.watch(&path, mode)?;
|
||||
match self.watch(&path, mode) {
|
||||
Ok(()) => {}
|
||||
Err(error) if mode == WatcherMode::Native && is_max_files_watch_error(&error) => {
|
||||
self.start_native_watch_limit_cooldown(&path);
|
||||
return Ok(None);
|
||||
}
|
||||
Err(error) => return Err(error),
|
||||
}
|
||||
state = self.state.lock();
|
||||
}
|
||||
|
||||
|
|
@ -595,7 +623,20 @@ impl GlobalWatcher {
|
|||
has_os_watcher: !path_already_covered,
|
||||
});
|
||||
|
||||
Ok(id)
|
||||
Ok(Some(id))
|
||||
}
|
||||
|
||||
fn start_native_watch_limit_cooldown(&self, path: &Path) {
|
||||
let mut state = self.state.lock();
|
||||
let now = Instant::now();
|
||||
let should_log = !state.is_native_watch_limit_cooldown_active();
|
||||
state.cooldown_until = Some(now + *NATIVE_WATCH_LIMIT_COOLDOWN);
|
||||
if should_log {
|
||||
log::warn!(
|
||||
"OS file watch limit reached while watching {path:?}; skipping new native file watcher registrations for {} seconds",
|
||||
NATIVE_WATCH_LIMIT_COOLDOWN.as_secs()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&self, id: WatcherRegistrationId) {
|
||||
|
|
@ -688,6 +729,12 @@ fn path_already_covered(
|
|||
.any(|ancestor| path_registrations.contains_key(ancestor))
|
||||
}
|
||||
|
||||
fn is_max_files_watch_error(error: &anyhow::Error) -> bool {
|
||||
error
|
||||
.downcast_ref::<notify::Error>()
|
||||
.is_some_and(|error| matches!(&error.kind, notify::ErrorKind::MaxFilesWatch))
|
||||
}
|
||||
|
||||
static POLL_INTERVAL: LazyLock<Duration> = LazyLock::new(|| {
|
||||
let poll_ms: u64 = std::env::var("ZED_FILE_WATCHER_POLL_MS")
|
||||
.ok()
|
||||
|
|
@ -697,6 +744,15 @@ static POLL_INTERVAL: LazyLock<Duration> = LazyLock::new(|| {
|
|||
Duration::from_millis(poll_ms)
|
||||
});
|
||||
|
||||
static NATIVE_WATCH_LIMIT_COOLDOWN: LazyLock<Duration> = LazyLock::new(|| {
|
||||
let cooldown_seconds: u64 = std::env::var("ZED_NATIVE_WATCH_LIMIT_COOLDOWN_SECONDS")
|
||||
.ok()
|
||||
.and_then(|value| value.parse().ok())
|
||||
.unwrap_or(5)
|
||||
.clamp(0, 300);
|
||||
Duration::from_secs(cooldown_seconds)
|
||||
});
|
||||
|
||||
pub fn poll_interval() -> Duration {
|
||||
*POLL_INTERVAL
|
||||
}
|
||||
|
|
@ -709,6 +765,7 @@ fn global_watcher() -> &'static GlobalWatcher {
|
|||
watchers: Default::default(),
|
||||
native_path_registrations: Default::default(),
|
||||
poll_path_registrations: Default::default(),
|
||||
cooldown_until: None,
|
||||
last_registration: Default::default(),
|
||||
}),
|
||||
native_watcher: Mutex::new(None),
|
||||
|
|
@ -789,6 +846,7 @@ mod tests {
|
|||
watched_paths: HashSet<PathBuf>,
|
||||
watch_calls: Vec<PathBuf>,
|
||||
unwatch_calls: Vec<PathBuf>,
|
||||
fail_with_watch_limit: bool,
|
||||
}
|
||||
|
||||
struct SharedFakeWatchBackend(Arc<Mutex<FakeWatchBackend>>);
|
||||
|
|
@ -798,6 +856,9 @@ mod tests {
|
|||
let path = path.to_path_buf();
|
||||
let mut backend = self.0.lock();
|
||||
backend.watch_calls.push(path.clone());
|
||||
if backend.fail_with_watch_limit {
|
||||
return Err(notify::Error::new(notify::ErrorKind::MaxFilesWatch));
|
||||
}
|
||||
backend.watched_paths.insert(path);
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -815,15 +876,31 @@ mod tests {
|
|||
}
|
||||
|
||||
fn test_watcher(poll_watcher: Arc<Mutex<FakeWatchBackend>>) -> GlobalWatcher {
|
||||
test_watcher_with_backends(None, Some(poll_watcher))
|
||||
}
|
||||
|
||||
fn test_watcher_with_backends(
|
||||
native_watcher: Option<Arc<Mutex<FakeWatchBackend>>>,
|
||||
poll_watcher: Option<Arc<Mutex<FakeWatchBackend>>>,
|
||||
) -> GlobalWatcher {
|
||||
GlobalWatcher {
|
||||
state: Mutex::new(WatcherState {
|
||||
watchers: Default::default(),
|
||||
native_path_registrations: Default::default(),
|
||||
poll_path_registrations: Default::default(),
|
||||
cooldown_until: None,
|
||||
last_registration: Default::default(),
|
||||
}),
|
||||
native_watcher: Mutex::new(None),
|
||||
poll_watcher: Mutex::new(Some(Box::new(SharedFakeWatchBackend(poll_watcher)))),
|
||||
native_watcher: Mutex::new(
|
||||
native_watcher.map(|watcher| {
|
||||
Box::new(SharedFakeWatchBackend(watcher)) as Box<dyn WatchBackend>
|
||||
}),
|
||||
),
|
||||
poll_watcher: Mutex::new(
|
||||
poll_watcher.map(|watcher| {
|
||||
Box::new(SharedFakeWatchBackend(watcher)) as Box<dyn WatchBackend>
|
||||
}),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -844,10 +921,12 @@ mod tests {
|
|||
|
||||
let parent_registration = watcher
|
||||
.add(parent.as_ref().into(), WatcherMode::Poll, |_| {})
|
||||
.expect("add parent watch");
|
||||
.expect("add parent watch")
|
||||
.expect("parent watch registered");
|
||||
let child_registration = watcher
|
||||
.add(child.as_ref().into(), WatcherMode::Poll, |_| {})
|
||||
.expect("add covered child watch");
|
||||
.expect("add covered child watch")
|
||||
.expect("child watch registered");
|
||||
|
||||
watcher.remove(parent_registration);
|
||||
watcher.remove(child_registration);
|
||||
|
|
@ -857,6 +936,31 @@ mod tests {
|
|||
assert_eq!(backend.unwatch_calls, &[parent.to_path_buf()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn native_watch_limit_cools_down_subsequent_native_registrations() {
|
||||
let native_backend = Arc::new(Mutex::new(FakeWatchBackend {
|
||||
fail_with_watch_limit: true,
|
||||
..Default::default()
|
||||
}));
|
||||
let poll_backend = Arc::new(Mutex::new(FakeWatchBackend::default()));
|
||||
let watcher = test_watcher_with_backends(Some(native_backend.clone()), Some(poll_backend));
|
||||
let first_path = Arc::<Path>::from(Path::new("/repo/first"));
|
||||
let second_path = Arc::<Path>::from(Path::new("/repo/second"));
|
||||
|
||||
let first_registration = watcher
|
||||
.add(first_path.clone(), WatcherMode::Native, |_| {})
|
||||
.expect("native watch limit is handled");
|
||||
let second_registration = watcher
|
||||
.add(second_path, WatcherMode::Native, |_| {})
|
||||
.expect("native watch limit backoff is handled");
|
||||
|
||||
assert!(first_registration.is_none());
|
||||
assert!(second_registration.is_none());
|
||||
|
||||
let native_backend = native_backend.lock();
|
||||
assert_eq!(native_backend.watch_calls, &[first_path.to_path_buf()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_coalesce_pending_rescans() {
|
||||
let test_cases = [
|
||||
|
|
|
|||
Loading…
Reference in a new issue