Add --try flag to job recv

This commit is contained in:
cosineblast 2025-03-28 10:30:34 -03:00
parent 30f7d4091a
commit 584fb59748
3 changed files with 51 additions and 4 deletions

View File

@ -1,4 +1,7 @@
use std::{sync::mpsc::RecvTimeoutError, time::Duration, time::Instant};
use std::{
sync::mpsc::{RecvTimeoutError, TryRecvError},
time::{Duration, Instant},
};
use nu_engine::command_prelude::*;
@ -26,10 +29,11 @@ impl Command for JobRecv {
This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
j
Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
If no tag is specified, this command will accept any message.
If no tag is specified, this command will accept any message.
If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified.
If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox.
Note: When using par-each, only one thread at a time can utilize this command.
In the case of two or more threads running this command, they will wait until other threads are done using it,
@ -85,7 +89,11 @@ in no particular order, regardless of the specified timeout parameter.
.expect("failed to acquire lock");
if let Some(timeout) = timeout {
recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
if timeout == Duration::ZERO {
recv_instantly(&mut mailbox, tag, head)
} else {
recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
}
} else {
recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
}
@ -119,6 +127,18 @@ fn recv_without_time_limit(
}
}
fn recv_instantly(
mailbox: &mut Mailbox,
tag: Option<FilterTag>,
span: Span,
) -> Result<Value, ShellError> {
match mailbox.try_recv(tag) {
Ok(value) => Ok(value),
Err(TryRecvError::Empty) => Err(ShellError::RecvTimeout { span }),
Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }),
}
}
fn recv_with_time_limit(
mailbox: &mut Mailbox,
tag: Option<FilterTag>,

View File

@ -142,6 +142,16 @@ fn job_recv_timeout_works() {
assert!(actual.err.contains("timeout"));
}
#[test]
fn job_recv_timeout_zero_works() {
let actual = nu!(r#"
"hi there" | job send 0
job recv --timeout 0sec
"#);
assert_eq!(actual.out, "hi there");
}
#[test]
fn job_flush_clears_messages() {
let actual = nu!(r#"

View File

@ -1,7 +1,7 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::{
mpsc::{Receiver, RecvTimeoutError, Sender},
mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError},
Arc, Mutex,
},
};
@ -287,6 +287,23 @@ impl Mailbox {
}
}
#[cfg(not(target_family = "wasm"))]
pub fn try_recv(&mut self, filter_tag: Option<FilterTag>) -> Result<Value, TryRecvError> {
if let Some(value) = self.ignored_mail.pop(filter_tag) {
Ok(value)
} else {
loop {
let (tag, value) = self.receiver.try_recv()?;
if filter_tag.is_none() || filter_tag == tag {
return Ok(value);
} else {
self.ignored_mail.add((tag, value));
}
}
}
}
pub fn clear(&mut self) {
self.ignored_mail.clear();