From 584fb59748c6ff3b50af35f5e21fa215f2d783c8 Mon Sep 17 00:00:00 2001 From: cosineblast <55855728+cosineblast@users.noreply.github.com> Date: Fri, 28 Mar 2025 10:30:34 -0300 Subject: [PATCH] Add --try flag to job recv --- .../nu-command/src/experimental/job_recv.rs | 26 ++++++++++++++++--- crates/nu-command/tests/commands/job.rs | 10 +++++++ crates/nu-protocol/src/engine/jobs.rs | 19 +++++++++++++- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/crates/nu-command/src/experimental/job_recv.rs b/crates/nu-command/src/experimental/job_recv.rs index 69e97956bd..01f6a9c099 100644 --- a/crates/nu-command/src/experimental/job_recv.rs +++ b/crates/nu-command/src/experimental/job_recv.rs @@ -1,4 +1,7 @@ -use std::{sync::mpsc::RecvTimeoutError, time::Duration, time::Instant}; +use std::{ + sync::mpsc::{RecvTimeoutError, TryRecvError}, + time::{Duration, Instant}, +}; use nu_engine::command_prelude::*; @@ -26,10 +29,11 @@ impl Command for JobRecv { This commands reads and returns a message from the mailbox, in a first-in-first-out fashion. j Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag. -If no tag is specified, this command will accept any message. +If no tag is specified, this command will accept any message. If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives. By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified. +If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox. Note: When using par-each, only one thread at a time can utilize this command. In the case of two or more threads running this command, they will wait until other threads are done using it, @@ -85,7 +89,11 @@ in no particular order, regardless of the specified timeout parameter. .expect("failed to acquire lock"); if let Some(timeout) = timeout { - recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout) + if timeout == Duration::ZERO { + recv_instantly(&mut mailbox, tag, head) + } else { + recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout) + } } else { recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head) } @@ -119,6 +127,18 @@ fn recv_without_time_limit( } } +fn recv_instantly( + mailbox: &mut Mailbox, + tag: Option, + span: Span, +) -> Result { + match mailbox.try_recv(tag) { + Ok(value) => Ok(value), + Err(TryRecvError::Empty) => Err(ShellError::RecvTimeout { span }), + Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }), + } +} + fn recv_with_time_limit( mailbox: &mut Mailbox, tag: Option, diff --git a/crates/nu-command/tests/commands/job.rs b/crates/nu-command/tests/commands/job.rs index 2b7edf9f12..bfd480cd56 100644 --- a/crates/nu-command/tests/commands/job.rs +++ b/crates/nu-command/tests/commands/job.rs @@ -142,6 +142,16 @@ fn job_recv_timeout_works() { assert!(actual.err.contains("timeout")); } +#[test] +fn job_recv_timeout_zero_works() { + let actual = nu!(r#" + "hi there" | job send 0 + job recv --timeout 0sec + "#); + + assert_eq!(actual.out, "hi there"); +} + #[test] fn job_flush_clears_messages() { let actual = nu!(r#" diff --git a/crates/nu-protocol/src/engine/jobs.rs b/crates/nu-protocol/src/engine/jobs.rs index 66432cce1d..019245cd30 100644 --- a/crates/nu-protocol/src/engine/jobs.rs +++ b/crates/nu-protocol/src/engine/jobs.rs @@ -1,7 +1,7 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap, HashSet}, sync::{ - mpsc::{Receiver, RecvTimeoutError, Sender}, + mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError}, Arc, Mutex, }, }; @@ -287,6 +287,23 @@ impl Mailbox { } } + #[cfg(not(target_family = "wasm"))] + pub fn try_recv(&mut self, filter_tag: Option) -> Result { + if let Some(value) = self.ignored_mail.pop(filter_tag) { + Ok(value) + } else { + loop { + let (tag, value) = self.receiver.try_recv()?; + + if filter_tag.is_none() || filter_tag == tag { + return Ok(value); + } else { + self.ignored_mail.add((tag, value)); + } + } + } + } + pub fn clear(&mut self) { self.ignored_mail.clear();