diff --git a/crates/nu-command/src/default_context.rs b/crates/nu-command/src/default_context.rs index d803225c36..c190ca40db 100644 --- a/crates/nu-command/src/default_context.rs +++ b/crates/nu-command/src/default_context.rs @@ -453,6 +453,7 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState { JobList, JobKill, JobTag, + JobWait, Job, }; diff --git a/crates/nu-command/src/experimental/job_spawn.rs b/crates/nu-command/src/experimental/job_spawn.rs index 37203aafb9..b0061a40fa 100644 --- a/crates/nu-command/src/experimental/job_spawn.rs +++ b/crates/nu-command/src/experimental/job_spawn.rs @@ -8,7 +8,7 @@ use std::{ use nu_engine::{command_prelude::*, ClosureEvalOnce}; use nu_protocol::{ - engine::{Closure, Job, Redirection, ThreadJob}, + engine::{Closure, Job, Redirection, ThreadJob, WaitSignal}, report_shell_error, OutDest, Signals, }; @@ -54,7 +54,9 @@ impl Command for JobSpawn { ) -> Result { let head = call.head; - let closure: Closure = call.req(engine_state, stack, 0)?; + let spanned_closure: Spanned = call.req(engine_state, stack, 0)?; + + let closure: Closure = spanned_closure.item; let tag: Option = call.get_flag(engine_state, stack, "tag")?; @@ -75,8 +77,10 @@ impl Command for JobSpawn { let jobs = job_state.jobs.clone(); let mut jobs = jobs.lock().expect("jobs lock is poisoned!"); + let on_termination = Arc::new(WaitSignal::new()); + let id = { - let thread_job = ThreadJob::new(job_signals, tag); + let thread_job = ThreadJob::new(job_signals, tag, on_termination.clone()); job_state.current_thread_job = Some(thread_job.clone()); jobs.add_job(Job::Thread(thread_job)) }; @@ -89,15 +93,19 @@ impl Command for JobSpawn { Some(Redirection::Pipe(OutDest::Null)), Some(Redirection::Pipe(OutDest::Null)), ); - ClosureEvalOnce::new_preserve_out_dest(&job_state, &stack, closure) + let result_value = ClosureEvalOnce::new(&job_state, &stack, closure) .run_with_input(Value::nothing(head).into_pipeline_data()) - .and_then(|data| data.drain()) + .and_then(|data| data.into_value(spanned_closure.span)) .unwrap_or_else(|err| { if !job_state.signals().interrupted() { report_shell_error(&job_state, &err); } + + Value::error(err, head) }); + on_termination.signal(result_value); + { let mut jobs = job_state.jobs.lock().expect("jobs lock is poisoned!"); diff --git a/crates/nu-command/src/experimental/job_wait.rs b/crates/nu-command/src/experimental/job_wait.rs new file mode 100644 index 0000000000..50a2252ea9 --- /dev/null +++ b/crates/nu-command/src/experimental/job_wait.rs @@ -0,0 +1,82 @@ +use nu_engine::command_prelude::*; +use nu_protocol::{engine::Job, JobId}; + +#[derive(Clone)] +pub struct JobWait; + +impl Command for JobWait { + fn name(&self) -> &str { + "job wait" + } + + fn description(&self) -> &str { + "Wait for a job to complete and return its result value" + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("job wait") + .category(Category::Experimental) + .required("id", SyntaxShape::Int, "The id of the running to wait for.") + .input_output_types(vec![(Type::Nothing, Type::Nothing)]) + } + + fn search_terms(&self) -> Vec<&str> { + vec!["join"] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + _input: PipelineData, + ) -> Result { + let head = call.head; + + let id_arg: Spanned = call.req(engine_state, stack, 0)?; + + if id_arg.item < 0 { + return Err(ShellError::NeedsPositiveValue { span: id_arg.span }); + } + + let id: JobId = JobId::new(id_arg.item as usize); + + let mut jobs = engine_state.jobs.lock().expect("jobs lock is poisoned!"); + + match jobs.lookup_mut(id) { + None => { + return Err(ShellError::JobNotFound { + id: id.get(), + span: head, + }); + } + + Some(Job::Frozen { .. }) => { + return Err(ShellError::UnsupportedJobType { + id: id.get() as usize, + span: head, + kind: "frozen".to_string(), + }); + } + + Some(Job::Thread(job)) => { + let on_termination = job.on_termination().clone(); + + // .join() blocks so we drop our mutex guard + drop(jobs); + + let result = on_termination.join().clone().with_span(head); + + Ok(result.into_pipeline_data()) + } + } + } + + fn examples(&self) -> Vec { + vec![Example { + example: "let id = job spawn { sleep 5sec; 'hi there' }; job wait $id", + description: "Wait for a job to complete", + result: Some(Value::test_string("hi there")), + }] + } +} diff --git a/crates/nu-command/src/experimental/mod.rs b/crates/nu-command/src/experimental/mod.rs index f98d123218..cd21e259a7 100644 --- a/crates/nu-command/src/experimental/mod.rs +++ b/crates/nu-command/src/experimental/mod.rs @@ -4,6 +4,7 @@ mod job_kill; mod job_list; mod job_spawn; mod job_tag; +mod job_wait; #[cfg(all(unix, feature = "os"))] mod job_unfreeze; @@ -14,6 +15,7 @@ pub use job_kill::JobKill; pub use job_list::JobList; pub use job_spawn::JobSpawn; pub use job_tag::JobTag; +pub use job_wait::JobWait; #[cfg(all(unix, feature = "os"))] pub use job_unfreeze::JobUnfreeze; diff --git a/crates/nu-protocol/src/engine/jobs.rs b/crates/nu-protocol/src/engine/jobs.rs index 8e64e46f7f..c9bbbe48f1 100644 --- a/crates/nu-protocol/src/engine/jobs.rs +++ b/crates/nu-protocol/src/engine/jobs.rs @@ -5,7 +5,7 @@ use std::{ use nu_system::{kill_by_pid, UnfreezeHandle}; -use crate::Signals; +use crate::{Signals, Value}; use crate::JobId; @@ -139,14 +139,20 @@ pub struct ThreadJob { signals: Signals, pids: Arc>>, tag: Option, + on_termination: Arc>, } impl ThreadJob { - pub fn new(signals: Signals, tag: Option) -> Self { + pub fn new( + signals: Signals, + tag: Option, + on_termination: Arc>, + ) -> Self { ThreadJob { signals, pids: Arc::new(Mutex::new(HashSet::default())), tag, + on_termination, } } @@ -194,6 +200,10 @@ impl ThreadJob { pids.remove(&pid); } + + pub fn on_termination(&self) -> &Arc> { + return &self.on_termination; + } } impl Job { @@ -238,3 +248,202 @@ impl FrozenJob { } } } + +use std::sync::OnceLock; + +/// A synchronization primitive that allows multiple threads to wait for a single event to occur. +/// +/// Threads that call the [`join`] method will block until the [`signal`] method is called. +/// Once [`signal`] is called, all currently waiting threads will be woken up and will return from their `join` calls. +/// Subsequent calls to [`join`] will not block and will return immediately. +/// +/// The [`was_signaled`] method can be used to check if the signal has been emitted without blocking. +pub struct WaitSignal { + mutex: std::sync::Mutex, + value: std::sync::OnceLock, + var: std::sync::Condvar, +} + +impl WaitSignal { + /// Creates a new `WaitSignal` in an unsignaled state. + /// + /// No threads will be woken up initially. + pub fn new() -> Self { + WaitSignal { + mutex: std::sync::Mutex::new(false), + value: OnceLock::new(), + var: std::sync::Condvar::new(), + } + } + + /// Blocks the current thread until this `WaitSignal` is signaled. + /// + /// If the signal has already been emitted, this method returns immediately. + /// + /// # Panics + /// + /// This method will panic if the underlying mutex is poisoned. This can happen if another + /// thread holding the mutex panics. + pub fn join(&self) -> &T { + let mut guard = self.mutex.lock().expect("mutex is poisoned!"); + + while !*guard { + match self.var.wait(guard) { + Ok(it) => guard = it, + Err(_) => panic!("mutex is poisoned!"), + } + } + + return self.value.get().unwrap(); + } + + /// Signals all threads currently waiting on this `WaitSignal`. + /// + /// This method sets the internal state to "signaled" and wakes up all threads that are blocked + /// in the [`join`] method. Subsequent calls to [`join`] from any thread will return immediately. + /// This operation has no effect if the signal has already been emitted. + pub fn signal(&self, value: T) { + let mut guard = self.mutex.lock().expect("mutex is poisoned!"); + + *guard = true; + let _ = self.value.set(value); + + self.var.notify_all(); + } + + /// Checks if this `WaitSignal` has been signaled. + /// + /// This method returns `true` if the [`signal`] method has been called at least once, + /// and `false` otherwise. This method does not block the current thread. + /// + /// # Panics + /// + /// This method will panic if the underlying mutex is poisoned. This can happen if another + /// thread holding the mutex panics. + pub fn was_signaled(&self) -> bool { + let guard = self.mutex.lock().expect("mutex is poisoned!"); + + *guard + } +} + +// TODO: move to testing directory +#[cfg(test)] +mod test { + + use std::{ + sync::{mpsc, Arc}, + thread::{self, sleep}, + time::Duration, + }; + + use pretty_assertions::assert_eq; + + use crate::engine::jobs::WaitSignal; + + fn run_with_timeout(duration: Duration, lambda: F) + where + F: FnOnce() + Send + 'static, + { + let (send, recv) = std::sync::mpsc::channel(); + + let send_ = send.clone(); + + thread::spawn(move || { + lambda(); + + let send = send_; + + send.send(true).expect("send failed"); + }); + + thread::spawn(move || { + thread::sleep(duration); + + send.send(false).expect("send failed"); + }); + + let result = recv.recv().expect("recv failed!"); + + assert!(result == true, "timeout!"); + } + + #[test] + fn join_returns_when_signaled_from_another_thread() { + run_with_timeout(Duration::from_secs(1), || { + let signal = Arc::new(WaitSignal::new()); + + let thread_signal = signal.clone(); + + thread::spawn(move || { + sleep(Duration::from_millis(200)); + assert!(!thread_signal.was_signaled()); + thread_signal.signal(123); + }); + + let result = signal.join(); + + assert!(signal.was_signaled()); + + assert_eq!(*result, 123); + }); + } + + #[test] + fn join_works_from_multiple_threads() { + run_with_timeout(Duration::from_secs(1), || { + let signal = Arc::new(WaitSignal::new()); + + let (send, recv) = mpsc::channel(); + + let thread_count = 4; + + for _ in 0..thread_count { + let signal_ = signal.clone(); + let send_ = send.clone(); + + thread::spawn(move || { + let value = signal_.join(); + send_.send(*value).expect("send failed"); + }); + } + + signal.signal(321); + + for _ in 0..thread_count { + let result = recv.recv().expect("recv failed"); + + assert_eq!(result, 321); + } + }) + } + + #[test] + fn was_signaled_returns_false_when_struct_is_initalized() { + let signal = Arc::new(WaitSignal::<()>::new()); + + assert!(!signal.was_signaled()) + } + + #[test] + fn was_signaled_returns_true_when_signal_is_called() { + let signal = Arc::new(WaitSignal::new()); + + signal.signal(()); + + assert!(signal.was_signaled()) + } + + #[test] + fn join_returns_when_own_thread_signals() { + run_with_timeout(Duration::from_secs(1), || { + let signal = Arc::new(WaitSignal::new()); + + signal.signal(()); + + signal.join(); + + assert!(signal.was_signaled()) + }) + } +} diff --git a/crates/nu-protocol/src/errors/shell_error/mod.rs b/crates/nu-protocol/src/errors/shell_error/mod.rs index 84c23cd208..84160f02de 100644 --- a/crates/nu-protocol/src/errors/shell_error/mod.rs +++ b/crates/nu-protocol/src/errors/shell_error/mod.rs @@ -1379,6 +1379,13 @@ On Windows, this would be %USERPROFILE%\AppData\Roaming"# span: Span, }, + #[error("Job {id} is a job of type {kind}")] + #[diagnostic( + code(nu::shell::os_disabled), + help("This operation does not support the given job type") + )] + UnsupportedJobType { id: usize, span: Span, kind: String }, + #[error(transparent)] #[diagnostic(transparent)] ChainedError(ChainedError),