mirror of
https://github.com/zed-industries/zed.git
synced 2026-06-01 03:14:56 +07:00
util: Make sure we reap child processes
When we switched to this util from async-process, we lost its reaper implementation, which made sure zombie processes were cleaned up. As I understand it, this matters when we drop a spawned process without killing it and may never wait for its status.
This commit is contained in:
parent
a6a49afc28
commit
ed79e9e979
3 changed files with 868 additions and 36 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -19417,6 +19417,7 @@ dependencies = [
|
|||
"log",
|
||||
"mach2 0.5.0",
|
||||
"nix 0.29.0",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"pretty_assertions",
|
||||
"rand 0.9.4",
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ url.workspace = true
|
|||
percent-encoding.workspace = true
|
||||
util_macros = { workspace = true, optional = true }
|
||||
gpui_util.workspace = true
|
||||
parking_lot.workspace = true
|
||||
|
||||
[target.'cfg(not(target_family = "wasm"))'.dependencies]
|
||||
smol.workspace = true
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ use mach2::exception_types::{
|
|||
};
|
||||
use mach2::port::{MACH_PORT_NULL, mach_port_t};
|
||||
use mach2::thread_status::{THREAD_STATE_NONE, thread_state_flavor_t};
|
||||
use parking_lot::{Condvar, Mutex, MutexGuard};
|
||||
use smol::Unblock;
|
||||
use std::collections::BTreeMap;
|
||||
use std::ffi::{CString, OsStr, OsString};
|
||||
|
|
@ -13,6 +14,8 @@ use std::os::unix::process::ExitStatusExt;
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::process::{ExitStatus, Output};
|
||||
use std::ptr;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::thread;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub enum Stdio {
|
||||
|
|
@ -234,13 +237,75 @@ pub struct Child {
|
|||
pub stdout: Option<Unblock<std::fs::File>>,
|
||||
pub stderr: Option<Unblock<std::fs::File>>,
|
||||
kill_on_drop: bool,
|
||||
state: Arc<SharedChildState>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SharedChildState {
|
||||
state: Mutex<ChildState>,
|
||||
status_changed: Condvar,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct ChildState {
|
||||
status: Option<ExitStatus>,
|
||||
wait_error: Option<WaitError>,
|
||||
wait_started: bool,
|
||||
}
|
||||
|
||||
/// Cloneable snapshot of an [`io::Error`] produced while waiting on a child.
///
/// The OS error code, kind and display text are captured so an equivalent
/// error can be rebuilt more than once.
#[derive(Debug, Clone)]
struct WaitError {
    raw_os_error: Option<i32>,
    kind: io::ErrorKind,
    message: String,
}

impl WaitError {
    /// Captures the OS error code, kind and display text of `error`.
    fn from_io(error: io::Error) -> Self {
        let raw_os_error = error.raw_os_error();
        let kind = error.kind();
        let message = error.to_string();
        Self {
            raw_os_error,
            kind,
            message,
        }
    }

    /// Rebuilds an [`io::Error`]: from the OS error code when one was
    /// captured, otherwise from the saved kind and message.
    fn to_io_error(&self) -> io::Error {
        match self.raw_os_error {
            Some(code) => io::Error::from_raw_os_error(code),
            None => io::Error::new(self.kind, self.message.clone()),
        }
    }
}
|
||||
|
||||
impl SharedChildState {
|
||||
fn new() -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
state: Mutex::new(ChildState::default()),
|
||||
status_changed: Condvar::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Child {
|
||||
fn drop(&mut self) {
|
||||
if self.kill_on_drop && self.status.is_none() {
|
||||
let _ = self.kill();
|
||||
if cached_status(&self.state).is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
if self.kill_on_drop {
|
||||
if let Err(error) = self.kill() {
|
||||
if !is_no_such_process_error(&error) {
|
||||
log::debug!("failed to kill child process {} on drop: {error}", self.pid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(error) = register_wait(self.pid, self.state.clone(), WaitPurpose::Reap) {
|
||||
log::debug!(
|
||||
"failed to register child process {} for reaping: {error}",
|
||||
self.pid
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -251,30 +316,33 @@ impl Child {
|
|||
}
|
||||
|
||||
pub fn kill(&mut self) -> io::Result<()> {
|
||||
let result = unsafe { libc::kill(self.pid, libc::SIGKILL) };
|
||||
if result == -1 {
|
||||
Err(io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(())
|
||||
if cached_status(&self.state).is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
kill_pid(self.pid)
|
||||
}
|
||||
|
||||
pub fn try_status(&mut self) -> io::Result<Option<ExitStatus>> {
|
||||
if let Some(status) = self.status {
|
||||
return Ok(Some(status));
|
||||
{
|
||||
let state = lock_child_state(&self.state);
|
||||
if let Some(status) = state.status {
|
||||
return Ok(Some(status));
|
||||
}
|
||||
if let Some(error) = &state.wait_error {
|
||||
return Err(error.to_io_error());
|
||||
}
|
||||
if state.wait_started {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
let mut status: libc::c_int = 0;
|
||||
let result = unsafe { libc::waitpid(self.pid, &mut status, libc::WNOHANG) };
|
||||
|
||||
if result == -1 {
|
||||
Err(io::Error::last_os_error())
|
||||
} else if result == 0 {
|
||||
Ok(None)
|
||||
} else {
|
||||
let exit_status = ExitStatus::from_raw(status);
|
||||
self.status = Some(exit_status);
|
||||
Ok(Some(exit_status))
|
||||
match try_wait_for_pid(self.pid)? {
|
||||
Some(status) => {
|
||||
store_status(self.pid, &self.state, status);
|
||||
Ok(Some(status))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -283,31 +351,26 @@ impl Child {
|
|||
) -> impl std::future::Future<Output = io::Result<ExitStatus>> + Send + 'static {
|
||||
self.stdin.take();
|
||||
|
||||
let state = self.state.clone();
|
||||
let pid = self.pid;
|
||||
let cached_status = self.status;
|
||||
|
||||
async move {
|
||||
if let Some(status) = cached_status {
|
||||
return Ok(status);
|
||||
if let Err(error) = register_wait(pid, state.clone(), WaitPurpose::Status) {
|
||||
log::debug!("falling back to waitpid for child process {pid}: {error}");
|
||||
return smol::unblock(move || wait_for_pid_and_store_status(pid, state)).await;
|
||||
}
|
||||
|
||||
smol::unblock(move || {
|
||||
let mut status: libc::c_int = 0;
|
||||
let result = unsafe { libc::waitpid(pid, &mut status, 0) };
|
||||
if result == -1 {
|
||||
Err(io::Error::last_os_error())
|
||||
} else {
|
||||
Ok(ExitStatus::from_raw(status))
|
||||
}
|
||||
})
|
||||
.await
|
||||
if let Some(status) = cached_status(&state) {
|
||||
return Ok(status);
|
||||
}
|
||||
smol::unblock(move || wait_for_cached_status(state)).await
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn output(mut self) -> io::Result<Output> {
|
||||
use futures_lite::AsyncReadExt;
|
||||
|
||||
let status = self.status();
|
||||
self.stdin.take();
|
||||
|
||||
let stdout = self.stdout.take();
|
||||
let stdout_future = async move {
|
||||
|
|
@ -329,7 +392,7 @@ impl Child {
|
|||
|
||||
let (stdout_data, stderr_data) =
|
||||
futures_lite::future::try_zip(stdout_future, stderr_future).await?;
|
||||
let status = status.await?;
|
||||
let status = self.status().await?;
|
||||
|
||||
Ok(Output {
|
||||
status,
|
||||
|
|
@ -339,6 +402,617 @@ impl Child {
|
|||
}
|
||||
}
|
||||
|
||||
/// Why a wait on a child process was requested.
#[derive(Clone, Copy)]
enum WaitPurpose {
    /// A caller wants the exit status; failures are reported back to it.
    Status,
    /// The child only needs to be reaped; failures are logged instead.
    Reap,
}
|
||||
|
||||
/// Sends `SIGKILL` to `pid`.
///
/// # Errors
///
/// Returns the last OS error when `kill` reports failure.
fn kill_pid(pid: libc::pid_t) -> io::Result<()> {
    // SAFETY: `kill` only takes integer arguments and does not access any
    // memory owned by this process.
    match unsafe { libc::kill(pid, libc::SIGKILL) } {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(()),
    }
}
|
||||
|
||||
fn lock_child_state(state: &SharedChildState) -> MutexGuard<'_, ChildState> {
|
||||
state.state.lock()
|
||||
}
|
||||
|
||||
fn cached_status(state: &SharedChildState) -> Option<ExitStatus> {
|
||||
lock_child_state(state).status
|
||||
}
|
||||
|
||||
fn store_status(pid: libc::pid_t, shared_state: &SharedChildState, status: ExitStatus) {
|
||||
{
|
||||
let mut state = lock_child_state(shared_state);
|
||||
state.status = Some(status);
|
||||
state.wait_error = None;
|
||||
state.wait_started = false;
|
||||
}
|
||||
shared_state.status_changed.notify_all();
|
||||
record_reaped_pid(pid, Some(status));
|
||||
}
|
||||
|
||||
fn store_wait_error(shared_state: &SharedChildState, error: WaitError) {
|
||||
{
|
||||
let mut state = lock_child_state(shared_state);
|
||||
state.wait_error = Some(error);
|
||||
state.wait_started = false;
|
||||
}
|
||||
shared_state.status_changed.notify_all();
|
||||
}
|
||||
|
||||
fn wait_for_cached_status(state: Arc<SharedChildState>) -> io::Result<ExitStatus> {
|
||||
let mut guard = lock_child_state(&state);
|
||||
loop {
|
||||
if let Some(status) = guard.status {
|
||||
return Ok(status);
|
||||
}
|
||||
if let Some(error) = &guard.wait_error {
|
||||
return Err(error.to_io_error());
|
||||
}
|
||||
state.status_changed.wait(&mut guard);
|
||||
}
|
||||
}
|
||||
|
||||
fn register_wait(
|
||||
pid: libc::pid_t,
|
||||
state: Arc<SharedChildState>,
|
||||
purpose: WaitPurpose,
|
||||
) -> io::Result<()> {
|
||||
{
|
||||
let state = lock_child_state(&state);
|
||||
if state.status.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
if let Some(error) = &state.wait_error {
|
||||
if matches!(purpose, WaitPurpose::Status) {
|
||||
return Err(error.to_io_error());
|
||||
}
|
||||
}
|
||||
if state.wait_started {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
match try_wait_for_pid(pid) {
|
||||
Ok(Some(status)) => return finish_wait(pid, &state, Ok(status), purpose),
|
||||
Ok(None) => {}
|
||||
Err(error) => return finish_wait(pid, &state, Err(error), purpose),
|
||||
}
|
||||
|
||||
{
|
||||
let mut state = lock_child_state(&state);
|
||||
if state.status.is_some() || state.wait_started {
|
||||
return Ok(());
|
||||
}
|
||||
if let Some(error) = &state.wait_error {
|
||||
if matches!(purpose, WaitPurpose::Status) {
|
||||
return Err(error.to_io_error());
|
||||
}
|
||||
}
|
||||
if matches!(purpose, WaitPurpose::Reap) {
|
||||
state.wait_error = None;
|
||||
}
|
||||
state.wait_started = true;
|
||||
}
|
||||
|
||||
let reaper = match global_process_reaper() {
|
||||
Ok(reaper) => reaper,
|
||||
Err(error) if matches!(purpose, WaitPurpose::Status) => return Err(error),
|
||||
Err(error) => return finish_wait(pid, &state, Err(error), purpose),
|
||||
};
|
||||
|
||||
if matches!(purpose, WaitPurpose::Status) && reaper_forced_to_fail_for_test(pid) {
|
||||
return Err(io::Error::other("process reaper disabled for test"));
|
||||
}
|
||||
|
||||
match reaper.register_and_try_wait(pid, state.clone(), purpose) {
|
||||
Ok(Some(status)) => finish_wait(pid, &state, Ok(status), purpose),
|
||||
Ok(None) => Ok(()),
|
||||
Err(error) if matches!(purpose, WaitPurpose::Status) => Err(error),
|
||||
Err(error) => finish_wait(pid, &state, Err(error), purpose),
|
||||
}
|
||||
}
|
||||
|
||||
fn finish_wait(
|
||||
pid: libc::pid_t,
|
||||
state: &SharedChildState,
|
||||
result: io::Result<ExitStatus>,
|
||||
purpose: WaitPurpose,
|
||||
) -> io::Result<()> {
|
||||
match result {
|
||||
Ok(status) => {
|
||||
store_status(pid, state, status);
|
||||
Ok(())
|
||||
}
|
||||
Err(_) if cached_status(state).is_some() => Ok(()),
|
||||
Err(error) if matches!(purpose, WaitPurpose::Reap) && is_no_child_error(&error) => {
|
||||
{
|
||||
let mut state = lock_child_state(state);
|
||||
state.wait_started = false;
|
||||
}
|
||||
state.status_changed.notify_all();
|
||||
record_reaped_pid(pid, None);
|
||||
Ok(())
|
||||
}
|
||||
Err(error) => {
|
||||
let wait_error = WaitError::from_io(error);
|
||||
if matches!(purpose, WaitPurpose::Reap) {
|
||||
log::debug!("failed to reap child process {pid}: {}", wait_error.message);
|
||||
}
|
||||
store_wait_error(state, wait_error.clone());
|
||||
record_reaped_pid(pid, None);
|
||||
if matches!(purpose, WaitPurpose::Status) {
|
||||
Err(wait_error.to_io_error())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ReaperEntry {
|
||||
state: Arc<SharedChildState>,
|
||||
purpose: WaitPurpose,
|
||||
needs_poll: bool,
|
||||
}
|
||||
|
||||
struct ProcessReaper {
|
||||
kqueue: libc::c_int,
|
||||
children: Mutex<BTreeMap<libc::pid_t, ReaperEntry>>,
|
||||
}
|
||||
|
||||
enum ProcessReaperEvent {
|
||||
ProcessExited(libc::pid_t),
|
||||
ScanChildren,
|
||||
}
|
||||
|
||||
const REAPER_WAKE_IDENT: libc::uintptr_t = usize::MAX as libc::uintptr_t;
|
||||
const REAPER_POLL_INTERVAL: libc::timespec = libc::timespec {
|
||||
tv_sec: 0,
|
||||
tv_nsec: 50_000_000,
|
||||
};
|
||||
|
||||
impl ProcessReaper {
|
||||
fn new() -> io::Result<Arc<Self>> {
|
||||
let kqueue = unsafe { libc::kqueue() };
|
||||
if kqueue == -1 {
|
||||
return Err(io::Error::last_os_error());
|
||||
}
|
||||
|
||||
if let Err(error) = add_reaper_wake_event(kqueue) {
|
||||
let close_result = unsafe { libc::close(kqueue) };
|
||||
if close_result == -1 {
|
||||
log::debug!(
|
||||
"failed to close process reaper kqueue after wake registration failed: {}",
|
||||
io::Error::last_os_error()
|
||||
);
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
let reaper = Arc::new(Self {
|
||||
kqueue,
|
||||
children: Mutex::new(BTreeMap::new()),
|
||||
});
|
||||
|
||||
let reaper_thread = reaper.clone();
|
||||
if let Err(error) = thread::Builder::new()
|
||||
.name("zed-process-reaper".to_string())
|
||||
.spawn(move || reaper_thread.run())
|
||||
{
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
Ok(reaper)
|
||||
}
|
||||
|
||||
fn register_and_try_wait(
|
||||
&self,
|
||||
pid: libc::pid_t,
|
||||
state: Arc<SharedChildState>,
|
||||
purpose: WaitPurpose,
|
||||
) -> io::Result<Option<ExitStatus>> {
|
||||
self.register_entry_and_try_wait(
|
||||
pid,
|
||||
ReaperEntry {
|
||||
state,
|
||||
purpose,
|
||||
needs_poll: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn register_entry_and_try_wait(
|
||||
&self,
|
||||
pid: libc::pid_t,
|
||||
entry: ReaperEntry,
|
||||
) -> io::Result<Option<ExitStatus>> {
|
||||
let mut children = lock_reaper_children(&self.children);
|
||||
if let Some(existing) = children.get_mut(&pid) {
|
||||
if matches!(entry.purpose, WaitPurpose::Status) {
|
||||
existing.purpose = WaitPurpose::Status;
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
children.insert(pid, entry);
|
||||
if let Err(error) = add_process_exit_event(self.kqueue, pid) {
|
||||
let wait_result = try_wait_for_pid(pid);
|
||||
return match wait_result {
|
||||
Ok(Some(status)) => {
|
||||
children.remove(&pid);
|
||||
Ok(Some(status))
|
||||
}
|
||||
Ok(None) if is_no_such_process_error(&error) => {
|
||||
if let Some(entry) = children.get_mut(&pid) {
|
||||
entry.needs_poll = true;
|
||||
}
|
||||
if let Err(error) = trigger_reaper_wake_event(self.kqueue) {
|
||||
children.remove(&pid);
|
||||
return Err(error);
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
Ok(None) => {
|
||||
children.remove(&pid);
|
||||
Err(error)
|
||||
}
|
||||
Err(wait_error) => {
|
||||
children.remove(&pid);
|
||||
Err(wait_error)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(entry) = children.get_mut(&pid) {
|
||||
entry.needs_poll = false;
|
||||
}
|
||||
|
||||
let result = try_wait_for_pid(pid);
|
||||
if !matches!(result, Ok(None)) {
|
||||
children.remove(&pid);
|
||||
self.delete_process_exit_event(pid);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn run(&self) {
|
||||
loop {
|
||||
let poll_children = self.has_polling_children();
|
||||
match wait_for_process_exit_event(self.kqueue, poll_children) {
|
||||
Ok(ProcessReaperEvent::ProcessExited(pid)) => self.reap_pid(pid),
|
||||
Ok(ProcessReaperEvent::ScanChildren) => self.reap_polling_children(),
|
||||
Err(error) => {
|
||||
log::debug!("failed to wait for child process exit event: {error}");
|
||||
thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn has_polling_children(&self) -> bool {
|
||||
lock_reaper_children(&self.children)
|
||||
.values()
|
||||
.any(|entry| entry.needs_poll)
|
||||
}
|
||||
|
||||
fn reap_polling_children(&self) {
|
||||
let pids = lock_reaper_children(&self.children)
|
||||
.iter()
|
||||
.filter_map(|(pid, entry)| entry.needs_poll.then_some(*pid))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for pid in pids {
|
||||
self.reap_pid(pid);
|
||||
}
|
||||
}
|
||||
|
||||
fn reap_pid(&self, pid: libc::pid_t) {
|
||||
let entry = match lock_reaper_children(&self.children).remove(&pid) {
|
||||
Some(entry) => entry,
|
||||
None => return,
|
||||
};
|
||||
|
||||
match try_wait_for_pid(pid) {
|
||||
Ok(Some(status)) => {
|
||||
store_status(pid, &entry.state, status);
|
||||
}
|
||||
Ok(None) => {
|
||||
let state = entry.state.clone();
|
||||
let purpose = entry.purpose;
|
||||
match self.register_entry_and_try_wait(pid, entry) {
|
||||
Ok(Some(status)) => {
|
||||
store_status(pid, &state, status);
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(error) => {
|
||||
if let Err(error) = finish_wait(pid, &state, Err(error), purpose) {
|
||||
if !matches!(purpose, WaitPurpose::Reap) {
|
||||
log::debug!(
|
||||
"failed to finish child process wait for {pid}: {error}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
if let Err(error) = finish_wait(pid, &entry.state, Err(error), entry.purpose) {
|
||||
if !matches!(entry.purpose, WaitPurpose::Reap) {
|
||||
log::debug!("failed to finish child process wait for {pid}: {error}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn delete_process_exit_event(&self, pid: libc::pid_t) {
|
||||
if let Err(error) = delete_process_exit_event(self.kqueue, pid) {
|
||||
if !is_missing_kqueue_event_error(&error) {
|
||||
log::debug!("failed to unregister child process {pid} from reaper: {error}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ProcessReaper {
|
||||
fn drop(&mut self) {
|
||||
let result = unsafe { libc::close(self.kqueue) };
|
||||
if result == -1 {
|
||||
log::debug!(
|
||||
"failed to close process reaper kqueue: {}",
|
||||
io::Error::last_os_error()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn global_process_reaper() -> io::Result<Arc<ProcessReaper>> {
|
||||
static REAPER: OnceLock<Result<Arc<ProcessReaper>, WaitError>> = OnceLock::new();
|
||||
|
||||
match REAPER.get_or_init(|| ProcessReaper::new().map_err(WaitError::from_io)) {
|
||||
Ok(reaper) => Ok(reaper.clone()),
|
||||
Err(error) => Err(error.to_io_error()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
static FORCE_REAPER_FAILURE_PIDS: Mutex<Vec<libc::pid_t>> = Mutex::new(Vec::new());
|
||||
|
||||
#[cfg(test)]
|
||||
fn reaper_forced_to_fail_for_test(pid: libc::pid_t) -> bool {
|
||||
FORCE_REAPER_FAILURE_PIDS.lock().contains(&pid)
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
fn reaper_forced_to_fail_for_test(_pid: libc::pid_t) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn lock_reaper_children(
|
||||
children: &Mutex<BTreeMap<libc::pid_t, ReaperEntry>>,
|
||||
) -> MutexGuard<'_, BTreeMap<libc::pid_t, ReaperEntry>> {
|
||||
children.lock()
|
||||
}
|
||||
|
||||
fn add_reaper_wake_event(kqueue: libc::c_int) -> io::Result<()> {
|
||||
change_reaper_wake_event(kqueue, (libc::EV_ADD | libc::EV_CLEAR) as libc::c_ushort, 0)
|
||||
}
|
||||
|
||||
fn trigger_reaper_wake_event(kqueue: libc::c_int) -> io::Result<()> {
|
||||
change_reaper_wake_event(kqueue, 0, libc::NOTE_TRIGGER)
|
||||
}
|
||||
|
||||
fn change_reaper_wake_event(
|
||||
kqueue: libc::c_int,
|
||||
flags: libc::c_ushort,
|
||||
fflags: libc::c_uint,
|
||||
) -> io::Result<()> {
|
||||
let event = libc::kevent {
|
||||
ident: REAPER_WAKE_IDENT,
|
||||
filter: libc::EVFILT_USER,
|
||||
flags,
|
||||
fflags,
|
||||
data: 0,
|
||||
udata: ptr::null_mut(),
|
||||
};
|
||||
|
||||
loop {
|
||||
let result = unsafe { libc::kevent(kqueue, &event, 1, ptr::null_mut(), 0, ptr::null()) };
|
||||
if result == -1 {
|
||||
let error = io::Error::last_os_error();
|
||||
if error.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
fn add_process_exit_event(kqueue: libc::c_int, pid: libc::pid_t) -> io::Result<()> {
|
||||
change_process_exit_event(
|
||||
kqueue,
|
||||
pid,
|
||||
(libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT) as libc::c_ushort,
|
||||
libc::NOTE_EXIT,
|
||||
)
|
||||
}
|
||||
|
||||
fn delete_process_exit_event(kqueue: libc::c_int, pid: libc::pid_t) -> io::Result<()> {
|
||||
change_process_exit_event(kqueue, pid, libc::EV_DELETE as libc::c_ushort, 0)
|
||||
}
|
||||
|
||||
fn change_process_exit_event(
|
||||
kqueue: libc::c_int,
|
||||
pid: libc::pid_t,
|
||||
flags: libc::c_ushort,
|
||||
fflags: libc::c_uint,
|
||||
) -> io::Result<()> {
|
||||
let event = libc::kevent {
|
||||
ident: pid as libc::uintptr_t,
|
||||
filter: libc::EVFILT_PROC,
|
||||
flags,
|
||||
fflags,
|
||||
data: 0,
|
||||
udata: ptr::null_mut(),
|
||||
};
|
||||
|
||||
loop {
|
||||
let result = unsafe { libc::kevent(kqueue, &event, 1, ptr::null_mut(), 0, ptr::null()) };
|
||||
if result == -1 {
|
||||
let error = io::Error::last_os_error();
|
||||
if error.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_for_process_exit_event(
|
||||
kqueue: libc::c_int,
|
||||
poll_children: bool,
|
||||
) -> io::Result<ProcessReaperEvent> {
|
||||
loop {
|
||||
let mut event = libc::kevent {
|
||||
ident: 0,
|
||||
filter: 0,
|
||||
flags: 0,
|
||||
fflags: 0,
|
||||
data: 0,
|
||||
udata: ptr::null_mut(),
|
||||
};
|
||||
let timeout = if poll_children {
|
||||
&REAPER_POLL_INTERVAL
|
||||
} else {
|
||||
ptr::null()
|
||||
};
|
||||
let result = unsafe { libc::kevent(kqueue, ptr::null(), 0, &mut event, 1, timeout) };
|
||||
|
||||
if result == -1 {
|
||||
let error = io::Error::last_os_error();
|
||||
if error.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
if result == 0 {
|
||||
return Ok(ProcessReaperEvent::ScanChildren);
|
||||
}
|
||||
|
||||
if event.flags & libc::EV_ERROR as libc::c_ushort != 0 {
|
||||
if event.data == 0 {
|
||||
continue;
|
||||
}
|
||||
return Err(io::Error::from_raw_os_error(event.data as i32));
|
||||
}
|
||||
|
||||
if event.filter == libc::EVFILT_USER && event.ident == REAPER_WAKE_IDENT {
|
||||
return Ok(ProcessReaperEvent::ScanChildren);
|
||||
}
|
||||
|
||||
if event.filter == libc::EVFILT_PROC && event.fflags & libc::NOTE_EXIT != 0 {
|
||||
return Ok(ProcessReaperEvent::ProcessExited(
|
||||
event.ident as libc::pid_t,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_for_pid_and_store_status(
|
||||
pid: libc::pid_t,
|
||||
state: Arc<SharedChildState>,
|
||||
) -> io::Result<ExitStatus> {
|
||||
if let Some(status) = cached_status(&state) {
|
||||
return Ok(status);
|
||||
}
|
||||
|
||||
match wait_for_pid(pid) {
|
||||
Ok(status) => {
|
||||
store_status(pid, &state, status);
|
||||
Ok(status)
|
||||
}
|
||||
Err(error) => {
|
||||
if let Some(status) = cached_status(&state) {
|
||||
return Ok(status);
|
||||
}
|
||||
|
||||
let wait_error = WaitError::from_io(error);
|
||||
store_wait_error(&state, wait_error.clone());
|
||||
record_reaped_pid(pid, None);
|
||||
Err(wait_error.to_io_error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_for_pid(pid: libc::pid_t) -> io::Result<ExitStatus> {
|
||||
let mut status: libc::c_int = 0;
|
||||
loop {
|
||||
let result = unsafe { libc::waitpid(pid, &mut status, 0) };
|
||||
if result == -1 {
|
||||
let error = io::Error::last_os_error();
|
||||
if error.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
return Ok(ExitStatus::from_raw(status));
|
||||
}
|
||||
}
|
||||
|
||||
fn try_wait_for_pid(pid: libc::pid_t) -> io::Result<Option<ExitStatus>> {
|
||||
let mut status: libc::c_int = 0;
|
||||
loop {
|
||||
let result = unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) };
|
||||
|
||||
if result == -1 {
|
||||
let error = io::Error::last_os_error();
|
||||
if error.kind() == io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
return Err(error);
|
||||
} else if result == 0 {
|
||||
return Ok(None);
|
||||
} else {
|
||||
return Ok(Some(ExitStatus::from_raw(status)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_no_child_error(error: &io::Error) -> bool {
|
||||
error.raw_os_error() == Some(libc::ECHILD)
|
||||
}
|
||||
|
||||
fn is_no_such_process_error(error: &io::Error) -> bool {
|
||||
error.raw_os_error() == Some(libc::ESRCH)
|
||||
}
|
||||
|
||||
fn is_missing_kqueue_event_error(error: &io::Error) -> bool {
|
||||
matches!(error.raw_os_error(), Some(libc::ENOENT | libc::ESRCH))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
static REAPED_PIDS: Mutex<Vec<(libc::pid_t, Option<ExitStatus>)>> = Mutex::new(Vec::new());
|
||||
|
||||
#[cfg(test)]
|
||||
fn record_reaped_pid(pid: libc::pid_t, status: Option<ExitStatus>) {
|
||||
REAPED_PIDS.lock().push((pid, status));
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
fn record_reaped_pid(_pid: libc::pid_t, _status: Option<ExitStatus>) {}
|
||||
|
||||
fn spawn_posix_spawn(
|
||||
program: &OsStr,
|
||||
args: &[OsString],
|
||||
|
|
@ -517,7 +1191,7 @@ fn spawn_posix_spawn(
|
|||
stdout: stdout_read.map(|fd| Unblock::new(std::fs::File::from_raw_fd(fd))),
|
||||
stderr: stderr_read.map(|fd| Unblock::new(std::fs::File::from_raw_fd(fd))),
|
||||
kill_on_drop,
|
||||
status: None,
|
||||
state: SharedChildState::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -560,6 +1234,63 @@ fn invalid_input_error() -> io::Error {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use futures_lite::AsyncWriteExt;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
fn reap_checkpoint() -> usize {
|
||||
REAPED_PIDS.lock().len()
|
||||
}
|
||||
|
||||
fn recorded_reap(pid: libc::pid_t, checkpoint: usize) -> Option<Option<ExitStatus>> {
|
||||
REAPED_PIDS
|
||||
.lock()
|
||||
.iter()
|
||||
.skip(checkpoint)
|
||||
.rev()
|
||||
.find_map(|(reaped_pid, status)| (*reaped_pid == pid).then_some(*status))
|
||||
}
|
||||
|
||||
fn wait_for_recorded_reap(pid: libc::pid_t, checkpoint: usize) -> Option<ExitStatus> {
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
loop {
|
||||
if let Some(status) = recorded_reap(pid, checkpoint) {
|
||||
return status;
|
||||
}
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"timed out waiting for pid {pid} to be reaped"
|
||||
);
|
||||
std::thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_no_child_to_wait_for(pid: libc::pid_t) {
|
||||
let mut status: libc::c_int = 0;
|
||||
let result = unsafe { libc::waitpid(pid, &mut status, libc::WNOHANG) };
|
||||
assert_eq!(result, -1);
|
||||
assert_eq!(
|
||||
io::Error::last_os_error().raw_os_error(),
|
||||
Some(libc::ECHILD)
|
||||
);
|
||||
}
|
||||
|
||||
struct ForceReaperFailureForTest {
|
||||
pid: libc::pid_t,
|
||||
}
|
||||
|
||||
impl ForceReaperFailureForTest {
|
||||
fn new(pid: libc::pid_t) -> Self {
|
||||
FORCE_REAPER_FAILURE_PIDS.lock().push(pid);
|
||||
Self { pid }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ForceReaperFailureForTest {
|
||||
fn drop(&mut self) {
|
||||
FORCE_REAPER_FAILURE_PIDS
|
||||
.lock()
|
||||
.retain(|pid| *pid != self.pid);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_spawn_echo() {
|
||||
|
|
@ -676,6 +1407,105 @@ mod tests {
|
|||
});
|
||||
}
|
||||
|
||||
/// Dropping a `Child` without waiting must still get it reaped, with its
/// exit code recorded.
#[test]
fn test_drop_reaps_child() {
    let checkpoint = reap_checkpoint();

    let child = Command::new("/bin/sh")
        .args(["-c", "exit 7"])
        .spawn()
        .expect("failed to spawn");
    let pid = child.id() as libc::pid_t;
    // Dropping the handle is what hands the child over for reaping.
    drop(child);

    let status = wait_for_recorded_reap(pid, checkpoint).expect("missing exit status");
    assert_eq!(status.code(), Some(7));
    assert_no_child_to_wait_for(pid);
}
|
||||
|
||||
/// With `kill_on_drop`, dropping the handle must kill a running child with
/// `SIGKILL` and reap it.
#[test]
fn test_kill_on_drop_kills_and_reaps_child() {
    let checkpoint = reap_checkpoint();

    let mut command = Command::new("/bin/sh");
    let child = command
        .args(["-c", "sleep 10"])
        .kill_on_drop(true)
        .spawn()
        .expect("failed to spawn");
    let pid = child.id() as libc::pid_t;
    // The drop sends the kill and registers the child for reaping.
    drop(child);

    let status = wait_for_recorded_reap(pid, checkpoint).expect("missing exit status");
    assert_eq!(status.signal(), Some(libc::SIGKILL));
    assert_no_child_to_wait_for(pid);
}
|
||||
|
||||
/// A child that is still running must not stop the reaper from collecting
/// one that has already exited.
#[test]
fn test_drop_reaper_does_not_block_on_running_child() {
    let checkpoint = reap_checkpoint();

    let slow_child = Command::new("/bin/sh")
        .args(["-c", "sleep 1"])
        .spawn()
        .expect("failed to spawn");
    let long_running_pid = slow_child.id() as libc::pid_t;
    drop(slow_child);

    let quick_child = Command::new("/bin/sh")
        .args(["-c", "exit 9"])
        .spawn()
        .expect("failed to spawn");
    let short_lived_pid = quick_child.id() as libc::pid_t;
    drop(quick_child);

    // The short-lived child is checked first, while the other is still
    // sleeping.
    let quick_status =
        wait_for_recorded_reap(short_lived_pid, checkpoint).expect("missing exit status");
    assert_eq!(quick_status.code(), Some(9));
    assert_no_child_to_wait_for(short_lived_pid);

    let slow_status =
        wait_for_recorded_reap(long_running_pid, checkpoint).expect("missing exit status");
    assert!(slow_status.success());
    assert_no_child_to_wait_for(long_running_pid);
}
|
||||
|
||||
/// Once the status has been collected, `kill` is a no-op and `try_status`
/// returns the cached value.
#[test]
fn test_status_is_cached_before_kill() {
    smol::block_on(async {
        let mut child = Command::new("/bin/sh")
            .args(["-c", "exit 3"])
            .spawn()
            .expect("failed to spawn");

        let awaited = child.status().await.expect("failed to wait for status");
        assert_eq!(awaited.code(), Some(3));

        child.kill().expect("kill should be a no-op after status");

        let cached = child
            .try_status()
            .expect("failed to read cached status")
            .expect("missing cached status");
        assert_eq!(cached.code(), Some(3));
    });
}
|
||||
|
||||
/// When the reaper is forced to fail for a pid, `status` must still return
/// the exit status and leave nothing to wait for.
#[test]
fn test_status_falls_back_to_waitpid_when_reaper_fails() {
    smol::block_on(async {
        let mut child = Command::new("/bin/sh")
            .args(["-c", "exit 11"])
            .spawn()
            .expect("failed to spawn");
        let pid = child.id() as libc::pid_t;
        // The guard stays alive until the end of the block, so the forced
        // failure covers the `status` call below.
        let _force_reaper_failure = ForceReaperFailureForTest::new(pid);

        let exit = child.status().await.expect("failed to wait for status");
        assert_eq!(exit.code(), Some(11));
        assert_no_child_to_wait_for(pid);
    });
}
|
||||
|
||||
#[test]
|
||||
fn test_env_remove_removes_set_env() {
|
||||
smol::block_on(async {
|
||||
|
|
|
|||
Loading…
Reference in a new issue