scheduler: Add spawn_dedicated for single-threaded actors with !Send state (#57609)

Adds `scheduler::spawn_dedicated_thread` (and inherent `spawn_dedicated`
methods on `PlatformScheduler` and `TestScheduler`) so single-threaded
actors that own `!Send` state can run on their own OS thread and freely
do blocking I/O without disturbing any other executor.

### Why

A single-threaded actor that needs to do blocking syscalls is currently
stuck: it can't run on the shared foreground executor (blocking would
stall every other foreground session), and it can't move to the
background pool because its state isn't `Send`. `spawn_dedicated` gives
each such actor its own thread and its own `LocalExecutor`, while still
participating in the same testable scheduler infrastructure as
everything else.

### Shape

- `pub fn spawn_dedicated_thread(session_id, scheduler, f) -> Task<_>`
in `scheduler`. Owns the OS thread, the per-session runnable channel,
and the `LocalExecutor` setup.
- Inherent `spawn_dedicated` on `PlatformScheduler` (allocates its own
`SessionId`, delegates to the free function).
- Inherent `spawn_dedicated` on `TestScheduler` (no real thread — runs
as a fresh local session driven by the test scheduler's run loop, so
determinism under `many` is preserved).
- Renames `Scheduler::schedule_foreground` → `schedule_local` and
`scheduler::ForegroundExecutor` → `scheduler::LocalExecutor` to reflect
that these are session-pinned queues rather than "the main thread" (a
dedicated session runs on its own thread). GPUI's wrapper
`gpui::ForegroundExecutor` and the `foreground_executor` field/method
names are unchanged to keep blast radius small.
- `LocalExecutor::new` now takes an explicit dispatch closure, so the
routing decision (default session, dedicated thread, or something else)
lives at the construction site.

### Tests

- `TestScheduler` side: round-trip, `!Send` future, `Send` closure
capturing shared state, inner `executor.spawn`, determinism under `many`
seeds, drop-cancels-future, detached child runs after root completes.
- `PlatformScheduler` side: real separate thread (blocking syscalls
don't stall the test), `!Send` future output, drop-cancels-future,
thread tears down after work completes, detached child outlives root.

cc @as-cii

Release Notes:

- N/A

---------

Co-authored-by: Antonio Scandurra <me@as-cii.com>
Co-authored-by: Conrad Irwin <conrad.irwin@gmail.com>
This commit is contained in:
Nathan Sobo 2026-05-29 09:58:02 -06:00 committed by GitHub
parent 18051ab399
commit c30d18b10d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 830 additions and 41 deletions

View file

@ -143,7 +143,7 @@ pub enum ChannelEvent {
impl EventEmitter<ChannelEvent> for ChannelStore {}
enum OpenEntityHandle<E> {
enum OpenEntityHandle<E: 'static> {
Open(WeakEntity<E>),
Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}

View file

@ -6,9 +6,7 @@ use scheduler::Instant;
use scheduler::Scheduler;
use std::{future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc, time::Duration};
pub use scheduler::{
FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority, Task,
};
pub use scheduler::{FallibleTask, LocalExecutor as SchedulerLocalExecutor, Priority, Task};
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
@ -22,7 +20,7 @@ pub struct BackgroundExecutor {
/// for spawning tasks on the main thread.
#[derive(Clone)]
pub struct ForegroundExecutor {
inner: scheduler::ForegroundExecutor,
inner: scheduler::LocalExecutor,
dispatcher: Arc<dyn PlatformDispatcher>,
not_send: PhantomData<Rc<()>>,
}
@ -280,18 +278,29 @@ impl ForegroundExecutor {
)
} else {
let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
let session_id = platform_scheduler.allocate_session_id();
(platform_scheduler, session_id)
let inner = platform_scheduler.foreground_executor();
return Self {
inner,
dispatcher,
not_send: PhantomData,
};
};
#[cfg(not(any(test, feature = "test-support")))]
let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
let inner = {
let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
let session_id = platform_scheduler.allocate_session_id();
(platform_scheduler, session_id)
platform_scheduler.foreground_executor()
};
let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
#[cfg(any(test, feature = "test-support"))]
let inner = {
let scheduler_for_dispatch = Arc::downgrade(&scheduler);
scheduler::LocalExecutor::new(session_id, scheduler, move |runnable| {
if let Some(scheduler) = scheduler_for_dispatch.upgrade() {
scheduler.schedule_local(session_id, runnable);
}
})
};
Self {
inner,
@ -366,7 +375,7 @@ impl ForegroundExecutor {
}
#[doc(hidden)]
pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
pub fn scheduler_executor(&self) -> SchedulerLocalExecutor {
self.inner.clone()
}
}

View file

@ -139,8 +139,7 @@ impl PlatformDispatcher for TestDispatcher {
}
fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
self.scheduler
.schedule_foreground(self.session_id, runnable);
self.scheduler.schedule_local(self.session_id, runnable);
}
fn dispatch_after(&self, _duration: Duration, _runnable: RunnableVariant) {

View file

@ -3,10 +3,14 @@ use async_task::Runnable;
use chrono::{DateTime, Utc};
use futures::channel::oneshot;
use scheduler::Instant;
use scheduler::{Clock, Priority, Scheduler, SessionId, TestScheduler, Timer};
use scheduler::{
Clock, LocalExecutor, Priority, Scheduler, SessionId, Task, TestScheduler, Timer,
spawn_dedicated_thread,
};
#[cfg(not(target_family = "wasm"))]
use std::task::{Context, Poll};
use std::{
any::Any,
future::Future,
pin::Pin,
sync::{
@ -35,7 +39,17 @@ impl PlatformScheduler {
}
}
pub fn allocate_session_id(&self) -> SessionId {
pub fn foreground_executor(self: &Arc<Self>) -> LocalExecutor {
let session_id = self.next_session_id();
let scheduler = Arc::downgrade(self);
LocalExecutor::new(session_id, self.clone(), move |runnable| {
if let Some(scheduler) = scheduler.upgrade() {
scheduler.schedule_local(session_id, runnable);
}
})
}
fn next_session_id(&self) -> SessionId {
SessionId::new(self.next_session_id.fetch_add(1, Ordering::SeqCst))
}
}
@ -90,7 +104,7 @@ impl Scheduler for PlatformScheduler {
}
}
fn schedule_foreground(&self, _session_id: SessionId, runnable: Runnable<RunnableMeta>) {
fn schedule_local(&self, _session_id: SessionId, runnable: Runnable<RunnableMeta>) {
self.dispatcher
.dispatch_on_main_thread(runnable, Priority::default());
}
@ -133,6 +147,21 @@ impl Scheduler for PlatformScheduler {
self.clock.clone()
}
fn spawn_dedicated(
self: Arc<Self>,
f: Box<
dyn FnOnce(
LocalExecutor,
)
-> Pin<Box<dyn Future<Output = Box<dyn Any + Send + Sync>> + 'static>>
+ Send
+ 'static,
>,
) -> Task<Box<dyn Any + Send + Sync>> {
let session_id = self.next_session_id();
spawn_dedicated_thread(session_id, self, move |executor| f(executor))
}
fn as_test(&self) -> Option<&TestScheduler> {
None
}
@ -152,3 +181,261 @@ impl Clock for PlatformClock {
self.dispatcher.now()
}
}
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
use super::*;
use crate::{RunnableVariant, ThreadTaskTimings};
use scheduler::BackgroundExecutor;
use std::time::Instant as StdInstant;
// `spawn_dedicated` shouldn't touch the platform dispatcher at all;
// panicking on every method ensures the test catches it if it does.
struct SmokeDispatcher;
impl PlatformDispatcher for SmokeDispatcher {
fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
Vec::new()
}
fn get_current_thread_timings(&self) -> ThreadTaskTimings {
ThreadTaskTimings {
thread_name: None,
thread_id: std::thread::current().id(),
timings: Vec::new(),
total_pushed: 0,
}
}
fn is_main_thread(&self) -> bool {
false
}
fn dispatch(&self, _runnable: RunnableVariant, _priority: Priority) {
panic!("SmokeDispatcher should not be asked to dispatch in this test");
}
fn dispatch_on_main_thread(&self, _runnable: RunnableVariant, _priority: Priority) {
panic!("SmokeDispatcher does not implement a main thread");
}
fn dispatch_after(&self, _duration: Duration, _runnable: RunnableVariant) {
panic!("SmokeDispatcher does not implement timers");
}
fn spawn_realtime(&self, _f: Box<dyn FnOnce() + Send>) {
panic!("SmokeDispatcher does not implement realtime");
}
}
#[test]
fn spawn_dedicated_runs_on_a_real_separate_thread() {
let background =
BackgroundExecutor::new(Arc::new(PlatformScheduler::new(Arc::new(SmokeDispatcher))));
let started = StdInstant::now();
let task = background.spawn_dedicated(|_executor| async move {
// A genuine blocking syscall on the dedicated thread. If
// `spawn_dedicated` were running the future on any shared
// executor, this would stall that executor.
let thread_id_before = std::thread::current().id();
std::thread::sleep(Duration::from_millis(50));
let thread_id_after = std::thread::current().id();
assert_eq!(thread_id_before, thread_id_after);
(thread_id_before, "slept")
});
let (dedicated_thread_id, message) = futures::executor::block_on(task);
let elapsed = started.elapsed();
assert_eq!(message, "slept");
assert_ne!(
dedicated_thread_id,
std::thread::current().id(),
"dedicated future ran on the test thread"
);
assert!(
elapsed >= Duration::from_millis(40),
"expected the dedicated thread to genuinely sleep, elapsed = {:?}",
elapsed
);
}
#[test]
fn spawn_dedicated_returns_not_send_future_output() {
// The whole point of `spawn_dedicated` is that the future can be
// `!Send`. Constructing one with `Rc<RefCell<_>>` ensures the
// signature actually permits it.
use std::cell::RefCell;
use std::rc::Rc;
let background =
BackgroundExecutor::new(Arc::new(PlatformScheduler::new(Arc::new(SmokeDispatcher))));
let task = background.spawn_dedicated(|_executor| async move {
let state = Rc::new(RefCell::new(0_i32));
for _ in 0..3 {
*state.borrow_mut() += 1;
}
*state.borrow()
});
let output = futures::executor::block_on(task);
assert_eq!(output, 3);
}
#[test]
fn spawn_dedicated_dropping_task_cancels_future() {
use parking_lot::Mutex;
use std::sync::mpsc;
let background =
BackgroundExecutor::new(Arc::new(PlatformScheduler::new(Arc::new(SmokeDispatcher))));
let (started_tx, started_rx) = mpsc::channel::<()>();
let (after_park_tx, after_park_rx) = mpsc::channel::<()>();
let observed_post_await_write = Arc::new(Mutex::new(false));
let task = {
let observed_post_await_write = observed_post_await_write.clone();
background.spawn_dedicated(move |_executor| async move {
// Announce that the future is live on the dedicated thread.
started_tx
.send(())
.expect("started signal must be received");
// Park forever. Dropping the `Task` must cancel us here so
// the code below this `await` never runs.
futures::future::pending::<()>().await;
*observed_post_await_write.lock() = true;
after_park_tx
.send(())
.expect("after-park signal must be received");
})
};
// Wait until the dedicated future is actually parked at the await.
started_rx
.recv_timeout(Duration::from_secs(2))
.expect("dedicated future failed to start");
// Drop the root Task: this must cancel the future.
drop(task);
// If cancellation works, the future never advances past `pending`,
// so this recv must time out.
assert!(
after_park_rx
.recv_timeout(Duration::from_millis(100))
.is_err(),
"dedicated future advanced past the await after its Task was dropped"
);
assert!(
!*observed_post_await_write.lock(),
"dedicated future ran code past the cancellation point"
);
}
#[test]
fn spawn_dedicated_thread_tears_down_after_work_completes() {
use std::sync::mpsc;
// Fires from `Drop` so we observe teardown of the dedicated future's
// captured state on whichever thread runs its destructor.
struct DropSignal {
tx: Option<mpsc::Sender<std::thread::ThreadId>>,
}
impl Drop for DropSignal {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(std::thread::current().id());
}
}
}
let background =
BackgroundExecutor::new(Arc::new(PlatformScheduler::new(Arc::new(SmokeDispatcher))));
let (started_tx, started_rx) = mpsc::channel::<std::thread::ThreadId>();
let (drop_tx, drop_rx) = mpsc::channel::<std::thread::ThreadId>();
let task = background.spawn_dedicated(move |_executor| async move {
// Captured by the future's state. When the future completes and
// its state is dropped on the dedicated thread, this guard's
// `Drop` fires and reports the thread id it ran on.
let _guard = DropSignal { tx: Some(drop_tx) };
started_tx
.send(std::thread::current().id())
.expect("started signal must be received");
// Future returns immediately. The dedicated thread should then
// drop the future (firing _guard), exit the recv loop, and exit.
});
let dedicated_thread_id = started_rx
.recv_timeout(Duration::from_secs(2))
.expect("dedicated future failed to start");
assert_ne!(
dedicated_thread_id,
std::thread::current().id(),
"dedicated future ran on the test thread"
);
// Drive the root task to completion so its body finishes.
futures::executor::block_on(task);
// The guard's drop runs from the dedicated thread as it tears down
// the future's captured state. If the executor/recv-loop were
// keeping the future alive past task completion, this would hang.
let drop_thread_id = drop_rx
.recv_timeout(Duration::from_secs(2))
.expect("dedicated future's captured state was not dropped after task completion");
assert_eq!(
drop_thread_id, dedicated_thread_id,
"dedicated future's captured state must be dropped on the dedicated thread, not elsewhere"
);
}
#[test]
fn spawn_dedicated_detached_child_outlives_root() {
use std::sync::mpsc;
let background =
BackgroundExecutor::new(Arc::new(PlatformScheduler::new(Arc::new(SmokeDispatcher))));
// `gate_rx` lets the detached child park until the test explicitly
// releases it — after we've already observed the root completing.
let (gate_tx, gate_rx) = mpsc::channel::<()>();
let (child_done_tx, child_done_rx) = mpsc::channel::<std::thread::ThreadId>();
let task = background.spawn_dedicated(move |executor| async move {
executor
.spawn(async move {
// Blocking on `recv` is normally wrong inside an
// executor, but the dedicated thread is exclusive to
// this session, so blocking the only future on it is
// fine — this is the property `spawn_dedicated` is
// designed to provide.
gate_rx
.recv()
.expect("gate sender dropped before child resumed");
child_done_tx
.send(std::thread::current().id())
.expect("child_done receiver dropped");
})
.detach();
// Root finishes here. The detached child must keep the
// dedicated thread alive until it completes.
});
futures::executor::block_on(task);
// Negative assertion: the child has not finished, because the gate
// hasn't been released yet.
assert!(
child_done_rx
.recv_timeout(Duration::from_millis(50))
.is_err(),
"detached child finished before being released"
);
// Release the gate. The detached child should now complete on the
// dedicated thread.
gate_tx.send(()).expect("gate receiver dropped");
let child_thread_id = child_done_rx
.recv_timeout(Duration::from_secs(2))
.expect("detached child failed to complete after gate was released");
assert_ne!(
child_thread_id,
std::thread::current().id(),
"detached child ran on the test thread instead of the dedicated thread"
);
}
}

View file

@ -1555,7 +1555,7 @@ type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, ones
type StreamResponseChannels =
Arc<Mutex<HashMap<MessageId, UnboundedSender<(Result<Envelope>, oneshot::Sender<()>)>>>>;
struct Signal<T> {
struct Signal<T: 'static> {
tx: Mutex<Option<oneshot::Sender<T>>>,
rx: Shared<Task<Option<T>>>,
}

View file

@ -1,5 +1,7 @@
use crate::{Instant, Priority, RunnableMeta, Scheduler, SessionId, Timer};
use async_task::Runnable;
use std::{
any::Any,
future::Future,
marker::PhantomData,
mem::ManuallyDrop,
@ -12,18 +14,39 @@ use std::{
time::Duration,
};
/// A `!Send` executor pinned to a single session. Tasks spawned on it run in
/// order on whichever thread drains the dispatch destination supplied at
/// construction time — typically the main thread for the default session, or
/// a dedicated OS thread for sessions created by `spawn_dedicated_thread`.
#[derive(Clone)]
pub struct ForegroundExecutor {
pub struct LocalExecutor {
session_id: SessionId,
scheduler: Arc<dyn Scheduler>,
// Spawned tasks' schedule callbacks each hold an `Arc` clone of this
// closure, so the destination it captures stays alive as long as work
// could still land on it.
dispatch: Arc<dyn Fn(Runnable<RunnableMeta>) + Send + Sync>,
not_send: PhantomData<Rc<()>>,
}
impl ForegroundExecutor {
pub fn new(session_id: SessionId, scheduler: Arc<dyn Scheduler>) -> Self {
impl LocalExecutor {
/// Constructs a local executor that runs spawned tasks by sending their
/// runnables through `dispatch`. The `scheduler` is retained for access to
/// clocks, timers, and other scheduler-level services.
///
/// For the common case of routing runnables through
/// `Scheduler::schedule_local`, callers pass a closure that does exactly
/// that. `spawn_dedicated_thread` instead passes a closure that sends to
/// the dedicated thread's channel.
pub fn new(
session_id: SessionId,
scheduler: Arc<dyn Scheduler>,
dispatch: impl Fn(Runnable<RunnableMeta>) + Send + Sync + 'static,
) -> Self {
Self {
session_id,
scheduler,
dispatch: Arc::new(dispatch),
not_send: PhantomData,
}
}
@ -42,16 +65,11 @@ impl ForegroundExecutor {
F: Future + 'static,
F::Output: 'static,
{
let session_id = self.session_id;
let scheduler = Arc::downgrade(&self.scheduler);
let dispatch = self.dispatch.clone();
let location = Location::caller();
let (runnable, task) = spawn_local_with_source_location(
future,
move |runnable| {
if let Some(scheduler) = scheduler.upgrade() {
scheduler.schedule_foreground(session_id, runnable);
}
},
move |runnable| dispatch(runnable),
RunnableMeta { location },
);
runnable.schedule();
@ -110,6 +128,48 @@ impl ForegroundExecutor {
pub fn now(&self) -> Instant {
self.scheduler.clock().now()
}
/// Spawn a closure on a fresh session pinned to its own [`LocalExecutor`].
/// The closure runs on a new OS thread under `PlatformScheduler`, or on
/// the test scheduler's loop under `TestScheduler`.
///
/// The returned `Task` represents the dedicated work: dropping it cancels
/// the dedicated closure, `.await`ing it yields the closure's return
/// value, `.detach()`ing it lets the dedicated work run independently of
/// the caller.
#[track_caller]
pub fn spawn_dedicated<F, Fut>(&self, f: F) -> Task<Fut::Output>
where
F: FnOnce(LocalExecutor) -> Fut + Send + 'static,
Fut: Future + 'static,
Fut::Output: Send + Sync + 'static,
{
self.scheduler
.clone()
.spawn_dedicated(box_dedicated(f))
.downcast::<Fut::Output>()
}
}
/// Boxes the user-supplied dedicated closure into the type-erased shape
/// expected by [`Scheduler::spawn_dedicated`]. The user's `Fut::Output` is
/// boxed as `Box<dyn Any + Send + Sync>` on the dedicated side and downcast
/// back to `Fut::Output` by [`Task::downcast`] in the wrapper.
fn box_dedicated<F, Fut>(
f: F,
) -> Box<
dyn FnOnce(LocalExecutor) -> Pin<Box<dyn Future<Output = Box<dyn Any + Send + Sync>> + 'static>>
+ Send
+ 'static,
>
where
F: FnOnce(LocalExecutor) -> Fut + Send + 'static,
Fut: Future + 'static,
Fut::Output: Send + Sync + 'static,
{
Box::new(move |executor| {
Box::pin(async move { Box::new(f(executor).await) as Box<dyn Any + Send + Sync> })
})
}
#[derive(Clone)]
@ -193,6 +253,27 @@ impl BackgroundExecutor {
pub fn scheduler(&self) -> &Arc<dyn Scheduler> {
&self.scheduler
}
/// Spawn a closure on a fresh session pinned to its own [`LocalExecutor`].
/// The closure runs on a new OS thread under `PlatformScheduler`, or on
/// the test scheduler's loop under `TestScheduler`.
///
/// The returned `Task` represents the dedicated work: dropping it cancels
/// the dedicated closure, `.await`ing it yields the closure's return
/// value, `.detach()`ing it lets the dedicated work run independently of
/// the caller.
#[track_caller]
pub fn spawn_dedicated<F, Fut>(&self, f: F) -> Task<Fut::Output>
where
F: FnOnce(LocalExecutor) -> Fut + Send + 'static,
Fut: Future + 'static,
Fut::Output: Send + Sync + 'static,
{
self.scheduler
.clone()
.spawn_dedicated(box_dedicated(f))
.downcast::<Fut::Output>()
}
}
/// Task is a primitive that allows work to happen in the background.
@ -202,16 +283,22 @@ impl BackgroundExecutor {
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);
#[derive(Debug)]
enum TaskState<T> {
/// A task that is ready to return a value
Ready(Option<T>),
/// A task that is currently running.
Spawned(async_task::Task<T, RunnableMeta>),
/// A typed view of a [`Task<Box<dyn Any + Send + Sync>>`] obtained via
/// [`Task::downcast`]. The inner task drives the actual work; the
/// downcast layer just unwraps the `Box<dyn Any + Send + Sync>` on poll.
Downcast {
inner: Box<Task<Box<dyn Any + Send + Sync>>>,
marker: PhantomData<fn() -> T>,
},
}
impl<T> Task<T> {
@ -229,6 +316,7 @@ impl<T> Task<T> {
match &self.0 {
TaskState::Ready(_) => true,
TaskState::Spawned(task) => task.is_finished(),
TaskState::Downcast { inner, .. } => inner.is_ready(),
}
}
@ -237,6 +325,7 @@ impl<T> Task<T> {
match self {
Task(TaskState::Ready(_)) => {}
Task(TaskState::Spawned(task)) => task.detach(),
Task(TaskState::Downcast { inner, .. }) => inner.detach(),
}
}
@ -245,10 +334,43 @@ impl<T> Task<T> {
FallibleTask(match self.0 {
TaskState::Ready(val) => FallibleTaskState::Ready(val),
TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
TaskState::Downcast { inner, .. } => FallibleTaskState::Downcast {
inner: Box::new(inner.fallible()),
marker: PhantomData,
},
})
}
}
impl Task<Box<dyn Any + Send + Sync>> {
/// Reinterprets the boxed output as a concrete `T` via downcast on
/// completion. Used by [`LocalExecutor::spawn_dedicated`] and
/// [`BackgroundExecutor::spawn_dedicated`] to recover the user closure's
/// `Fut::Output` from the dyn-safe [`Scheduler::spawn_dedicated`].
///
/// Panics on poll if the inner output is not in fact a `T` -- a logic
/// error in whatever produced the inner task, since the downcast type is
/// chosen by the caller of `downcast`.
pub fn downcast<T: Send + Sync + 'static>(self) -> Task<T> {
Task(TaskState::Downcast {
inner: Box::new(self),
marker: PhantomData,
})
}
}
impl<T> std::fmt::Debug for Task<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.0 {
TaskState::Ready(_) => f.debug_tuple("Task::Ready").finish(),
TaskState::Spawned(task) => f.debug_tuple("Task::Spawned").field(task).finish(),
TaskState::Downcast { inner, .. } => {
f.debug_tuple("Task::Downcast").field(inner).finish()
}
}
}
}
/// A task that returns `Option<T>` instead of panicking when cancelled.
#[must_use]
pub struct FallibleTask<T>(FallibleTaskState<T>);
@ -259,6 +381,12 @@ enum FallibleTaskState<T> {
/// A task that is currently running (wraps async_task::FallibleTask).
Spawned(async_task::FallibleTask<T, RunnableMeta>),
/// Mirror of [`TaskState::Downcast`] for fallible tasks.
Downcast {
inner: Box<FallibleTask<Box<dyn Any + Send + Sync>>>,
marker: PhantomData<fn() -> T>,
},
}
impl<T> FallibleTask<T> {
@ -272,17 +400,29 @@ impl<T> FallibleTask<T> {
match self.0 {
FallibleTaskState::Ready(_) => {}
FallibleTaskState::Spawned(task) => task.detach(),
FallibleTaskState::Downcast { inner, .. } => inner.detach(),
}
}
}
impl<T> Future for FallibleTask<T> {
impl<T: 'static> Future for FallibleTask<T> {
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match unsafe { self.get_unchecked_mut() } {
FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
FallibleTask(FallibleTaskState::Downcast { inner, .. }) => {
match Pin::new(inner.as_mut()).poll(cx) {
Poll::Ready(Some(boxed_any)) => Poll::Ready(Some(
*boxed_any
.downcast::<T>()
.expect("FallibleTask::poll: downcast type mismatch"),
)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
}
}
@ -294,17 +434,29 @@ impl<T> std::fmt::Debug for FallibleTask<T> {
FallibleTaskState::Spawned(task) => {
f.debug_tuple("FallibleTask::Spawned").field(task).finish()
}
FallibleTaskState::Downcast { inner, .. } => f
.debug_tuple("FallibleTask::Downcast")
.field(inner)
.finish(),
}
}
}
impl<T> Future for Task<T> {
impl<T: 'static> Future for Task<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match unsafe { self.get_unchecked_mut() } {
Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
Task(TaskState::Spawned(task)) => Pin::new(task).poll(cx),
Task(TaskState::Downcast { inner, .. }) => match Pin::new(inner.as_mut()).poll(cx) {
Poll::Ready(boxed_any) => Poll::Ready(
*boxed_any
.downcast::<T>()
.expect("Task::poll: downcast type mismatch"),
),
Poll::Pending => Poll::Pending,
},
}
}
}

View file

@ -11,11 +11,13 @@ pub use test_scheduler::*;
use async_task::Runnable;
use futures::channel::oneshot;
use std::{
any::Any,
future::Future,
panic::Location,
pin::Pin,
sync::Arc,
task::{Context, Poll},
thread,
time::Duration,
};
@ -82,7 +84,11 @@ pub trait Scheduler: Send + Sync {
timeout: Option<Duration>,
) -> bool;
fn schedule_foreground(&self, session_id: SessionId, runnable: Runnable<RunnableMeta>);
/// Schedule a runnable on the local (session-pinned) queue for `session_id`.
/// Runnables scheduled here run in order on whichever thread drains the
/// session — the main thread for ordinary sessions, or a dedicated OS
/// thread for sessions created via `spawn_dedicated_thread`.
fn schedule_local(&self, session_id: SessionId, runnable: Runnable<RunnableMeta>);
/// Schedule a background task with the given priority.
fn schedule_background_with_priority(
@ -103,11 +109,87 @@ pub trait Scheduler: Send + Sync {
fn timer(&self, timeout: Duration) -> Timer;
fn clock(&self) -> Arc<dyn Clock>;
/// Spawn a closure on a fresh session pinned to its own [`LocalExecutor`].
///
/// `PlatformScheduler` runs the closure on a new OS thread (see
/// [`spawn_dedicated_thread`]). `TestScheduler` runs it on the test
/// scheduler's loop alongside everything else so determinism under
/// `TestScheduler::many` is preserved.
///
/// This is the dyn-safe entry point: the closure's output is type-erased
/// as `Box<dyn Any + Send + Sync>` so the trait stays object-safe.
/// Callers typically reach for the type-safe wrappers on
/// [`LocalExecutor::spawn_dedicated`] and
/// [`BackgroundExecutor::spawn_dedicated`], which compose this method
/// with [`Task::downcast`] to recover the closure's concrete return type.
fn spawn_dedicated(
self: Arc<Self>,
f: Box<
dyn FnOnce(
LocalExecutor,
)
-> Pin<Box<dyn Future<Output = Box<dyn Any + Send + Sync>> + 'static>>
+ Send
+ 'static,
>,
) -> Task<Box<dyn Any + Send + Sync>>;
fn as_test(&self) -> Option<&TestScheduler> {
None
}
}
/// Spawn work on a fresh OS thread that's exclusive to the returned task and
/// anything spawned on the executor it provides. Blocking syscalls inside that
/// work don't disturb any other executor in the process.
///
/// `f` is called on the dedicated thread with a [`LocalExecutor`] pinned
/// to it. The future `f` returns may freely be `!Send`. The returned `Task` is
/// that future's task: dropping it cancels the root, but detached children
/// keep running until they finish. The thread shuts down once the executor and
/// every task on it are gone.
///
/// The caller is responsible for supplying a `session_id` that's distinct from
/// every other live session on `scheduler`. Concrete schedulers typically wrap
/// this in an inherent method that allocates the id from their own counter.
pub fn spawn_dedicated_thread<F, Fut>(
session_id: SessionId,
scheduler: Arc<dyn Scheduler>,
f: F,
) -> Task<Fut::Output>
where
F: FnOnce(LocalExecutor) -> Fut + Send + 'static,
Fut: Future + 'static,
Fut::Output: Send + 'static,
{
let (runnable_sender, runnable_receiver) = flume::unbounded::<Runnable<RunnableMeta>>();
let (task_sender, task_receiver) = flume::bounded::<Task<Fut::Output>>(1);
thread::Builder::new()
.name(format!("spawn_dedicated session {:?}", session_id))
.spawn(move || {
let dispatch = move |runnable: Runnable<RunnableMeta>| {
let _ = runnable_sender.send(runnable);
};
let executor = LocalExecutor::new(session_id, scheduler, dispatch);
let root_task = executor.spawn(f(executor.clone()));
let _ = task_sender.send(root_task);
// After this drop, every strong reference to the runnable sender
// lives inside a spawned task or a user-held executor clone. The
// recv loop exits once all of those are gone.
drop(executor);
while let Ok(runnable) = runnable_receiver.recv() {
runnable.run();
}
})
.expect("failed to spawn dedicated thread");
task_receiver
.recv()
.expect("dedicated thread failed to produce root task")
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SessionId(u16);

View file

@ -1,6 +1,6 @@
use crate::{
BackgroundExecutor, Clock, ForegroundExecutor, Instant, Priority, RunnableMeta, Scheduler,
SessionId, TestClock, Timer,
BackgroundExecutor, Clock, Instant, LocalExecutor, Priority, RunnableMeta, Scheduler,
SessionId, Task, TestClock, Timer,
};
use async_task::Runnable;
use backtrace::{Backtrace, BacktraceFrame};
@ -10,6 +10,7 @@ use rand::{
distr::{StandardUniform, uniform::SampleRange, uniform::SampleUniform},
prelude::*,
};
use std::any::Any;
use std::{
any::type_name_of_val,
collections::{BTreeMap, HashSet, VecDeque},
@ -152,18 +153,21 @@ impl TestScheduler {
self.state.lock().is_main_thread
}
/// Allocate a new session ID for foreground task scheduling.
/// This is used by GPUI's TestDispatcher to map dispatcher instances to sessions.
pub fn allocate_session_id(&self) -> SessionId {
let mut state = self.state.lock();
state.next_session_id.0 += 1;
state.next_session_id
}
/// Create a foreground executor for this scheduler
pub fn foreground(self: &Arc<Self>) -> ForegroundExecutor {
/// Create a local executor for this scheduler.
pub fn foreground(self: &Arc<Self>) -> LocalExecutor {
let session_id = self.allocate_session_id();
ForegroundExecutor::new(session_id, self.clone())
let scheduler = Arc::downgrade(self);
LocalExecutor::new(session_id, self.clone(), move |runnable| {
if let Some(scheduler) = scheduler.upgrade() {
scheduler.schedule_local(session_id, runnable);
}
})
}
/// Create a background executor for this scheduler
@ -585,7 +589,7 @@ impl Scheduler for TestScheduler {
completed
}
fn schedule_foreground(&self, session_id: SessionId, runnable: Runnable<RunnableMeta>) {
fn schedule_local(&self, session_id: SessionId, runnable: Runnable<RunnableMeta>) {
assert_correct_thread(&self.thread, &self.state);
let mut state = self.state.lock();
let ix = if state.randomize_order {
@ -660,6 +664,31 @@ impl Scheduler for TestScheduler {
self.clock.clone()
}
/// In the test world, dedicated work is just a fresh local session driven
/// by the test scheduler's run loop alongside everything else. No real
/// thread is spawned, so determinism under `TestScheduler::many` is
/// preserved.
fn spawn_dedicated(
self: Arc<Self>,
f: Box<
dyn FnOnce(
LocalExecutor,
)
-> Pin<Box<dyn Future<Output = Box<dyn Any + Send + Sync>> + 'static>>
+ Send
+ 'static,
>,
) -> Task<Box<dyn Any + Send + Sync>> {
let session_id = self.allocate_session_id();
let scheduler = Arc::downgrade(&self);
let executor = LocalExecutor::new(session_id, self, move |runnable| {
if let Some(scheduler) = scheduler.upgrade() {
scheduler.schedule_local(session_id, runnable);
}
});
executor.spawn(f(executor.clone()))
}
fn as_test(&self) -> Option<&TestScheduler> {
Some(self)
}

View file

@ -728,3 +728,234 @@ fn test_background_priority_scheduling() {
iterations
);
}
#[test]
fn test_spawn_dedicated_basic_round_trip() {
let result = TestScheduler::once(async |scheduler| {
scheduler
.background()
.spawn_dedicated(|_executor| async { 42 })
.await
});
assert_eq!(result, 42);
}
#[test]
fn test_spawn_dedicated_not_send_future() {
let result = TestScheduler::once(async |scheduler| {
scheduler
.background()
.spawn_dedicated(|_executor| async move {
// `Rc<RefCell<_>>` is `!Send`. If `spawn_dedicated` required
// the returned future to be `Send`, this wouldn't compile.
let state = Rc::new(RefCell::new(0_i32));
for _ in 0..5 {
*state.borrow_mut() += 1;
}
*state.borrow()
})
.await
});
assert_eq!(result, 5);
}
#[test]
fn test_spawn_dedicated_send_closure_captures() {
use parking_lot::Mutex;
let observed = TestScheduler::once(async |scheduler| {
let shared = Arc::new(Mutex::new(0_i32));
let shared_for_closure = shared.clone();
let returned = scheduler
.background()
.spawn_dedicated(move |_executor| {
// `shared_for_closure` crossed the `Send` boundary of the
// closure; we then mutate it from inside the !Send future.
let local = shared_for_closure;
async move {
*local.lock() = 7;
}
})
.await;
let _: () = returned;
*shared.lock()
});
assert_eq!(observed, 7);
}
#[test]
fn test_spawn_dedicated_inner_spawn_local() {
let result = TestScheduler::once(async |scheduler| {
scheduler
.background()
.spawn_dedicated(|executor| async move {
// The provided executor can spawn additional `!Send` work
// onto the same dedicated session.
let inner = Rc::new(RefCell::new(0_i32));
let inner_for_child = inner.clone();
let child = executor.spawn(async move {
*inner_for_child.borrow_mut() = 99;
*inner_for_child.borrow()
});
child.await
})
.await
});
assert_eq!(result, 99);
}
#[test]
fn test_spawn_dedicated_determinism_under_many() {
use parking_lot::Mutex;
let outcomes = TestScheduler::many(if cfg!(miri) { 4 } else { 20 }, async |scheduler| {
let trace = Arc::new(Mutex::new(Vec::<u32>::new()));
let background = scheduler.background();
let mut tasks = Vec::new();
for id in 0..4_u32 {
let trace = trace.clone();
let task = background.spawn_dedicated(move |executor| async move {
for step in 0..3 {
trace.lock().push(id * 100 + step);
executor.spawn(async {}).await;
}
id
});
tasks.push(task);
}
let mut outputs = Vec::new();
for task in tasks {
outputs.push(task.await);
}
(trace.lock().clone(), outputs)
});
// Re-running with the same seed should produce the same trace. Run a
// second pass with identical seeds and compare to the first.
let outcomes_replay = TestScheduler::many(if cfg!(miri) { 4 } else { 20 }, async |scheduler| {
let trace = Arc::new(Mutex::new(Vec::<u32>::new()));
let background = scheduler.background();
let mut tasks = Vec::new();
for id in 0..4_u32 {
let trace = trace.clone();
let task = background.spawn_dedicated(move |executor| async move {
for step in 0..3 {
trace.lock().push(id * 100 + step);
executor.spawn(async {}).await;
}
id
});
tasks.push(task);
}
let mut outputs = Vec::new();
for task in tasks {
outputs.push(task.await);
}
(trace.lock().clone(), outputs)
});
assert_eq!(
outcomes, outcomes_replay,
"per-seed outcomes should be reproducible"
);
// Sanity: at least one seed produced a non-monotonic trace,
// demonstrating that dedicated tasks really do interleave under the
// scheduler's randomization.
let any_interleaved = outcomes.iter().any(|(trace, _)| {
trace
.windows(2)
.any(|window| window[0] / 100 != window[1] / 100)
});
assert!(
any_interleaved,
"expected at least one seed to interleave dedicated tasks"
);
}
#[test]
fn test_spawn_dedicated_dropping_task_cancels_future() {
use parking_lot::Mutex;
let counter_after = TestScheduler::once(async |scheduler| {
let counter = Arc::new(Mutex::new(0_u32));
let (resume_tx, resume_rx) = oneshot::channel::<()>();
let task = {
let counter = counter.clone();
scheduler
.background()
.spawn_dedicated(move |_executor| async move {
*counter.lock() = 1;
// Park here until the test resumes us. If the task is
// dropped before this resolves, the second assignment
// below must never happen.
let _ = resume_rx.await;
*counter.lock() = 2;
})
};
// Let the dedicated future make its first observable step.
scheduler.run();
assert_eq!(*counter.lock(), 1);
// Cancel by dropping the root task, then unblock the parked oneshot.
// The future must not advance past the await: counter stays at 1.
drop(task);
let _ = resume_tx.send(());
scheduler.run();
*counter.lock()
});
assert_eq!(
counter_after, 1,
"dropping the dedicated task must cancel the root future before its second write"
);
}
#[test]
fn test_spawn_dedicated_detached_child_runs_after_root_completes() {
use parking_lot::Mutex;
let child_ran = TestScheduler::once(async |scheduler| {
let child_ran = Arc::new(Mutex::new(false));
let task = {
let child_ran = child_ran.clone();
scheduler
.background()
.spawn_dedicated(move |executor| async move {
executor
.spawn(async move {
*child_ran.lock() = true;
})
.detach();
// Root returns immediately, before the child has had a
// chance to run.
})
};
task.await;
// Drain the dedicated session. The detached child must run.
scheduler.run();
*child_ran.lock()
});
assert!(
child_ran,
"detached child must complete after the root, not be cancelled with it"
);
}
// The production smoke test for `spawn_dedicated` lives in the `gpui` crate
// alongside `PlatformScheduler`, which is the real production implementation
// of the `Scheduler` trait. See `crates/gpui/src/platform_scheduler.rs`.