use crate::{App, PlatformDispatcher, PlatformScheduler}; use futures::channel::mpsc; use futures::prelude::*; use gpui_util::{TryFutureExt, TryFutureExtBacktrace}; use scheduler::Instant; use scheduler::Scheduler; use std::{future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc, time::Duration}; pub use scheduler::{FallibleTask, LocalExecutor as SchedulerLocalExecutor, Priority, Task}; /// A pointer to the executor that is currently running, /// for spawning background tasks. #[derive(Clone)] pub struct BackgroundExecutor { inner: scheduler::BackgroundExecutor, dispatcher: Arc, } /// A pointer to the executor that is currently running, /// for spawning tasks on the main thread. #[derive(Clone)] pub struct ForegroundExecutor { inner: scheduler::LocalExecutor, dispatcher: Arc, not_send: PhantomData>, } /// Extension trait for `Task>` that adds `detach_and_log_err` with an `&App` context. /// /// This trait is automatically implemented for all `Task>` types. pub trait TaskExt { /// Run the task to completion in the background and log any errors that occur. fn detach_and_log_err(self, cx: &App); /// Like [`Self::detach_and_log_err`], but uses `{:?}` formatting on failure so `anyhow::Error` /// values emit their full backtrace. Prefer `detach_and_log_err` unless a backtrace is wanted. fn detach_and_log_err_with_backtrace(self, cx: &App); } impl TaskExt for Task> where T: 'static, E: 'static + std::fmt::Display + std::fmt::Debug, { #[track_caller] fn detach_and_log_err(self, cx: &App) { let location = core::panic::Location::caller(); cx.foreground_executor() .spawn(self.log_tracked_err(*location)) .detach(); } #[track_caller] fn detach_and_log_err_with_backtrace(self, cx: &App) { let location = *core::panic::Location::caller(); cx.foreground_executor() .spawn(self.log_tracked_err_with_backtrace(location)) .detach(); } } impl BackgroundExecutor { /// Creates a new BackgroundExecutor from the given PlatformDispatcher. pub fn new(dispatcher: Arc) -> Self { #[cfg(any(test, feature = "test-support"))] let scheduler: Arc = if let Some(test_dispatcher) = dispatcher.as_test() { test_dispatcher.scheduler().clone() } else { Arc::new(PlatformScheduler::new(dispatcher.clone())) }; #[cfg(not(any(test, feature = "test-support")))] let scheduler: Arc = Arc::new(PlatformScheduler::new(dispatcher.clone())); Self { inner: scheduler::BackgroundExecutor::new(scheduler), dispatcher, } } /// Returns the underlying scheduler::BackgroundExecutor. /// /// This is used by Ex to pass the executor to thread/worktree code. pub fn scheduler_executor(&self) -> scheduler::BackgroundExecutor { self.inner.clone() } /// Enqueues the given future to be run to completion on a background thread. #[track_caller] pub fn spawn(&self, future: impl Future + Send + 'static) -> Task where R: Send + 'static, { self.spawn_with_priority(Priority::default(), future.boxed()) } /// Enqueues the given future to be run to completion on a background thread with the given priority. /// /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with /// realtime scheduling priority, suitable for audio processing. #[track_caller] pub fn spawn_with_priority( &self, priority: Priority, future: impl Future + Send + 'static, ) -> Task where R: Send + 'static, { if priority == Priority::RealtimeAudio { self.inner.spawn_realtime(future) } else { self.inner.spawn_with_priority(priority, future) } } /// Scoped lets you start a number of tasks and waits /// for all of them to complete before returning. pub async fn scoped<'scope, F>(&self, scheduler: F) where F: FnOnce(&mut Scope<'scope>), { let mut scope = Scope::new(self.clone(), Priority::default()); (scheduler)(&mut scope); let spawned = mem::take(&mut scope.futures) .into_iter() .map(|f| self.spawn_with_priority(scope.priority, f)) .collect::>(); for task in spawned { task.await; } } /// Scoped lets you start a number of tasks and waits /// for all of them to complete before returning. pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F) where F: FnOnce(&mut Scope<'scope>), { let mut scope = Scope::new(self.clone(), priority); (scheduler)(&mut scope); let spawned = mem::take(&mut scope.futures) .into_iter() .map(|f| self.spawn_with_priority(scope.priority, f)) .collect::>(); for task in spawned { task.await; } } /// Get the current time. /// /// Calling this instead of `std::time::Instant::now` allows the use /// of fake timers in tests. pub fn now(&self) -> Instant { self.inner.scheduler().clock().now() } /// Returns a task that will complete after the given duration. /// Depending on other concurrent tasks the elapsed duration may be longer /// than requested. #[track_caller] pub fn timer(&self, duration: Duration) -> Task<()> { if duration.is_zero() { return Task::ready(()); } self.spawn(self.inner.scheduler().timer(duration)) } /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable) #[cfg(any(test, feature = "test-support"))] pub fn simulate_random_delay(&self) -> impl Future + use<> { self.dispatcher.as_test().unwrap().simulate_random_delay() } /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready. #[cfg(any(test, feature = "test-support"))] pub fn advance_clock(&self, duration: Duration) { self.dispatcher.as_test().unwrap().advance_clock(duration) } /// In tests, run one task. #[cfg(any(test, feature = "test-support"))] pub fn tick(&self) -> bool { self.dispatcher.as_test().unwrap().scheduler().tick() } /// In tests, run tasks until the scheduler would park. /// /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been /// drained. To preserve the historical semantics that tests relied on (drain all work that can /// make progress), we advance the clock to the next timer when no runnable tasks remain. #[cfg(any(test, feature = "test-support"))] pub fn run_until_parked(&self) { let scheduler = self.dispatcher.as_test().unwrap().scheduler(); scheduler.run(); } /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks. #[cfg(any(test, feature = "test-support"))] pub fn allow_parking(&self) { self.dispatcher .as_test() .unwrap() .scheduler() .allow_parking(); if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") { log::warn!("[gpui::executor] allow_parking: enabled"); } } /// Sets the range of ticks to run before timing out in block_on. #[cfg(any(test, feature = "test-support"))] pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive) { self.dispatcher .as_test() .unwrap() .scheduler() .set_timeout_ticks(range); } /// Undoes the effect of [`Self::allow_parking`]. #[cfg(any(test, feature = "test-support"))] pub fn forbid_parking(&self) { self.dispatcher .as_test() .unwrap() .scheduler() .forbid_parking(); } /// In tests, returns the rng used by the dispatcher. #[cfg(any(test, feature = "test-support"))] pub fn rng(&self) -> scheduler::SharedRng { self.dispatcher.as_test().unwrap().scheduler().rng() } /// How many CPUs are available to the dispatcher. pub fn num_cpus(&self) -> usize { #[cfg(any(test, feature = "test-support"))] if let Some(test) = self.dispatcher.as_test() { return test.num_cpus_override().unwrap_or(4); } num_cpus::get() } /// Override the number of CPUs reported by this executor in tests. /// Panics if not called on a test executor. #[cfg(any(test, feature = "test-support"))] pub fn set_num_cpus(&self, count: usize) { self.dispatcher .as_test() .expect("set_num_cpus can only be called on a test executor") .set_num_cpus(count); } /// Whether we're on the main thread. pub fn is_main_thread(&self) -> bool { self.dispatcher.is_main_thread() } #[doc(hidden)] pub fn dispatcher(&self) -> &Arc { &self.dispatcher } } impl ForegroundExecutor { /// Creates a new ForegroundExecutor from the given PlatformDispatcher. pub fn new(dispatcher: Arc) -> Self { #[cfg(any(test, feature = "test-support"))] let (scheduler, session_id): (Arc, _) = if let Some(test_dispatcher) = dispatcher.as_test() { ( test_dispatcher.scheduler().clone(), test_dispatcher.session_id(), ) } else { let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone())); let inner = platform_scheduler.foreground_executor(); return Self { inner, dispatcher, not_send: PhantomData, }; }; #[cfg(not(any(test, feature = "test-support")))] let inner = { let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone())); platform_scheduler.foreground_executor() }; #[cfg(any(test, feature = "test-support"))] let inner = { let scheduler_for_dispatch = Arc::downgrade(&scheduler); scheduler::LocalExecutor::new(session_id, scheduler, move |runnable| { if let Some(scheduler) = scheduler_for_dispatch.upgrade() { scheduler.schedule_local(session_id, runnable); } }) }; Self { inner, dispatcher, not_send: PhantomData, } } /// Enqueues the given Task to run on the main thread. #[track_caller] pub fn spawn(&self, future: impl Future + 'static) -> Task where R: 'static, { self.inner.spawn(future.boxed_local()) } /// Enqueues the given Task to run on the main thread with the given priority. #[track_caller] pub fn spawn_with_priority( &self, _priority: Priority, future: impl Future + 'static, ) -> Task where R: 'static, { // Priority is ignored for foreground tasks - they run in order on the main thread self.inner.spawn(future) } /// Used by the test harness to run an async test in a synchronous fashion. #[cfg(any(test, feature = "test-support"))] #[track_caller] pub fn block_test(&self, future: impl Future) -> R { use std::cell::Cell; let scheduler = self.inner.scheduler(); let output = Cell::new(None); let future = async { output.set(Some(future.await)); }; let mut future = std::pin::pin!(future); // In async GPUI tests, we must allow foreground tasks scheduled by the test itself // (which are associated with the test session) to make progress while we block. // Otherwise, awaiting futures that depend on same-session foreground work can deadlock. scheduler.block(None, future.as_mut(), None); output.take().expect("block_test future did not complete") } /// Block the current thread until the given future resolves. /// Consider using `block_with_timeout` instead. pub fn block_on(&self, future: impl Future) -> R { self.inner.block_on(future) } /// Block the current thread until the given future resolves or the timeout elapses. pub fn block_with_timeout>( &self, duration: Duration, future: Fut, ) -> Result + use> { self.inner.block_with_timeout(duration, future) } #[doc(hidden)] pub fn dispatcher(&self) -> &Arc { &self.dispatcher } #[doc(hidden)] pub fn scheduler_executor(&self) -> SchedulerLocalExecutor { self.inner.clone() } } /// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`]. pub struct Scope<'a> { executor: BackgroundExecutor, priority: Priority, futures: Vec + Send + 'static>>>, tx: Option>, rx: mpsc::Receiver<()>, lifetime: PhantomData<&'a ()>, } impl<'a> Scope<'a> { fn new(executor: BackgroundExecutor, priority: Priority) -> Self { let (tx, rx) = mpsc::channel(1); Self { executor, priority, tx: Some(tx), rx, futures: Default::default(), lifetime: PhantomData, } } /// How many CPUs are available to the dispatcher. pub fn num_cpus(&self) -> usize { self.executor.num_cpus() } /// Spawn a future into this scope. #[track_caller] pub fn spawn(&mut self, f: F) where F: Future + Send + 'a, { let tx = self.tx.clone().unwrap(); // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because // dropping this `Scope` blocks until all of the futures have resolved. let f = unsafe { mem::transmute::< Pin + Send + 'a>>, Pin + Send + 'static>>, >(Box::pin(async move { f.await; drop(tx); })) }; self.futures.push(f); } } impl Drop for Scope<'_> { fn drop(&mut self) { self.tx.take().unwrap(); // Wait until the channel is closed, which means that all of the spawned // futures have resolved. let future = async { self.rx.next().await; }; let mut future = std::pin::pin!(future); self.executor .inner .scheduler() .block(None, future.as_mut(), None); } } #[cfg(test)] mod test { use super::*; use crate::{App, TestDispatcher, TestPlatform}; use std::cell::RefCell; /// Helper to create test infrastructure. /// Returns (dispatcher, background_executor, app). fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc) { let dispatcher = TestDispatcher::new(0); let arc_dispatcher = Arc::new(dispatcher.clone()); let background_executor = BackgroundExecutor::new(arc_dispatcher.clone()); let foreground_executor = ForegroundExecutor::new(arc_dispatcher); let platform = TestPlatform::new(background_executor.clone(), foreground_executor); let asset_source = Arc::new(()); let http_client = http_client::FakeHttpClient::with_404_response(); let app = App::new_app(platform, asset_source, http_client); (dispatcher, background_executor, app) } #[test] fn sanity_test_tasks_run() { let (dispatcher, _background_executor, app) = create_test_app(); let foreground_executor = app.borrow().foreground_executor.clone(); let task_ran = Rc::new(RefCell::new(false)); foreground_executor .spawn({ let task_ran = Rc::clone(&task_ran); async move { *task_ran.borrow_mut() = true; } }) .detach(); // Run dispatcher while app is still alive dispatcher.run_until_parked(); // Task should have run assert!( *task_ran.borrow(), "Task should run normally when app is alive" ); } }