Implement mail commands

This commit is contained in:
cosineblast 2025-03-04 11:31:37 -03:00
parent 3ed97e68dc
commit f15fe84df6
7 changed files with 359 additions and 0 deletions

View File

@ -453,6 +453,10 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState {
JobKill,
JobId,
Job,
MailSend,
MailRecv,
MailClear,
Mail,
};
#[cfg(all(unix, feature = "os"))]

View File

@ -0,0 +1,34 @@
use nu_engine::{command_prelude::*, get_full_help};
#[derive(Clone)]
pub struct Mail;
impl Command for Mail {
fn name(&self) -> &str {
"mail"
}
fn signature(&self) -> Signature {
Signature::build("mail")
.category(Category::Strings)
.input_output_types(vec![(Type::Nothing, Type::String)])
}
fn description(&self) -> &str {
"Various commands for job communication."
}
fn extra_description(&self) -> &str {
"You must use one of the following subcommands. Using this command as-is will only produce this help message."
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
Ok(Value::string(get_full_help(self, engine_state, stack), call.head).into_pipeline_data())
}
}

View File

@ -0,0 +1,58 @@
use nu_engine::command_prelude::*;
#[derive(Clone)]
pub struct MailClear;
impl Command for MailClear {
fn name(&self) -> &str {
"mail clear"
}
fn description(&self) -> &str {
"Clear mailbox."
}
fn extra_description(&self) -> &str {
r#"
This command removes all messages in the mailbox of the current job.
If a message is received while this command is executing, it may also be discarded.
"#
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("mail flush")
.category(Category::Experimental)
.input_output_types(vec![(Type::Nothing, Type::Nothing)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec![]
}
fn run(
&self,
engine_state: &EngineState,
_stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let mut mailbox = engine_state
.current_job
.mailbox
.lock()
.expect("failed to acquire lock");
mailbox.clear();
Ok(Value::nothing(call.head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "let id = job spawn { mail recv | save sent.txt }; 'hi' | mail send $id",
description: "Send a message to a newly spawned job",
result: None,
}]
}
}

View File

@ -0,0 +1,119 @@
use std::{sync::mpsc::RecvTimeoutError, time::Duration};
use nu_engine::command_prelude::*;
use nu_protocol::engine::Tag;
#[derive(Clone)]
pub struct MailRecv;
const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
impl Command for MailRecv {
fn name(&self) -> &str {
"mail recv"
}
fn description(&self) -> &str {
"Read a message from the mailbox"
}
fn extra_description(&self) -> &str {
r#"When messages are sent to the current process, they get stored in what is called the "mailbox".
This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
j
Messages may have numeric flags attatched to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
If no tag is specified, this command will accept any message.
If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified.
Note: When using par-each, only one thread at a time can utilize this command.
In the case of two or more threads running this command, they will wait until other threads are done using it,
in no particular order, regardless of the specified timeout parameter.
"#
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("mail recv")
.category(Category::Experimental)
.named("tag", SyntaxShape::Int, "A tag for the message", None)
.named(
"timeout",
SyntaxShape::Duration,
"The maximum duration to wait for",
None,
)
.input_output_types(vec![(Type::Nothing, Type::Any)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec!["receive"]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
if let Some(tag) = tag_arg {
if tag.item < 0 {
return Err(ShellError::NeedsPositiveValue { span: tag.span });
}
}
let tag = tag_arg.map(|it| it.item as Tag);
let duration: Option<i64> = call.get_flag(engine_state, stack, "timeout")?;
let timeout = duration.map(|it| Duration::from_nanos(it as u64));
let mut mailbox = engine_state
.current_job
.mailbox
.lock()
.expect("failed to acquire lock");
if let Some(timeout) = timeout {
let value = mailbox
.recv_timeout(tag, timeout)
.map_err(|error| match error {
RecvTimeoutError::Timeout => ShellError::RecvTimeout { span: head },
// if the channel was disconnected, it means this job was removed from the job
// table, so it was killed/interrupted
RecvTimeoutError::Disconnected => ShellError::Interrupted { span: head },
})?;
Ok(value.into_pipeline_data())
} else {
loop {
if engine_state.signals().interrupted() {
return Err(ShellError::Interrupted { span: head });
}
match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
Ok(value) => return Ok(value.into_pipeline_data()),
Err(RecvTimeoutError::Timeout) => {} // try again
Err(RecvTimeoutError::Disconnected) => {
return Err(ShellError::Interrupted { span: head })
}
}
}
}
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "mail recv",
description: "Block the current thread while no message arrives",
result: None,
}]
}
}

View File

@ -0,0 +1,113 @@
use nu_engine::command_prelude::*;
use nu_protocol::{engine::Tag, JobId};
#[derive(Clone)]
pub struct MailSend;
impl Command for MailSend {
fn name(&self) -> &str {
"mail send"
}
fn description(&self) -> &str {
"Send a message to a job"
}
fn extra_description(&self) -> &str {
r#"
This command sends a message to a background job, which can then read sent messages
in a first-in-first-out fashion with `mail recv`. When it does so, it may additionally specify a numeric filter tag,
in which case it will only read messages sent with the exact same filter tag.
A message can be any nushell value, and streams are always collected before being sent.
This command never blocks.
"#
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("mail send")
.category(Category::Experimental)
.required(
"id",
SyntaxShape::Int,
"The id of the job to send the message to",
)
.named("tag", SyntaxShape::Int, "A tag for the message", None)
.input_output_types(vec![(Type::Any, Type::Nothing)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec![]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let id_arg: Spanned<i64> = call.req(engine_state, stack, 0)?;
let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
let id = id_arg.item;
if id < 0 {
return Err(ShellError::NeedsPositiveValue { span: id_arg.span });
}
if let Some(tag) = tag_arg {
if tag.item < 0 {
return Err(ShellError::NeedsPositiveValue { span: tag.span });
}
}
let tag = tag_arg.map(|it| it.item as Tag);
let value = input.into_value(head)?;
if id == 0 {
engine_state
.root_job_sender
.send((tag, value))
.expect("this should NEVER happen.");
} else {
let jobs = engine_state.jobs.lock().expect("failed to acquire lock");
if let Some(job) = jobs.lookup(JobId::new(id as usize)) {
match job {
nu_protocol::engine::Job::Thread(thread_job) => {
// it is ok to send this value while holding the lock, because
// mail channels are always unbounded, so this send never blocks
let _ = thread_job.sender.send((tag, value));
}
nu_protocol::engine::Job::Frozen(_) => {
return Err(ShellError::NeedThreadJob {
id: id as usize,
span: id_arg.span,
});
}
}
} else {
return Err(ShellError::JobNotFound {
id: id as usize,
span: id_arg.span,
});
}
}
Ok(Value::nothing(head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "let id = job spawn { mail recv | save sent.txt }; 'hi' | mail send $id",
description: "Send a message to a newly spawned job",
result: None,
}]
}
}

View File

@ -5,6 +5,11 @@ mod job_kill;
mod job_list;
mod job_spawn;
mod mail;
mod mail_clear;
mod mail_recv;
mod mail_send;
#[cfg(all(unix, feature = "os"))]
mod job_unfreeze;
@ -15,5 +20,10 @@ pub use job_kill::JobKill;
pub use job_list::JobList;
pub use job_spawn::JobSpawn;
pub use mail::Mail;
pub use mail_clear::MailClear;
pub use mail_recv::MailRecv;
pub use mail_send::MailSend;
#[cfg(all(unix, feature = "os"))]
pub use job_unfreeze::JobUnfreeze;

View File

@ -1360,6 +1360,27 @@ On Windows, this would be %USERPROFILE%\AppData\Roaming"#
span: Span,
},
#[error("The job {id} is not a thread job")]
#[diagnostic(
code(nu::shell::os_disabled),
help("The operation you tried to perform cannot be done on this kind of job")
)]
NeedThreadJob {
id: usize,
#[label = "not a thread job"]
span: Span,
},
#[error("No message was received in the requested time interval")]
#[diagnostic(
code(nu::shell::os_disabled),
help("No message arrived within the specified time limit")
)]
RecvTimeout {
#[label = "timeout"]
span: Span,
},
#[error(transparent)]
#[diagnostic(transparent)]
ChainedError(ChainedError),