diff --git a/crates/nu-command/src/experimental/job_recv.rs b/crates/nu-command/src/experimental/job_recv.rs index 1ed6cf44c2..a3230e5ac3 100644 --- a/crates/nu-command/src/experimental/job_recv.rs +++ b/crates/nu-command/src/experimental/job_recv.rs @@ -1,7 +1,11 @@ -use std::{sync::mpsc::RecvTimeoutError, time::Duration}; +use std::{sync::mpsc::RecvTimeoutError, time::Duration, time::Instant}; use nu_engine::command_prelude::*; -use nu_protocol::engine::FilterTag; + +use nu_protocol::{ + engine::{FilterTag, Mailbox}, + Signals, +}; #[derive(Clone)] pub struct JobRecv; @@ -80,33 +84,12 @@ in no particular order, regardless of the specified timeout parameter. .lock() .expect("failed to acquire lock"); - if let Some(timeout) = timeout { - let value = mailbox - .recv_timeout(tag, timeout) - .map_err(|error| match error { - RecvTimeoutError::Timeout => ShellError::RecvTimeout { span: head }, - - // if the channel was disconnected, it means this job was removed from the job - // table, so it was killed/interrupted - RecvTimeoutError::Disconnected => ShellError::Interrupted { span: head }, - })?; - - Ok(value.into_pipeline_data()) + return if let Some(timeout) = timeout { + recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout) } else { - loop { - if engine_state.signals().interrupted() { - return Err(ShellError::Interrupted { span: head }); - } - - match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) { - Ok(value) => return Ok(value.into_pipeline_data()), - Err(RecvTimeoutError::Timeout) => {} // try again - Err(RecvTimeoutError::Disconnected) => { - return Err(ShellError::Interrupted { span: head }) - } - } - } + recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head) } + .map(|value| value.into_pipeline_data()); } fn examples(&self) -> Vec { @@ -117,3 +100,51 @@ in no particular order, regardless of the specified timeout parameter. }] } } + +fn recv_without_time_limit( + mailbox: &mut Mailbox, + tag: Option, + signals: &Signals, + span: Span, +) -> Result { + loop { + if signals.interrupted() { + return Err(ShellError::Interrupted { span }); + } + match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) { + Ok(value) => return Ok(value), + Err(RecvTimeoutError::Timeout) => {} // try again + Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }), + } + } +} + +fn recv_with_time_limit( + mailbox: &mut Mailbox, + tag: Option, + signals: &Signals, + span: Span, + timeout: Duration, +) -> Result { + let deadline = Instant::now() + timeout; + + loop { + if signals.interrupted() { + return Err(ShellError::Interrupted { span }); + } + + let time_until_deadline = deadline.saturating_duration_since(Instant::now()); + + let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL); + + match mailbox.recv_timeout(tag, time_to_sleep) { + Ok(value) => return Ok(value), + Err(RecvTimeoutError::Timeout) => {} // try again + Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }), + } + + if time_until_deadline.is_zero() { + return Err(ShellError::RecvTimeout { span }); + } + } +}