job wait is now interruptible

This commit is contained in:
Renan Ribeiro 2025-04-24 09:41:47 -03:00
parent 86d1acdafe
commit 2590b22ae2
3 changed files with 56 additions and 12 deletions

View File

@ -1,5 +1,6 @@
use nu_engine::command_prelude::*; use nu_engine::command_prelude::*;
use nu_protocol::{engine::Job, JobId}; use nu_protocol::{engine::Job, JobId};
use std::time::Duration;
#[derive(Clone)] #[derive(Clone)]
pub struct JobWait; pub struct JobWait;
@ -10,14 +11,16 @@ impl Command for JobWait {
} }
fn description(&self) -> &str { fn description(&self) -> &str {
r#"Wait for a job to complete and return its result value. r#"Wait for a job to complete."#
}
Given the id of a running job currently in the job table, this command fn extra_description(&self) -> &str {
waits for it to complete and returns the value returned r#"Given the id of a running job currently in the job table, this command
by the closure passed down to `job spawn`. waits for it to complete and returns the value returned
by the closure passed down to `job spawn` to create the given job.
Note that this command fails if a job is currently not in the job table Note that this command fails if the provided job id is currently not in the job table
(as seen by `job list`), so it is not possible to wait for jobs that have already finished. (as seen by `job list`), so it is not possible to wait for jobs that have already finished.
"# "#
} }
@ -73,9 +76,13 @@ impl Command for JobWait {
// .wait() blocks so we drop our mutex guard // .wait() blocks so we drop our mutex guard
drop(jobs); drop(jobs);
let result = waiter.wait().clone().with_span(head); let value = wait_with_interrupt(
|time| waiter.wait_timeout(time),
|| engine_state.signals().check(head),
Duration::from_millis(100),
)?;
Ok(result.into_pipeline_data()) Ok(value.clone().with_span(head).into_pipeline_data())
} }
} }
} }
@ -88,3 +95,18 @@ impl Command for JobWait {
}] }]
} }
} }
pub fn wait_with_interrupt<R, E>(
mut wait: impl FnMut(Duration) -> Option<R>,
mut interrupted: impl FnMut() -> Result<(), E>,
check_interval: Duration,
) -> Result<R, E> {
loop {
interrupted()?;
match wait(check_interval) {
Some(result) => return Ok(result),
None => {} // do nothing, try again
}
}
}

View File

@ -1,6 +1,7 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
sync::{Arc, Mutex}, sync::{Arc, Mutex, WaitTimeoutResult},
time::Instant,
}; };
use nu_system::{kill_by_pid, UnfreezeHandle}; use nu_system::{kill_by_pid, UnfreezeHandle};
@ -198,7 +199,7 @@ impl ThreadJob {
} }
pub fn on_termination(&self) -> &Waiter<Value> { pub fn on_termination(&self) -> &Waiter<Value> {
return &self.on_termination; &self.on_termination
} }
} }
@ -264,12 +265,12 @@ use std::sync::OnceLock;
pub fn completion_signal<T>() -> (Completer<T>, Waiter<T>) { pub fn completion_signal<T>() -> (Completer<T>, Waiter<T>) {
let inner = Arc::new(InnerWaitCompleteSignal::new()); let inner = Arc::new(InnerWaitCompleteSignal::new());
return ( (
Completer { Completer {
inner: inner.clone(), inner: inner.clone(),
}, },
Waiter { inner }, Waiter { inner },
); )
} }
/// Waiter and Completer are effectively just `Arc` wrappers around this type. /// Waiter and Completer are effectively just `Arc` wrappers around this type.
@ -324,6 +325,26 @@ impl<T> Waiter<T> {
} }
} }
pub fn wait_timeout(&self, duration: std::time::Duration) -> Option<&T> {
let inner: &InnerWaitCompleteSignal<T> = self.inner.as_ref();
let guard = inner.mutex.lock().expect("mutex is poisoned!");
match inner
.var
.wait_timeout_while(guard, duration, |_| inner.value.get().is_none())
{
Ok((_guard, result)) => {
if result.timed_out() {
return None;
} else {
return Some(inner.value.get().unwrap());
}
}
Err(_) => panic!("mutex is poisoned!"),
}
}
// TODO: add wait_timeout // TODO: add wait_timeout
/// Checks if this completion signal has been signaled. /// Checks if this completion signal has been signaled.

View File

@ -1,5 +1,6 @@
use std::io; use std::io;
use std::process::Command as CommandSys; use std::process::Command as CommandSys;
use std::time::{Duration, Instant};
/// Tries to forcefully kill a process by its PID /// Tries to forcefully kill a process by its PID
pub fn kill_by_pid(pid: i64) -> io::Result<()> { pub fn kill_by_pid(pid: i64) -> io::Result<()> {