From f15fe84df6f88460239026179d8a09a96fcc1453 Mon Sep 17 00:00:00 2001 From: cosineblast <55855728+cosineblast@users.noreply.github.com> Date: Tue, 4 Mar 2025 11:31:37 -0300 Subject: [PATCH] Implement mail commands --- crates/nu-command/src/default_context.rs | 4 + crates/nu-command/src/experimental/mail.rs | 34 +++++ .../nu-command/src/experimental/mail_clear.rs | 58 +++++++++ .../nu-command/src/experimental/mail_recv.rs | 119 ++++++++++++++++++ .../nu-command/src/experimental/mail_send.rs | 113 +++++++++++++++++ crates/nu-command/src/experimental/mod.rs | 10 ++ .../nu-protocol/src/errors/shell_error/mod.rs | 21 ++++ 7 files changed, 359 insertions(+) create mode 100644 crates/nu-command/src/experimental/mail.rs create mode 100644 crates/nu-command/src/experimental/mail_clear.rs create mode 100644 crates/nu-command/src/experimental/mail_recv.rs create mode 100644 crates/nu-command/src/experimental/mail_send.rs diff --git a/crates/nu-command/src/default_context.rs b/crates/nu-command/src/default_context.rs index 5cd326a222..71b355e9a8 100644 --- a/crates/nu-command/src/default_context.rs +++ b/crates/nu-command/src/default_context.rs @@ -453,6 +453,10 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState { JobKill, JobId, Job, + MailSend, + MailRecv, + MailClear, + Mail, }; #[cfg(all(unix, feature = "os"))] diff --git a/crates/nu-command/src/experimental/mail.rs b/crates/nu-command/src/experimental/mail.rs new file mode 100644 index 0000000000..63b553ca3b --- /dev/null +++ b/crates/nu-command/src/experimental/mail.rs @@ -0,0 +1,34 @@ +use nu_engine::{command_prelude::*, get_full_help}; + +#[derive(Clone)] +pub struct Mail; + +impl Command for Mail { + fn name(&self) -> &str { + "mail" + } + + fn signature(&self) -> Signature { + Signature::build("mail") + .category(Category::Strings) + .input_output_types(vec![(Type::Nothing, Type::String)]) + } + + fn description(&self) -> &str { + "Various commands for job communication." + } + + fn extra_description(&self) -> &str { + "You must use one of the following subcommands. Using this command as-is will only produce this help message." + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + _input: PipelineData, + ) -> Result { + Ok(Value::string(get_full_help(self, engine_state, stack), call.head).into_pipeline_data()) + } +} diff --git a/crates/nu-command/src/experimental/mail_clear.rs b/crates/nu-command/src/experimental/mail_clear.rs new file mode 100644 index 0000000000..5ed67fd909 --- /dev/null +++ b/crates/nu-command/src/experimental/mail_clear.rs @@ -0,0 +1,58 @@ +use nu_engine::command_prelude::*; + +#[derive(Clone)] +pub struct MailClear; + +impl Command for MailClear { + fn name(&self) -> &str { + "mail clear" + } + + fn description(&self) -> &str { + "Clear mailbox." + } + + fn extra_description(&self) -> &str { + r#" +This command removes all messages in the mailbox of the current job. +If a message is received while this command is executing, it may also be discarded. +"# + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("mail flush") + .category(Category::Experimental) + .input_output_types(vec![(Type::Nothing, Type::Nothing)]) + .allow_variants_without_examples(true) + } + + fn search_terms(&self) -> Vec<&str> { + vec![] + } + + fn run( + &self, + engine_state: &EngineState, + _stack: &mut Stack, + call: &Call, + _input: PipelineData, + ) -> Result { + let mut mailbox = engine_state + .current_job + .mailbox + .lock() + .expect("failed to acquire lock"); + + mailbox.clear(); + + Ok(Value::nothing(call.head).into_pipeline_data()) + } + + fn examples(&self) -> Vec { + vec![Example { + example: "let id = job spawn { mail recv | save sent.txt }; 'hi' | mail send $id", + description: "Send a message to a newly spawned job", + result: None, + }] + } +} diff --git a/crates/nu-command/src/experimental/mail_recv.rs b/crates/nu-command/src/experimental/mail_recv.rs new file mode 100644 index 0000000000..7fd7f4254e --- /dev/null +++ b/crates/nu-command/src/experimental/mail_recv.rs @@ -0,0 +1,119 @@ +use std::{sync::mpsc::RecvTimeoutError, time::Duration}; + +use nu_engine::command_prelude::*; +use nu_protocol::engine::Tag; + +#[derive(Clone)] +pub struct MailRecv; + +const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100); + +impl Command for MailRecv { + fn name(&self) -> &str { + "mail recv" + } + + fn description(&self) -> &str { + "Read a message from the mailbox" + } + + fn extra_description(&self) -> &str { + r#"When messages are sent to the current process, they get stored in what is called the "mailbox". +This commands reads and returns a message from the mailbox, in a first-in-first-out fashion. +j +Messages may have numeric flags attatched to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag. +If no tag is specified, this command will accept any message. + +If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives. +By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified. + +Note: When using par-each, only one thread at a time can utilize this command. +In the case of two or more threads running this command, they will wait until other threads are done using it, +in no particular order, regardless of the specified timeout parameter. +"# + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("mail recv") + .category(Category::Experimental) + .named("tag", SyntaxShape::Int, "A tag for the message", None) + .named( + "timeout", + SyntaxShape::Duration, + "The maximum duration to wait for", + None, + ) + .input_output_types(vec![(Type::Nothing, Type::Any)]) + .allow_variants_without_examples(true) + } + + fn search_terms(&self) -> Vec<&str> { + vec!["receive"] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + _input: PipelineData, + ) -> Result { + let head = call.head; + + let tag_arg: Option> = call.get_flag(engine_state, stack, "tag")?; + + if let Some(tag) = tag_arg { + if tag.item < 0 { + return Err(ShellError::NeedsPositiveValue { span: tag.span }); + } + } + + let tag = tag_arg.map(|it| it.item as Tag); + + let duration: Option = call.get_flag(engine_state, stack, "timeout")?; + + let timeout = duration.map(|it| Duration::from_nanos(it as u64)); + + let mut mailbox = engine_state + .current_job + .mailbox + .lock() + .expect("failed to acquire lock"); + + if let Some(timeout) = timeout { + let value = mailbox + .recv_timeout(tag, timeout) + .map_err(|error| match error { + RecvTimeoutError::Timeout => ShellError::RecvTimeout { span: head }, + + // if the channel was disconnected, it means this job was removed from the job + // table, so it was killed/interrupted + RecvTimeoutError::Disconnected => ShellError::Interrupted { span: head }, + })?; + + Ok(value.into_pipeline_data()) + } else { + loop { + if engine_state.signals().interrupted() { + return Err(ShellError::Interrupted { span: head }); + } + + match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) { + Ok(value) => return Ok(value.into_pipeline_data()), + Err(RecvTimeoutError::Timeout) => {} // try again + Err(RecvTimeoutError::Disconnected) => { + return Err(ShellError::Interrupted { span: head }) + } + } + } + } + } + + fn examples(&self) -> Vec { + vec![Example { + example: "mail recv", + description: "Block the current thread while no message arrives", + result: None, + }] + } +} diff --git a/crates/nu-command/src/experimental/mail_send.rs b/crates/nu-command/src/experimental/mail_send.rs new file mode 100644 index 0000000000..42e4e1ed15 --- /dev/null +++ b/crates/nu-command/src/experimental/mail_send.rs @@ -0,0 +1,113 @@ +use nu_engine::command_prelude::*; +use nu_protocol::{engine::Tag, JobId}; + +#[derive(Clone)] +pub struct MailSend; + +impl Command for MailSend { + fn name(&self) -> &str { + "mail send" + } + + fn description(&self) -> &str { + "Send a message to a job" + } + + fn extra_description(&self) -> &str { + r#" +This command sends a message to a background job, which can then read sent messages +in a first-in-first-out fashion with `mail recv`. When it does so, it may additionally specify a numeric filter tag, +in which case it will only read messages sent with the exact same filter tag. + +A message can be any nushell value, and streams are always collected before being sent. + +This command never blocks. +"# + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("mail send") + .category(Category::Experimental) + .required( + "id", + SyntaxShape::Int, + "The id of the job to send the message to", + ) + .named("tag", SyntaxShape::Int, "A tag for the message", None) + .input_output_types(vec![(Type::Any, Type::Nothing)]) + .allow_variants_without_examples(true) + } + + fn search_terms(&self) -> Vec<&str> { + vec![] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + let head = call.head; + + let id_arg: Spanned = call.req(engine_state, stack, 0)?; + let tag_arg: Option> = call.get_flag(engine_state, stack, "tag")?; + + let id = id_arg.item; + + if id < 0 { + return Err(ShellError::NeedsPositiveValue { span: id_arg.span }); + } + + if let Some(tag) = tag_arg { + if tag.item < 0 { + return Err(ShellError::NeedsPositiveValue { span: tag.span }); + } + } + + let tag = tag_arg.map(|it| it.item as Tag); + + let value = input.into_value(head)?; + + if id == 0 { + engine_state + .root_job_sender + .send((tag, value)) + .expect("this should NEVER happen."); + } else { + let jobs = engine_state.jobs.lock().expect("failed to acquire lock"); + + if let Some(job) = jobs.lookup(JobId::new(id as usize)) { + match job { + nu_protocol::engine::Job::Thread(thread_job) => { + // it is ok to send this value while holding the lock, because + // mail channels are always unbounded, so this send never blocks + let _ = thread_job.sender.send((tag, value)); + } + nu_protocol::engine::Job::Frozen(_) => { + return Err(ShellError::NeedThreadJob { + id: id as usize, + span: id_arg.span, + }); + } + } + } else { + return Err(ShellError::JobNotFound { + id: id as usize, + span: id_arg.span, + }); + } + } + + Ok(Value::nothing(head).into_pipeline_data()) + } + + fn examples(&self) -> Vec { + vec![Example { + example: "let id = job spawn { mail recv | save sent.txt }; 'hi' | mail send $id", + description: "Send a message to a newly spawned job", + result: None, + }] + } +} diff --git a/crates/nu-command/src/experimental/mod.rs b/crates/nu-command/src/experimental/mod.rs index a892ac5550..ec7a55b485 100644 --- a/crates/nu-command/src/experimental/mod.rs +++ b/crates/nu-command/src/experimental/mod.rs @@ -5,6 +5,11 @@ mod job_kill; mod job_list; mod job_spawn; +mod mail; +mod mail_clear; +mod mail_recv; +mod mail_send; + #[cfg(all(unix, feature = "os"))] mod job_unfreeze; @@ -15,5 +20,10 @@ pub use job_kill::JobKill; pub use job_list::JobList; pub use job_spawn::JobSpawn; +pub use mail::Mail; +pub use mail_clear::MailClear; +pub use mail_recv::MailRecv; +pub use mail_send::MailSend; + #[cfg(all(unix, feature = "os"))] pub use job_unfreeze::JobUnfreeze; diff --git a/crates/nu-protocol/src/errors/shell_error/mod.rs b/crates/nu-protocol/src/errors/shell_error/mod.rs index 627c3679b3..5c25aae992 100644 --- a/crates/nu-protocol/src/errors/shell_error/mod.rs +++ b/crates/nu-protocol/src/errors/shell_error/mod.rs @@ -1360,6 +1360,27 @@ On Windows, this would be %USERPROFILE%\AppData\Roaming"# span: Span, }, + #[error("The job {id} is not a thread job")] + #[diagnostic( + code(nu::shell::os_disabled), + help("The operation you tried to perform cannot be done on this kind of job") + )] + NeedThreadJob { + id: usize, + #[label = "not a thread job"] + span: Span, + }, + + #[error("No message was received in the requested time interval")] + #[diagnostic( + code(nu::shell::os_disabled), + help("No message arrived within the specified time limit") + )] + RecvTimeout { + #[label = "timeout"] + span: Span, + }, + #[error(transparent)] #[diagnostic(transparent)] ChainedError(ChainedError),