Files
nushell/crates/nu-command/src/experimental/job_send.rs
Renan Ribeiro 2d868323b6 Inter-Job direct messaging (#15253)
# Description

This PR implements an experimental inter-job communication model,
through direct message passing, aka "mail"ing or "dm"ing:



- `job send <id>`: Sends a message the job with the given id, the root
job has id 0. Messages are stored in the recipient's "mailbox"
- `job recv`: Returns a stored message, blocks if the mailbox is empty
- `job flush`: Clear all messages from mailbox

Additionally, messages can be sent with a numeric tag, which can then be
filtered with `mail recv --tag`.
This is useful for spawning jobs and receiving messages specifically
from those jobs.

This PR is mostly a proof of concept for how inter-job communication
could look like, so people can provide feedback and suggestions

Closes  #15199

May close #15220 since now jobs can access their own id.

# User-Facing Changes

Adds, `job id`, `job send`, `job recv` and `job flush`  commands.

# Tests + Formatting

[X] TODO:  Implement tests
[X] Consider rewriting some of the job-related tests to use this, to
make them a bit less fragile.

# After Submitting
2025-04-26 23:24:35 +08:00

113 lines
3.5 KiB
Rust

use nu_engine::command_prelude::*;
use nu_protocol::{engine::FilterTag, JobId};
#[derive(Clone)]
pub struct JobSend;
impl Command for JobSend {
fn name(&self) -> &str {
"job send"
}
fn description(&self) -> &str {
"Send a message to the mailbox of a job."
}
fn extra_description(&self) -> &str {
r#"
This command sends a message to a background job, which can then read sent messages
in a first-in-first-out fashion with `job recv`. When it does so, it may additionally specify a numeric filter tag,
in which case it will only read messages sent with the exact same filter tag.
In particular, the id 0 refers to the main/initial nushell thread.
A message can be any nushell value, and streams are always collected before being sent.
This command never blocks.
"#
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("job send")
.category(Category::Experimental)
.required(
"id",
SyntaxShape::Int,
"The id of the job to send the message to.",
)
.named("tag", SyntaxShape::Int, "A tag for the message", None)
.input_output_types(vec![(Type::Any, Type::Nothing)])
.allow_variants_without_examples(true)
}
fn search_terms(&self) -> Vec<&str> {
vec![]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let id_arg: Spanned<i64> = call.req(engine_state, stack, 0)?;
let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
let id = id_arg.item;
if id < 0 {
return Err(ShellError::NeedsPositiveValue { span: id_arg.span });
}
if let Some(tag) = tag_arg {
if tag.item < 0 {
return Err(ShellError::NeedsPositiveValue { span: tag.span });
}
}
let tag = tag_arg.map(|it| it.item as FilterTag);
if id == 0 {
engine_state
.root_job_sender
.send((tag, input))
.expect("this should NEVER happen.");
} else {
let jobs = engine_state.jobs.lock().expect("failed to acquire lock");
if let Some(job) = jobs.lookup(JobId::new(id as usize)) {
match job {
nu_protocol::engine::Job::Thread(thread_job) => {
// it is ok to send this value while holding the lock, because
// mail channels are always unbounded, so this send never blocks
let _ = thread_job.sender.send((tag, input));
}
nu_protocol::engine::Job::Frozen(_) => {
return Err(ShellError::JobIsFrozen {
id: id as usize,
span: id_arg.span,
});
}
}
} else {
return Err(ShellError::JobNotFound {
id: id as usize,
span: id_arg.span,
});
}
}
Ok(Value::nothing(head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {
vec![Example {
example: "let id = job spawn { job recv | save sent.txt }; 'hi' | job send $id",
description: "Send a message to a newly spawned job",
result: None,
}]
}
}