diff --git a/crates/nu-command/src/experimental/job_wait.rs b/crates/nu-command/src/experimental/job_wait.rs index 20827c87b2..9c83541ea3 100644 --- a/crates/nu-command/src/experimental/job_wait.rs +++ b/crates/nu-command/src/experimental/job_wait.rs @@ -1,5 +1,6 @@ use nu_engine::command_prelude::*; use nu_protocol::{engine::Job, JobId}; +use std::time::Duration; #[derive(Clone)] pub struct JobWait; @@ -10,14 +11,16 @@ impl Command for JobWait { } fn description(&self) -> &str { - r#"Wait for a job to complete and return its result value. + r#"Wait for a job to complete."# + } - Given the id of a running job currently in the job table, this command - waits for it to complete and returns the value returned - by the closure passed down to `job spawn`. + fn extra_description(&self) -> &str { + r#"Given the id of a running job currently in the job table, this command +waits for it to complete and returns the value returned +by the closure passed down to `job spawn` to create the given job. - Note that this command fails if a job is currently not in the job table - (as seen by `job list`), so it is not possible to wait for jobs that have already finished. +Note that this command fails if the provided job id is currently not in the job table +(as seen by `job list`), so it is not possible to wait for jobs that have already finished. "# } @@ -73,9 +76,13 @@ impl Command for JobWait { // .wait() blocks so we drop our mutex guard drop(jobs); - let result = waiter.wait().clone().with_span(head); + let value = wait_with_interrupt( + |time| waiter.wait_timeout(time), + || engine_state.signals().check(head), + Duration::from_millis(100), + )?; - Ok(result.into_pipeline_data()) + Ok(value.clone().with_span(head).into_pipeline_data()) } } } @@ -88,3 +95,18 @@ impl Command for JobWait { }] } } + +pub fn wait_with_interrupt( + mut wait: impl FnMut(Duration) -> Option, + mut interrupted: impl FnMut() -> Result<(), E>, + check_interval: Duration, +) -> Result { + loop { + interrupted()?; + + match wait(check_interval) { + Some(result) => return Ok(result), + None => {} // do nothing, try again + } + } +} diff --git a/crates/nu-protocol/src/engine/jobs.rs b/crates/nu-protocol/src/engine/jobs.rs index 7f974b17c7..32843a1c13 100644 --- a/crates/nu-protocol/src/engine/jobs.rs +++ b/crates/nu-protocol/src/engine/jobs.rs @@ -1,6 +1,7 @@ use std::{ collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, WaitTimeoutResult}, + time::Instant, }; use nu_system::{kill_by_pid, UnfreezeHandle}; @@ -198,7 +199,7 @@ impl ThreadJob { } pub fn on_termination(&self) -> &Waiter { - return &self.on_termination; + &self.on_termination } } @@ -264,12 +265,12 @@ use std::sync::OnceLock; pub fn completion_signal() -> (Completer, Waiter) { let inner = Arc::new(InnerWaitCompleteSignal::new()); - return ( + ( Completer { inner: inner.clone(), }, Waiter { inner }, - ); + ) } /// Waiter and Completer are effectively just `Arc` wrappers around this type. @@ -324,6 +325,26 @@ impl Waiter { } } + pub fn wait_timeout(&self, duration: std::time::Duration) -> Option<&T> { + let inner: &InnerWaitCompleteSignal = self.inner.as_ref(); + + let guard = inner.mutex.lock().expect("mutex is poisoned!"); + + match inner + .var + .wait_timeout_while(guard, duration, |_| inner.value.get().is_none()) + { + Ok((_guard, result)) => { + if result.timed_out() { + return None; + } else { + return Some(inner.value.get().unwrap()); + } + } + Err(_) => panic!("mutex is poisoned!"), + } + } + // TODO: add wait_timeout /// Checks if this completion signal has been signaled. diff --git a/crates/nu-system/src/util.rs b/crates/nu-system/src/util.rs index 73b3575177..01663a77c1 100644 --- a/crates/nu-system/src/util.rs +++ b/crates/nu-system/src/util.rs @@ -1,5 +1,6 @@ use std::io; use std::process::Command as CommandSys; +use std::time::{Duration, Instant}; /// Tries to forcefully kill a process by its PID pub fn kill_by_pid(pid: i64) -> io::Result<()> {