mirror of
https://github.com/nushell/nushell.git
synced 2025-05-31 07:08:22 +02:00
Implemented job wait
This commit adds a `job wait` command so users can wait for the completion of another job. For this, a data structure for joining and signaling job completion was implemented (since the native rust `JoinHandle` cannot be cloned).
This commit is contained in:
parent
bae04352ca
commit
75db02781d
@ -453,6 +453,7 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState {
|
|||||||
JobList,
|
JobList,
|
||||||
JobKill,
|
JobKill,
|
||||||
JobTag,
|
JobTag,
|
||||||
|
JobWait,
|
||||||
Job,
|
Job,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -8,7 +8,7 @@ use std::{
|
|||||||
|
|
||||||
use nu_engine::{command_prelude::*, ClosureEvalOnce};
|
use nu_engine::{command_prelude::*, ClosureEvalOnce};
|
||||||
use nu_protocol::{
|
use nu_protocol::{
|
||||||
engine::{Closure, Job, Redirection, ThreadJob},
|
engine::{Closure, Job, Redirection, ThreadJob, WaitSignal},
|
||||||
report_shell_error, OutDest, Signals,
|
report_shell_error, OutDest, Signals,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -54,7 +54,9 @@ impl Command for JobSpawn {
|
|||||||
) -> Result<PipelineData, ShellError> {
|
) -> Result<PipelineData, ShellError> {
|
||||||
let head = call.head;
|
let head = call.head;
|
||||||
|
|
||||||
let closure: Closure = call.req(engine_state, stack, 0)?;
|
let spanned_closure: Spanned<Closure> = call.req(engine_state, stack, 0)?;
|
||||||
|
|
||||||
|
let closure: Closure = spanned_closure.item;
|
||||||
|
|
||||||
let tag: Option<String> = call.get_flag(engine_state, stack, "tag")?;
|
let tag: Option<String> = call.get_flag(engine_state, stack, "tag")?;
|
||||||
|
|
||||||
@ -75,8 +77,10 @@ impl Command for JobSpawn {
|
|||||||
let jobs = job_state.jobs.clone();
|
let jobs = job_state.jobs.clone();
|
||||||
let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
|
let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
|
||||||
|
|
||||||
|
let on_termination = Arc::new(WaitSignal::new());
|
||||||
|
|
||||||
let id = {
|
let id = {
|
||||||
let thread_job = ThreadJob::new(job_signals, tag);
|
let thread_job = ThreadJob::new(job_signals, tag, on_termination.clone());
|
||||||
job_state.current_thread_job = Some(thread_job.clone());
|
job_state.current_thread_job = Some(thread_job.clone());
|
||||||
jobs.add_job(Job::Thread(thread_job))
|
jobs.add_job(Job::Thread(thread_job))
|
||||||
};
|
};
|
||||||
@ -89,15 +93,19 @@ impl Command for JobSpawn {
|
|||||||
Some(Redirection::Pipe(OutDest::Null)),
|
Some(Redirection::Pipe(OutDest::Null)),
|
||||||
Some(Redirection::Pipe(OutDest::Null)),
|
Some(Redirection::Pipe(OutDest::Null)),
|
||||||
);
|
);
|
||||||
ClosureEvalOnce::new_preserve_out_dest(&job_state, &stack, closure)
|
let result_value = ClosureEvalOnce::new(&job_state, &stack, closure)
|
||||||
.run_with_input(Value::nothing(head).into_pipeline_data())
|
.run_with_input(Value::nothing(head).into_pipeline_data())
|
||||||
.and_then(|data| data.drain())
|
.and_then(|data| data.into_value(spanned_closure.span))
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
if !job_state.signals().interrupted() {
|
if !job_state.signals().interrupted() {
|
||||||
report_shell_error(&job_state, &err);
|
report_shell_error(&job_state, &err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Value::error(err, head)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
on_termination.signal(result_value);
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut jobs = job_state.jobs.lock().expect("jobs lock is poisoned!");
|
let mut jobs = job_state.jobs.lock().expect("jobs lock is poisoned!");
|
||||||
|
|
||||||
|
82
crates/nu-command/src/experimental/job_wait.rs
Normal file
82
crates/nu-command/src/experimental/job_wait.rs
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
use nu_engine::command_prelude::*;
|
||||||
|
use nu_protocol::{engine::Job, JobId};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct JobWait;
|
||||||
|
|
||||||
|
impl Command for JobWait {
|
||||||
|
fn name(&self) -> &str {
|
||||||
|
"job wait"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn description(&self) -> &str {
|
||||||
|
"Wait for a job to complete and return its result value"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn signature(&self) -> nu_protocol::Signature {
|
||||||
|
Signature::build("job wait")
|
||||||
|
.category(Category::Experimental)
|
||||||
|
.required("id", SyntaxShape::Int, "The id of the running to wait for.")
|
||||||
|
.input_output_types(vec![(Type::Nothing, Type::Nothing)])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn search_terms(&self) -> Vec<&str> {
|
||||||
|
vec!["join"]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run(
|
||||||
|
&self,
|
||||||
|
engine_state: &EngineState,
|
||||||
|
stack: &mut Stack,
|
||||||
|
call: &Call,
|
||||||
|
_input: PipelineData,
|
||||||
|
) -> Result<PipelineData, ShellError> {
|
||||||
|
let head = call.head;
|
||||||
|
|
||||||
|
let id_arg: Spanned<i64> = call.req(engine_state, stack, 0)?;
|
||||||
|
|
||||||
|
if id_arg.item < 0 {
|
||||||
|
return Err(ShellError::NeedsPositiveValue { span: id_arg.span });
|
||||||
|
}
|
||||||
|
|
||||||
|
let id: JobId = JobId::new(id_arg.item as usize);
|
||||||
|
|
||||||
|
let mut jobs = engine_state.jobs.lock().expect("jobs lock is poisoned!");
|
||||||
|
|
||||||
|
match jobs.lookup_mut(id) {
|
||||||
|
None => {
|
||||||
|
return Err(ShellError::JobNotFound {
|
||||||
|
id: id.get(),
|
||||||
|
span: head,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(Job::Frozen { .. }) => {
|
||||||
|
return Err(ShellError::UnsupportedJobType {
|
||||||
|
id: id.get() as usize,
|
||||||
|
span: head,
|
||||||
|
kind: "frozen".to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(Job::Thread(job)) => {
|
||||||
|
let on_termination = job.on_termination().clone();
|
||||||
|
|
||||||
|
// .join() blocks so we drop our mutex guard
|
||||||
|
drop(jobs);
|
||||||
|
|
||||||
|
let result = on_termination.join().clone().with_span(head);
|
||||||
|
|
||||||
|
Ok(result.into_pipeline_data())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn examples(&self) -> Vec<Example> {
|
||||||
|
vec![Example {
|
||||||
|
example: "let id = job spawn { sleep 5sec; 'hi there' }; job wait $id",
|
||||||
|
description: "Wait for a job to complete",
|
||||||
|
result: Some(Value::test_string("hi there")),
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,7 @@ mod job_kill;
|
|||||||
mod job_list;
|
mod job_list;
|
||||||
mod job_spawn;
|
mod job_spawn;
|
||||||
mod job_tag;
|
mod job_tag;
|
||||||
|
mod job_wait;
|
||||||
|
|
||||||
#[cfg(all(unix, feature = "os"))]
|
#[cfg(all(unix, feature = "os"))]
|
||||||
mod job_unfreeze;
|
mod job_unfreeze;
|
||||||
@ -14,6 +15,7 @@ pub use job_kill::JobKill;
|
|||||||
pub use job_list::JobList;
|
pub use job_list::JobList;
|
||||||
pub use job_spawn::JobSpawn;
|
pub use job_spawn::JobSpawn;
|
||||||
pub use job_tag::JobTag;
|
pub use job_tag::JobTag;
|
||||||
|
pub use job_wait::JobWait;
|
||||||
|
|
||||||
#[cfg(all(unix, feature = "os"))]
|
#[cfg(all(unix, feature = "os"))]
|
||||||
pub use job_unfreeze::JobUnfreeze;
|
pub use job_unfreeze::JobUnfreeze;
|
||||||
|
@ -5,7 +5,7 @@ use std::{
|
|||||||
|
|
||||||
use nu_system::{kill_by_pid, UnfreezeHandle};
|
use nu_system::{kill_by_pid, UnfreezeHandle};
|
||||||
|
|
||||||
use crate::Signals;
|
use crate::{Signals, Value};
|
||||||
|
|
||||||
use crate::JobId;
|
use crate::JobId;
|
||||||
|
|
||||||
@ -139,14 +139,20 @@ pub struct ThreadJob {
|
|||||||
signals: Signals,
|
signals: Signals,
|
||||||
pids: Arc<Mutex<HashSet<u32>>>,
|
pids: Arc<Mutex<HashSet<u32>>>,
|
||||||
tag: Option<String>,
|
tag: Option<String>,
|
||||||
|
on_termination: Arc<WaitSignal<Value>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ThreadJob {
|
impl ThreadJob {
|
||||||
pub fn new(signals: Signals, tag: Option<String>) -> Self {
|
pub fn new(
|
||||||
|
signals: Signals,
|
||||||
|
tag: Option<String>,
|
||||||
|
on_termination: Arc<WaitSignal<Value>>,
|
||||||
|
) -> Self {
|
||||||
ThreadJob {
|
ThreadJob {
|
||||||
signals,
|
signals,
|
||||||
pids: Arc::new(Mutex::new(HashSet::default())),
|
pids: Arc::new(Mutex::new(HashSet::default())),
|
||||||
tag,
|
tag,
|
||||||
|
on_termination,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,6 +200,10 @@ impl ThreadJob {
|
|||||||
|
|
||||||
pids.remove(&pid);
|
pids.remove(&pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn on_termination(&self) -> &Arc<WaitSignal<Value>> {
|
||||||
|
return &self.on_termination;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job {
|
impl Job {
|
||||||
@ -238,3 +248,202 @@ impl FrozenJob {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use std::sync::OnceLock;
|
||||||
|
|
||||||
|
/// A synchronization primitive that allows multiple threads to wait for a single event to occur.
|
||||||
|
///
|
||||||
|
/// Threads that call the [`join`] method will block until the [`signal`] method is called.
|
||||||
|
/// Once [`signal`] is called, all currently waiting threads will be woken up and will return from their `join` calls.
|
||||||
|
/// Subsequent calls to [`join`] will not block and will return immediately.
|
||||||
|
///
|
||||||
|
/// The [`was_signaled`] method can be used to check if the signal has been emitted without blocking.
|
||||||
|
pub struct WaitSignal<T> {
|
||||||
|
mutex: std::sync::Mutex<bool>,
|
||||||
|
value: std::sync::OnceLock<T>,
|
||||||
|
var: std::sync::Condvar,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> WaitSignal<T> {
|
||||||
|
/// Creates a new `WaitSignal` in an unsignaled state.
|
||||||
|
///
|
||||||
|
/// No threads will be woken up initially.
|
||||||
|
pub fn new() -> Self {
|
||||||
|
WaitSignal {
|
||||||
|
mutex: std::sync::Mutex::new(false),
|
||||||
|
value: OnceLock::new(),
|
||||||
|
var: std::sync::Condvar::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Blocks the current thread until this `WaitSignal` is signaled.
|
||||||
|
///
|
||||||
|
/// If the signal has already been emitted, this method returns immediately.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// This method will panic if the underlying mutex is poisoned. This can happen if another
|
||||||
|
/// thread holding the mutex panics.
|
||||||
|
pub fn join(&self) -> &T {
|
||||||
|
let mut guard = self.mutex.lock().expect("mutex is poisoned!");
|
||||||
|
|
||||||
|
while !*guard {
|
||||||
|
match self.var.wait(guard) {
|
||||||
|
Ok(it) => guard = it,
|
||||||
|
Err(_) => panic!("mutex is poisoned!"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return self.value.get().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Signals all threads currently waiting on this `WaitSignal`.
|
||||||
|
///
|
||||||
|
/// This method sets the internal state to "signaled" and wakes up all threads that are blocked
|
||||||
|
/// in the [`join`] method. Subsequent calls to [`join`] from any thread will return immediately.
|
||||||
|
/// This operation has no effect if the signal has already been emitted.
|
||||||
|
pub fn signal(&self, value: T) {
|
||||||
|
let mut guard = self.mutex.lock().expect("mutex is poisoned!");
|
||||||
|
|
||||||
|
*guard = true;
|
||||||
|
let _ = self.value.set(value);
|
||||||
|
|
||||||
|
self.var.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks if this `WaitSignal` has been signaled.
|
||||||
|
///
|
||||||
|
/// This method returns `true` if the [`signal`] method has been called at least once,
|
||||||
|
/// and `false` otherwise. This method does not block the current thread.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// This method will panic if the underlying mutex is poisoned. This can happen if another
|
||||||
|
/// thread holding the mutex panics.
|
||||||
|
pub fn was_signaled(&self) -> bool {
|
||||||
|
let guard = self.mutex.lock().expect("mutex is poisoned!");
|
||||||
|
|
||||||
|
*guard
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: move to testing directory
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
sync::{mpsc, Arc},
|
||||||
|
thread::{self, sleep},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use pretty_assertions::assert_eq;
|
||||||
|
|
||||||
|
use crate::engine::jobs::WaitSignal;
|
||||||
|
|
||||||
|
fn run_with_timeout<F>(duration: Duration, lambda: F)
|
||||||
|
where
|
||||||
|
F: FnOnce() + Send + 'static,
|
||||||
|
{
|
||||||
|
let (send, recv) = std::sync::mpsc::channel();
|
||||||
|
|
||||||
|
let send_ = send.clone();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
lambda();
|
||||||
|
|
||||||
|
let send = send_;
|
||||||
|
|
||||||
|
send.send(true).expect("send failed");
|
||||||
|
});
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
thread::sleep(duration);
|
||||||
|
|
||||||
|
send.send(false).expect("send failed");
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = recv.recv().expect("recv failed!");
|
||||||
|
|
||||||
|
assert!(result == true, "timeout!");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn join_returns_when_signaled_from_another_thread() {
|
||||||
|
run_with_timeout(Duration::from_secs(1), || {
|
||||||
|
let signal = Arc::new(WaitSignal::new());
|
||||||
|
|
||||||
|
let thread_signal = signal.clone();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
sleep(Duration::from_millis(200));
|
||||||
|
assert!(!thread_signal.was_signaled());
|
||||||
|
thread_signal.signal(123);
|
||||||
|
});
|
||||||
|
|
||||||
|
let result = signal.join();
|
||||||
|
|
||||||
|
assert!(signal.was_signaled());
|
||||||
|
|
||||||
|
assert_eq!(*result, 123);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn join_works_from_multiple_threads() {
|
||||||
|
run_with_timeout(Duration::from_secs(1), || {
|
||||||
|
let signal = Arc::new(WaitSignal::new());
|
||||||
|
|
||||||
|
let (send, recv) = mpsc::channel();
|
||||||
|
|
||||||
|
let thread_count = 4;
|
||||||
|
|
||||||
|
for _ in 0..thread_count {
|
||||||
|
let signal_ = signal.clone();
|
||||||
|
let send_ = send.clone();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let value = signal_.join();
|
||||||
|
send_.send(*value).expect("send failed");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
signal.signal(321);
|
||||||
|
|
||||||
|
for _ in 0..thread_count {
|
||||||
|
let result = recv.recv().expect("recv failed");
|
||||||
|
|
||||||
|
assert_eq!(result, 321);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn was_signaled_returns_false_when_struct_is_initalized() {
|
||||||
|
let signal = Arc::new(WaitSignal::<()>::new());
|
||||||
|
|
||||||
|
assert!(!signal.was_signaled())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn was_signaled_returns_true_when_signal_is_called() {
|
||||||
|
let signal = Arc::new(WaitSignal::new());
|
||||||
|
|
||||||
|
signal.signal(());
|
||||||
|
|
||||||
|
assert!(signal.was_signaled())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn join_returns_when_own_thread_signals() {
|
||||||
|
run_with_timeout(Duration::from_secs(1), || {
|
||||||
|
let signal = Arc::new(WaitSignal::new());
|
||||||
|
|
||||||
|
signal.signal(());
|
||||||
|
|
||||||
|
signal.join();
|
||||||
|
|
||||||
|
assert!(signal.was_signaled())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1379,6 +1379,13 @@ On Windows, this would be %USERPROFILE%\AppData\Roaming"#
|
|||||||
span: Span,
|
span: Span,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
#[error("Job {id} is a job of type {kind}")]
|
||||||
|
#[diagnostic(
|
||||||
|
code(nu::shell::os_disabled),
|
||||||
|
help("This operation does not support the given job type")
|
||||||
|
)]
|
||||||
|
UnsupportedJobType { id: usize, span: Span, kind: String },
|
||||||
|
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
#[diagnostic(transparent)]
|
#[diagnostic(transparent)]
|
||||||
ChainedError(ChainedError),
|
ChainedError(ChainedError),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user