Inter-Job direct messaging (#15253)

# Description

This PR implements an experimental inter-job communication model,
through direct message passing, aka "mail"ing or "dm"ing:



- `job send <id>`: Sends a message the job with the given id, the root
job has id 0. Messages are stored in the recipient's "mailbox"
- `job recv`: Returns a stored message, blocks if the mailbox is empty
- `job flush`: Clear all messages from mailbox

Additionally, messages can be sent with a numeric tag, which can then be
filtered with `mail recv --tag`.
This is useful for spawning jobs and receiving messages specifically
from those jobs.

This PR is mostly a proof of concept for how inter-job communication
could look like, so people can provide feedback and suggestions

Closes  #15199

May close #15220 since now jobs can access their own id.

# User-Facing Changes

Adds, `job id`, `job send`, `job recv` and `job flush`  commands.

# Tests + Formatting

[X] TODO:  Implement tests
[X] Consider rewriting some of the job-related tests to use this, to
make them a bit less fragile.

# After Submitting
This commit is contained in:
Renan Ribeiro
2025-04-26 12:24:35 -03:00
committed by GitHub
parent 0389815137
commit 2d868323b6
14 changed files with 853 additions and 47 deletions

View File

@ -452,10 +452,18 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState {
JobSpawn,
JobList,
JobKill,
JobId,
JobTag,
Job,
};
#[cfg(not(target_family = "wasm"))]
bind_command! {
JobSend,
JobRecv,
JobFlush,
}
#[cfg(all(unix, feature = "os"))]
bind_command! {
JobUnfreeze,

View File

@ -0,0 +1,58 @@
use nu_engine::command_prelude::*;
#[derive(Clone)]
pub struct JobFlush;
impl Command for JobFlush {
fn name(&self) -> &str {
"job flush"
}
fn description(&self) -> &str {
"Clear this job's mailbox."
}
fn extra_description(&self) -> &str {
r#"
This command removes all messages in the mailbox of the current job.
If a message is received while this command is executing, it may also be discarded.
"#
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job flush")
.category(Category::Experimental)
.input_output_types(vec![(Type::Nothing, Type::Nothing)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec![]
}
fn run(
&self,
engine_state: &EngineState,
_stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let mut mailbox = engine_state
.current_job
.mailbox
.lock()
.expect("failed to acquire lock");
mailbox.clear();
Ok(Value::nothing(call.head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "job flush",
description: "Clear the mailbox of the current job.",
result: None,
}]
}
}

View File

@ -0,0 +1,50 @@
use nu_engine::command_prelude::*;
#[derive(Clone)]
pub struct JobId;
impl Command for JobId {
fn name(&self) -> &str {
"job id"
}
fn description(&self) -> &str {
"Get id of current job."
}
fn extra_description(&self) -> &str {
"This command returns the job id for the current background job.
The special id 0 indicates that this command was not called from a background job thread, and
was instead spawned by main nushell execution thread."
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job id")
.category(Category::Experimental)
.input_output_types(vec![(Type::Nothing, Type::Int)])
}
fn search_terms(&self) -> Vec<&str> {
vec!["self", "this", "my-id", "this-id"]
}
fn run(
&self,
engine_state: &EngineState,
_stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
Ok(Value::int(engine_state.current_job.id.get() as i64, head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "job id",
description: "Get id of current job",
result: None,
}]
}
}

View File

@ -0,0 +1,181 @@
use std::{
sync::mpsc::{RecvTimeoutError, TryRecvError},
time::{Duration, Instant},
};
use nu_engine::command_prelude::*;
use nu_protocol::{
engine::{FilterTag, Mailbox},
Signals,
};
#[derive(Clone)]
pub struct JobRecv;
const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
impl Command for JobRecv {
fn name(&self) -> &str {
"job recv"
}
fn description(&self) -> &str {
"Read a message from the mailbox."
}
fn extra_description(&self) -> &str {
r#"When messages are sent to the current process, they get stored in what is called the "mailbox".
This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
j
Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
If no tag is specified, this command will accept any message.
If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified.
If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox.
Note: When using par-each, only one thread at a time can utilize this command.
In the case of two or more threads running this command, they will wait until other threads are done using it,
in no particular order, regardless of the specified timeout parameter.
"#
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job recv")
.category(Category::Experimental)
.named("tag", SyntaxShape::Int, "A tag for the message", None)
.named(
"timeout",
SyntaxShape::Duration,
"The maximum time duration to wait for.",
None,
)
.input_output_types(vec![(Type::Nothing, Type::Any)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec!["receive"]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
if let Some(tag) = tag_arg {
if tag.item < 0 {
return Err(ShellError::NeedsPositiveValue { span: tag.span });
}
}
let tag = tag_arg.map(|it| it.item as FilterTag);
let duration: Option<i64> = call.get_flag(engine_state, stack, "timeout")?;
let timeout = duration.map(|it| Duration::from_nanos(it as u64));
let mut mailbox = engine_state
.current_job
.mailbox
.lock()
.expect("failed to acquire lock");
if let Some(timeout) = timeout {
if timeout == Duration::ZERO {
recv_instantly(&mut mailbox, tag, head)
} else {
recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
}
} else {
recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
}
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
example: "job recv",
description: "Block the current thread while no message arrives",
result: None,
},
Example {
example: "job recv --timeout 10sec",
description: "Receive a message, wait for at most 10 seconds.",
result: None,
},
Example {
example: "job recv --timeout 0sec",
description: "Get a message or fail if no message is available immediately",
result: None,
},
]
}
}
fn recv_without_time_limit(
mailbox: &mut Mailbox,
tag: Option<FilterTag>,
signals: &Signals,
span: Span,
) -> Result<PipelineData, ShellError> {
loop {
if signals.interrupted() {
return Err(ShellError::Interrupted { span });
}
match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
Ok(value) => return Ok(value),
Err(RecvTimeoutError::Timeout) => {} // try again
Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
}
}
}
fn recv_instantly(
mailbox: &mut Mailbox,
tag: Option<FilterTag>,
span: Span,
) -> Result<PipelineData, ShellError> {
match mailbox.try_recv(tag) {
Ok(value) => Ok(value),
Err(TryRecvError::Empty) => Err(ShellError::RecvTimeout { span }),
Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }),
}
}
fn recv_with_time_limit(
mailbox: &mut Mailbox,
tag: Option<FilterTag>,
signals: &Signals,
span: Span,
timeout: Duration,
) -> Result<PipelineData, ShellError> {
let deadline = Instant::now() + timeout;
loop {
if signals.interrupted() {
return Err(ShellError::Interrupted { span });
}
let time_until_deadline = deadline.saturating_duration_since(Instant::now());
let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL);
match mailbox.recv_timeout(tag, time_to_sleep) {
Ok(value) => return Ok(value),
Err(RecvTimeoutError::Timeout) => {} // try again
Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
}
if time_until_deadline.is_zero() {
return Err(ShellError::RecvTimeout { span });
}
}
}

View File

@ -0,0 +1,112 @@
use nu_engine::command_prelude::*;
use nu_protocol::{engine::FilterTag, JobId};
#[derive(Clone)]
pub struct JobSend;
impl Command for JobSend {
fn name(&self) -> &str {
"job send"
}
fn description(&self) -> &str {
"Send a message to the mailbox of a job."
}
fn extra_description(&self) -> &str {
r#"
This command sends a message to a background job, which can then read sent messages
in a first-in-first-out fashion with `job recv`. When it does so, it may additionally specify a numeric filter tag,
in which case it will only read messages sent with the exact same filter tag.
In particular, the id 0 refers to the main/initial nushell thread.
A message can be any nushell value, and streams are always collected before being sent.
This command never blocks.
"#
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job send")
.category(Category::Experimental)
.required(
"id",
SyntaxShape::Int,
"The id of the job to send the message to.",
)
.named("tag", SyntaxShape::Int, "A tag for the message", None)
.input_output_types(vec![(Type::Any, Type::Nothing)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec![]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let id_arg: Spanned<i64> = call.req(engine_state, stack, 0)?;
let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
let id = id_arg.item;
if id < 0 {
return Err(ShellError::NeedsPositiveValue { span: id_arg.span });
}
if let Some(tag) = tag_arg {
if tag.item < 0 {
return Err(ShellError::NeedsPositiveValue { span: tag.span });
}
}
let tag = tag_arg.map(|it| it.item as FilterTag);
if id == 0 {
engine_state
.root_job_sender
.send((tag, input))
.expect("this should NEVER happen.");
} else {
let jobs = engine_state.jobs.lock().expect("failed to acquire lock");
if let Some(job) = jobs.lookup(JobId::new(id as usize)) {
match job {
nu_protocol::engine::Job::Thread(thread_job) => {
// it is ok to send this value while holding the lock, because
// mail channels are always unbounded, so this send never blocks
let _ = thread_job.sender.send((tag, input));
}
nu_protocol::engine::Job::Frozen(_) => {
return Err(ShellError::JobIsFrozen {
id: id as usize,
span: id_arg.span,
});
}
}
} else {
return Err(ShellError::JobNotFound {
id: id as usize,
span: id_arg.span,
});
}
}
Ok(Value::nothing(head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "let id = job spawn { job recv | save sent.txt }; 'hi' | job send $id",
description: "Send a message to a newly spawned job",
result: None,
}]
}
}

View File

@ -1,14 +1,14 @@
use std::{
sync::{
atomic::{AtomicBool, AtomicU32},
Arc,
mpsc, Arc, Mutex,
},
thread,
};
use nu_engine::{command_prelude::*, ClosureEvalOnce};
use nu_protocol::{
engine::{Closure, Job, Redirection, ThreadJob},
engine::{Closure, CurrentJob, Job, Mailbox, Redirection, ThreadJob},
report_shell_error, OutDest, Signals,
};
@ -57,12 +57,11 @@ impl Command for JobSpawn {
let closure: Closure = call.req(engine_state, stack, 0)?;
let tag: Option<String> = call.get_flag(engine_state, stack, "tag")?;
let job_stack = stack.clone();
let mut job_state = engine_state.clone();
job_state.is_interactive = false;
let job_stack = stack.clone();
// the new job should have its ctrl-c independent of foreground
let job_signals = Signals::new(Arc::new(AtomicBool::new(false)));
job_state.set_signals(job_signals.clone());
@ -75,10 +74,20 @@ impl Command for JobSpawn {
let jobs = job_state.jobs.clone();
let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
let (send, recv) = mpsc::channel();
let id = {
let thread_job = ThreadJob::new(job_signals, tag);
job_state.current_thread_job = Some(thread_job.clone());
jobs.add_job(Job::Thread(thread_job))
let thread_job = ThreadJob::new(job_signals, tag, send);
let id = jobs.add_job(Job::Thread(thread_job.clone()));
job_state.current_job = CurrentJob {
id,
background_thread_job: Some(thread_job),
mailbox: Arc::new(Mutex::new(Mailbox::new(recv))),
};
id
};
let result = thread::Builder::new()

View File

@ -118,7 +118,7 @@ fn unfreeze_job(
}) => {
let pid = handle.pid();
if let Some(thread_job) = &state.current_thread_job {
if let Some(thread_job) = &state.current_thread_job() {
if !thread_job.try_add_pid(pid) {
kill_by_pid(pid.into()).map_err(|err| {
ShellError::Io(IoError::new_internal(
@ -136,7 +136,7 @@ fn unfreeze_job(
.then(|| state.pipeline_externals_state.clone()),
);
if let Some(thread_job) = &state.current_thread_job {
if let Some(thread_job) = &state.current_thread_job() {
thread_job.remove_pid(pid);
}

View File

@ -1,5 +1,6 @@
mod is_admin;
mod job;
mod job_id;
mod job_kill;
mod job_list;
mod job_spawn;
@ -8,12 +9,27 @@ mod job_tag;
#[cfg(all(unix, feature = "os"))]
mod job_unfreeze;
#[cfg(not(target_family = "wasm"))]
mod job_flush;
#[cfg(not(target_family = "wasm"))]
mod job_recv;
#[cfg(not(target_family = "wasm"))]
mod job_send;
pub use is_admin::IsAdmin;
pub use job::Job;
pub use job_id::JobId;
pub use job_kill::JobKill;
pub use job_list::JobList;
pub use job_spawn::JobSpawn;
pub use job_tag::JobTag;
#[cfg(not(target_family = "wasm"))]
pub use job_flush::JobFlush;
#[cfg(not(target_family = "wasm"))]
pub use job_recv::JobRecv;
#[cfg(not(target_family = "wasm"))]
pub use job_send::JobSend;
#[cfg(all(unix, feature = "os"))]
pub use job_unfreeze::JobUnfreeze;

View File

@ -285,7 +285,7 @@ impl Command for External {
)
})?;
if let Some(thread_job) = &engine_state.current_thread_job {
if let Some(thread_job) = engine_state.current_thread_job() {
if !thread_job.try_add_pid(child.pid()) {
kill_by_pid(child.pid().into()).map_err(|err| {
ShellError::Io(IoError::new_internal(