This commit is contained in:
Renan Ribeiro 2025-05-07 01:29:42 +00:00 committed by GitHub
commit c97a4414a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 431 additions and 8 deletions

View File

@ -454,6 +454,7 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState {
JobKill,
JobId,
JobTag,
JobWait,
Job,
};

View File

@ -8,7 +8,7 @@ use std::{
use nu_engine::{command_prelude::*, ClosureEvalOnce};
use nu_protocol::{
engine::{Closure, CurrentJob, Job, Mailbox, Redirection, ThreadJob},
engine::{completion_signal, Closure, CurrentJob, Job, Mailbox, Redirection, ThreadJob},
report_shell_error, OutDest, Signals,
};
@ -54,7 +54,9 @@ impl Command for JobSpawn {
) -> Result<PipelineData, ShellError> {
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let spanned_closure: Spanned<Closure> = call.req(engine_state, stack, 0)?;
let closure: Closure = spanned_closure.item;
let tag: Option<String> = call.get_flag(engine_state, stack, "tag")?;
let job_stack = stack.clone();
@ -74,10 +76,11 @@ impl Command for JobSpawn {
let jobs = job_state.jobs.clone();
let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
let (complete, wait) = completion_signal();
let (send, recv) = mpsc::channel();
let id = {
let thread_job = ThreadJob::new(job_signals, tag, send);
let thread_job = ThreadJob::new(job_signals, tag, send, wait);
let id = jobs.add_job(Job::Thread(thread_job.clone()));
@ -98,15 +101,19 @@ impl Command for JobSpawn {
Some(Redirection::Pipe(OutDest::Null)),
Some(Redirection::Pipe(OutDest::Null)),
);
ClosureEvalOnce::new_preserve_out_dest(&job_state, &stack, closure)
let result_value = ClosureEvalOnce::new(&job_state, &stack, closure)
.run_with_input(Value::nothing(head).into_pipeline_data())
.and_then(|data| data.drain())
.and_then(|data| data.into_value(spanned_closure.span))
.unwrap_or_else(|err| {
if !job_state.signals().interrupted() {
report_shell_error(&job_state, &err);
}
Value::error(err, head)
});
complete.complete(result_value);
{
let mut jobs = job_state.jobs.lock().expect("jobs lock is poisoned!");

View File

@ -0,0 +1,106 @@
use nu_engine::command_prelude::*;
use nu_protocol::{engine::Job, JobId};
use std::time::Duration;
#[derive(Clone)]
pub struct JobWait;
impl Command for JobWait {
fn name(&self) -> &str {
"job wait"
}
fn description(&self) -> &str {
r#"Wait for a job to complete."#
}
fn extra_description(&self) -> &str {
r#"Given the id of a running job currently in the job table, this command
waits for it to complete and returns the value returned
by the closure passed down to `job spawn` to create the given job.
Note that this command fails if the provided job id is currently not in the job table
(as seen by `job list`), so it is not possible to wait for jobs that have already finished.
"#
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job wait")
.category(Category::Experimental)
.required("id", SyntaxShape::Int, "The id of the running to wait for.")
.input_output_types(vec![(Type::Nothing, Type::Nothing)])
}
fn search_terms(&self) -> Vec<&str> {
vec!["join"]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let id_arg: Spanned<i64> = call.req(engine_state, stack, 0)?;
if id_arg.item < 0 {
return Err(ShellError::NeedsPositiveValue { span: id_arg.span });
}
let id: JobId = JobId::new(id_arg.item as usize);
let mut jobs = engine_state.jobs.lock().expect("jobs lock is poisoned!");
match jobs.lookup_mut(id) {
None => Err(ShellError::JobNotFound {
id: id.get(),
span: head,
}),
Some(Job::Frozen { .. }) => Err(ShellError::JobIsFrozen {
id: id.get() as usize,
span: head,
}),
Some(Job::Thread(job)) => {
let waiter = job.on_termination().clone();
// .wait() blocks so we drop our mutex guard
drop(jobs);
let value = wait_with_interrupt(
|time| waiter.wait_timeout(time),
|| engine_state.signals().check(head),
Duration::from_millis(100),
)?;
Ok(value.clone().with_span(head).into_pipeline_data())
}
}
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "let id = job spawn { sleep 5sec; 'hi there' }; job wait $id",
description: "Wait for a job to complete",
result: Some(Value::test_string("hi there")),
}]
}
}
pub fn wait_with_interrupt<R, E>(
mut wait: impl FnMut(Duration) -> Option<R>,
mut interrupted: impl FnMut() -> Result<(), E>,
check_interval: Duration,
) -> Result<R, E> {
loop {
interrupted()?;
if let Some(result) = wait(check_interval) {
return Ok(result);
}
}
}

View File

@ -5,6 +5,7 @@ mod job_kill;
mod job_list;
mod job_spawn;
mod job_tag;
mod job_wait;
#[cfg(all(unix, feature = "os"))]
mod job_unfreeze;
@ -23,6 +24,7 @@ pub use job_kill::JobKill;
pub use job_list::JobList;
pub use job_spawn::JobSpawn;
pub use job_tag::JobTag;
pub use job_wait::JobWait;
#[cfg(not(target_family = "wasm"))]
pub use job_flush::JobFlush;

View File

@ -1,4 +1,4 @@
use nu_test_support::nu;
use nu_test_support::{nu, playground::Playground};
#[test]
fn job_send_root_job_works() {
@ -435,3 +435,40 @@ fn job_tag_modifies_tagged_job_tag() {
assert_eq!(actual.out, "beep");
assert_eq!(actual.err, "");
}
#[test]
fn job_wait_waits_for_completion() {
let actual = nu!(r#"
let id = job spawn { sleep 1sec; 'beep' }
job wait $id
"#);
assert_eq!(actual.err, "");
assert_eq!(actual.out, "beep");
}
#[test]
fn multiple_jobs_can_wait_for_a_single_one() {
Playground::setup("job_wait_test_1", |dirs, sandbox| {
sandbox.with_files(&[]);
let actual = nu!(
cwd: dirs.root(),
r#"
let id = job spawn { sleep 1sec; 'beep' }
let a = job spawn { job wait $id | save a.txt }
let b = job spawn { job wait $id | save b.txt }
let c = job spawn { job wait $id | save c.txt }
sleep 1.1sec
[(open a.txt), (open b.txt), (open c.txt)] | to nuon
"#);
assert_eq!(actual.err, "");
assert_eq!(actual.out, "[beep, beep, beep]");
})
}

View File

@ -11,7 +11,7 @@ use std::time::{Duration, Instant};
use nu_system::{kill_by_pid, UnfreezeHandle};
use crate::{PipelineData, Signals};
use crate::{PipelineData, Signals, Value};
use crate::JobId;
@ -145,16 +145,23 @@ pub struct ThreadJob {
signals: Signals,
pids: Arc<Mutex<HashSet<u32>>>,
tag: Option<String>,
on_termination: Waiter<Value>,
pub sender: Sender<Mail>,
}
impl ThreadJob {
pub fn new(signals: Signals, tag: Option<String>, sender: Sender<Mail>) -> Self {
pub fn new(
signals: Signals,
tag: Option<String>,
sender: Sender<Mail>,
on_termination: Waiter<Value>,
) -> Self {
ThreadJob {
signals,
pids: Arc::new(Mutex::new(HashSet::default())),
sender,
tag,
on_termination,
}
}
@ -202,6 +209,10 @@ impl ThreadJob {
pids.remove(&pid);
}
pub fn on_termination(&self) -> &Waiter<Value> {
&self.on_termination
}
}
impl Job {
@ -247,6 +258,148 @@ impl FrozenJob {
}
}
use std::sync::OnceLock;
/// A synchronization primitive that allows multiple threads to wait for a single event to be completed.
///
/// A Waiter/Completer pair is similar to a Receiver/Sender pair from std::sync::mpsc, with a few important differences:
/// - Only one value can only be sent/completed, subsequent completions are ignored
/// - Multiple threads can wait for the completion of an event (`Waiter` is `Clone` unlike `Receiver`)
///
/// This type differs from `OnceLock` only in a few regards:
/// - It is split into `Waiter` and `Completer` halves
/// - It allows users to `wait` on the completion event with a timeout
///
/// Threads that call the [`wait`] method of the `Waiter` block until the [`complete`] method of a matching `Completer` is called.
/// Once [`complete`] is called, all currently waiting threads will be woken up and will return from their `wait` calls.
/// Subsequent calls to [`wait`] will not block and will return immediately.
///
pub fn completion_signal<T>() -> (Completer<T>, Waiter<T>) {
let inner = Arc::new(InnerWaitCompleteSignal::new());
(
Completer {
inner: inner.clone(),
},
Waiter { inner },
)
}
/// Waiter and Completer are effectively just `Arc` wrappers around this type.
struct InnerWaitCompleteSignal<T> {
// One may ask: "Why the mutex and the convar"?
// It turns out OnceLock doesn't have a `wait_timeout` method, so
// we use the one from the condvar.
//
// We once again, assume acquire-release semamntics for Rust mutexes
mutex: std::sync::Mutex<()>,
var: std::sync::Condvar,
value: std::sync::OnceLock<T>,
}
impl<T> InnerWaitCompleteSignal<T> {
pub fn new() -> Self {
InnerWaitCompleteSignal {
mutex: std::sync::Mutex::new(()),
value: OnceLock::new(),
var: std::sync::Condvar::new(),
}
}
}
#[derive(Clone)]
pub struct Waiter<T> {
inner: Arc<InnerWaitCompleteSignal<T>>,
}
pub struct Completer<T> {
inner: Arc<InnerWaitCompleteSignal<T>>,
}
impl<T> Waiter<T> {
/// Blocks the current thread until a completion signal is sent.
///
/// If the signal has already been emitted, this method returns immediately.
///
pub fn wait(&self) -> &T {
let inner: &InnerWaitCompleteSignal<T> = self.inner.as_ref();
let mut guard = inner.mutex.lock().expect("mutex is poisoned!");
loop {
match inner.value.get() {
None => match inner.var.wait(guard) {
Ok(it) => guard = it,
Err(_) => panic!("mutex is poisoned!"),
},
Some(value) => return value,
}
}
}
pub fn wait_timeout(&self, duration: std::time::Duration) -> Option<&T> {
let inner: &InnerWaitCompleteSignal<T> = self.inner.as_ref();
let guard = inner.mutex.lock().expect("mutex is poisoned!");
match inner
.var
.wait_timeout_while(guard, duration, |_| inner.value.get().is_none())
{
Ok((_guard, result)) => {
if result.timed_out() {
None
} else {
// SAFETY:
// This should never fail, since we just ran a `wait_timeout_while`
// that should run while the `inner.value` OnceLock is not defined.
// Therefore, it by this point in the code, either a timeout happened,
// or a call to the `.get()` method of the OnceLock returned `Some`thing.
// A OnceLock cannot be undefined once it is defined, so any subsequent call
// to `inner.value.get()` should return `Some`thing.
Some(inner.value.get().expect("OnceLock was not defined!"))
}
}
Err(_) => panic!("mutex is poisoned!"),
}
}
// TODO: add wait_timeout
/// Checks if this completion signal has been signaled.
///
/// This method returns `true` if the [`signal`] method has been called at least once,
/// and `false` otherwise. This method does not block the current thread.
///
pub fn is_completed(&self) -> bool {
self.try_get().is_some()
}
/// Returns the completed value, or None if none was sent.
pub fn try_get(&self) -> Option<&T> {
let _guard = self.inner.mutex.lock().expect("mutex is poisoned!");
self.inner.value.get()
}
}
impl<T> Completer<T> {
/// Signals all threads currently waiting on this completion signal.
///
/// This method sets wakes up all threads that are blocked in the [`wait`] method
/// of an attached `Waiter`. Subsequent calls to [`wait`] from any thread will return immediately.
/// This operation has no effect if this completion signal has already been completed.
pub fn complete(&self, value: T) {
let inner: &InnerWaitCompleteSignal<T> = self.inner.as_ref();
let mut _guard = inner.mutex.lock().expect("mutex is poisoned!");
let _ = inner.value.set(value);
inner.var.notify_all();
}
}
/// Stores the information about the background job currently being executed by this thread, if any
#[derive(Clone)]
pub struct CurrentJob {
@ -403,3 +556,120 @@ impl IgnoredMail {
Some(self.messages.remove(&id)?.1)
}
}
#[cfg(test)]
mod completion_signal_tests {
use std::{
sync::mpsc,
thread::{self, sleep},
time::Duration,
};
use crate::engine::completion_signal;
fn run_with_timeout<F>(duration: Duration, lambda: F)
where
F: FnOnce() + Send + 'static,
{
let (send, recv) = std::sync::mpsc::channel();
let send_ = send.clone();
thread::spawn(move || {
lambda();
let send = send_;
send.send(true).expect("send failed");
});
thread::spawn(move || {
thread::sleep(duration);
send.send(false).expect("send failed");
});
let ok = recv.recv().expect("recv failed!");
assert!(ok, "got timeout!");
}
#[test]
fn wait_returns_when_signaled_from_another_thread() {
run_with_timeout(Duration::from_secs(1), || {
let (complete, wait) = completion_signal();
let wait_ = wait.clone();
thread::spawn(move || {
sleep(Duration::from_millis(200));
assert!(!wait_.is_completed());
complete.complete(123);
});
let result = wait.wait();
assert!(wait.is_completed());
assert_eq!(*result, 123);
});
}
#[test]
fn wait_works_from_multiple_threads() {
run_with_timeout(Duration::from_secs(1), || {
let (complete, wait) = completion_signal();
let (send, recv) = mpsc::channel();
let thread_count = 4;
for _ in 0..thread_count {
let wait_ = wait.clone();
let send_ = send.clone();
thread::spawn(move || {
let value = wait_.wait();
send_.send(*value).expect("send failed");
});
}
complete.complete(321);
for _ in 0..thread_count {
let result = recv.recv().expect("recv failed");
assert_eq!(result, 321);
}
})
}
#[test]
fn was_signaled_returns_false_when_struct_is_initialized() {
let (_, wait) = completion_signal::<()>();
assert!(!wait.is_completed())
}
#[test]
fn was_signaled_returns_true_when_signal_is_called() {
let (complete, wait) = completion_signal();
complete.complete(());
assert!(wait.is_completed())
}
#[test]
fn wait_returns_when_own_thread_signals() {
run_with_timeout(Duration::from_secs(1), || {
let (complete, wait) = completion_signal();
complete.complete(());
wait.wait();
assert!(wait.is_completed())
})
}
}