From 2d868323b6bdb3085f9009b61d9ef97cd5851e33 Mon Sep 17 00:00:00 2001 From: Renan Ribeiro <55855728+cosineblast@users.noreply.github.com> Date: Sat, 26 Apr 2025 12:24:35 -0300 Subject: [PATCH] Inter-Job direct messaging (#15253) # Description This PR implements an experimental inter-job communication model, through direct message passing, aka "mail"ing or "dm"ing: - `job send `: Sends a message the job with the given id, the root job has id 0. Messages are stored in the recipient's "mailbox" - `job recv`: Returns a stored message, blocks if the mailbox is empty - `job flush`: Clear all messages from mailbox Additionally, messages can be sent with a numeric tag, which can then be filtered with `mail recv --tag`. This is useful for spawning jobs and receiving messages specifically from those jobs. This PR is mostly a proof of concept for how inter-job communication could look like, so people can provide feedback and suggestions Closes #15199 May close #15220 since now jobs can access their own id. # User-Facing Changes Adds, `job id`, `job send`, `job recv` and `job flush` commands. # Tests + Formatting [X] TODO: Implement tests [X] Consider rewriting some of the job-related tests to use this, to make them a bit less fragile. # After Submitting --- crates/nu-command/src/default_context.rs | 8 + .../nu-command/src/experimental/job_flush.rs | 58 +++++ crates/nu-command/src/experimental/job_id.rs | 50 ++++ .../nu-command/src/experimental/job_recv.rs | 181 +++++++++++++++ .../nu-command/src/experimental/job_send.rs | 112 +++++++++ .../nu-command/src/experimental/job_spawn.rs | 23 +- .../src/experimental/job_unfreeze.rs | 4 +- crates/nu-command/src/experimental/mod.rs | 16 ++ crates/nu-command/src/system/run_external.rs | 2 +- crates/nu-command/tests/commands/job.rs | 218 ++++++++++++++++-- crates/nu-protocol/src/engine/engine_state.rs | 30 ++- crates/nu-protocol/src/engine/jobs.rs | 173 +++++++++++++- .../nu-protocol/src/errors/shell_error/mod.rs | 23 +- crates/nu-protocol/src/process/child.rs | 2 +- 14 files changed, 853 insertions(+), 47 deletions(-) create mode 100644 crates/nu-command/src/experimental/job_flush.rs create mode 100644 crates/nu-command/src/experimental/job_id.rs create mode 100644 crates/nu-command/src/experimental/job_recv.rs create mode 100644 crates/nu-command/src/experimental/job_send.rs diff --git a/crates/nu-command/src/default_context.rs b/crates/nu-command/src/default_context.rs index d803225c36..f18e3eb369 100644 --- a/crates/nu-command/src/default_context.rs +++ b/crates/nu-command/src/default_context.rs @@ -452,10 +452,18 @@ pub fn add_shell_command_context(mut engine_state: EngineState) -> EngineState { JobSpawn, JobList, JobKill, + JobId, JobTag, Job, }; + #[cfg(not(target_family = "wasm"))] + bind_command! { + JobSend, + JobRecv, + JobFlush, + } + #[cfg(all(unix, feature = "os"))] bind_command! { JobUnfreeze, diff --git a/crates/nu-command/src/experimental/job_flush.rs b/crates/nu-command/src/experimental/job_flush.rs new file mode 100644 index 0000000000..f717cb7bae --- /dev/null +++ b/crates/nu-command/src/experimental/job_flush.rs @@ -0,0 +1,58 @@ +use nu_engine::command_prelude::*; + +#[derive(Clone)] +pub struct JobFlush; + +impl Command for JobFlush { + fn name(&self) -> &str { + "job flush" + } + + fn description(&self) -> &str { + "Clear this job's mailbox." + } + + fn extra_description(&self) -> &str { + r#" +This command removes all messages in the mailbox of the current job. +If a message is received while this command is executing, it may also be discarded. +"# + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("job flush") + .category(Category::Experimental) + .input_output_types(vec![(Type::Nothing, Type::Nothing)]) + .allow_variants_without_examples(true) + } + + fn search_terms(&self) -> Vec<&str> { + vec![] + } + + fn run( + &self, + engine_state: &EngineState, + _stack: &mut Stack, + call: &Call, + _input: PipelineData, + ) -> Result { + let mut mailbox = engine_state + .current_job + .mailbox + .lock() + .expect("failed to acquire lock"); + + mailbox.clear(); + + Ok(Value::nothing(call.head).into_pipeline_data()) + } + + fn examples(&self) -> Vec { + vec![Example { + example: "job flush", + description: "Clear the mailbox of the current job.", + result: None, + }] + } +} diff --git a/crates/nu-command/src/experimental/job_id.rs b/crates/nu-command/src/experimental/job_id.rs new file mode 100644 index 0000000000..6f3be8a9eb --- /dev/null +++ b/crates/nu-command/src/experimental/job_id.rs @@ -0,0 +1,50 @@ +use nu_engine::command_prelude::*; + +#[derive(Clone)] +pub struct JobId; + +impl Command for JobId { + fn name(&self) -> &str { + "job id" + } + + fn description(&self) -> &str { + "Get id of current job." + } + + fn extra_description(&self) -> &str { + "This command returns the job id for the current background job. +The special id 0 indicates that this command was not called from a background job thread, and +was instead spawned by main nushell execution thread." + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("job id") + .category(Category::Experimental) + .input_output_types(vec![(Type::Nothing, Type::Int)]) + } + + fn search_terms(&self) -> Vec<&str> { + vec!["self", "this", "my-id", "this-id"] + } + + fn run( + &self, + engine_state: &EngineState, + _stack: &mut Stack, + call: &Call, + _input: PipelineData, + ) -> Result { + let head = call.head; + + Ok(Value::int(engine_state.current_job.id.get() as i64, head).into_pipeline_data()) + } + + fn examples(&self) -> Vec { + vec![Example { + example: "job id", + description: "Get id of current job", + result: None, + }] + } +} diff --git a/crates/nu-command/src/experimental/job_recv.rs b/crates/nu-command/src/experimental/job_recv.rs new file mode 100644 index 0000000000..a19a298620 --- /dev/null +++ b/crates/nu-command/src/experimental/job_recv.rs @@ -0,0 +1,181 @@ +use std::{ + sync::mpsc::{RecvTimeoutError, TryRecvError}, + time::{Duration, Instant}, +}; + +use nu_engine::command_prelude::*; + +use nu_protocol::{ + engine::{FilterTag, Mailbox}, + Signals, +}; + +#[derive(Clone)] +pub struct JobRecv; + +const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100); + +impl Command for JobRecv { + fn name(&self) -> &str { + "job recv" + } + + fn description(&self) -> &str { + "Read a message from the mailbox." + } + + fn extra_description(&self) -> &str { + r#"When messages are sent to the current process, they get stored in what is called the "mailbox". +This commands reads and returns a message from the mailbox, in a first-in-first-out fashion. +j +Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag. +If no tag is specified, this command will accept any message. + +If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives. +By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified. +If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox. + +Note: When using par-each, only one thread at a time can utilize this command. +In the case of two or more threads running this command, they will wait until other threads are done using it, +in no particular order, regardless of the specified timeout parameter. +"# + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("job recv") + .category(Category::Experimental) + .named("tag", SyntaxShape::Int, "A tag for the message", None) + .named( + "timeout", + SyntaxShape::Duration, + "The maximum time duration to wait for.", + None, + ) + .input_output_types(vec![(Type::Nothing, Type::Any)]) + .allow_variants_without_examples(true) + } + + fn search_terms(&self) -> Vec<&str> { + vec!["receive"] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + _input: PipelineData, + ) -> Result { + let head = call.head; + + let tag_arg: Option> = call.get_flag(engine_state, stack, "tag")?; + + if let Some(tag) = tag_arg { + if tag.item < 0 { + return Err(ShellError::NeedsPositiveValue { span: tag.span }); + } + } + + let tag = tag_arg.map(|it| it.item as FilterTag); + + let duration: Option = call.get_flag(engine_state, stack, "timeout")?; + + let timeout = duration.map(|it| Duration::from_nanos(it as u64)); + + let mut mailbox = engine_state + .current_job + .mailbox + .lock() + .expect("failed to acquire lock"); + + if let Some(timeout) = timeout { + if timeout == Duration::ZERO { + recv_instantly(&mut mailbox, tag, head) + } else { + recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout) + } + } else { + recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head) + } + } + + fn examples(&self) -> Vec { + vec![ + Example { + example: "job recv", + description: "Block the current thread while no message arrives", + result: None, + }, + Example { + example: "job recv --timeout 10sec", + description: "Receive a message, wait for at most 10 seconds.", + result: None, + }, + Example { + example: "job recv --timeout 0sec", + description: "Get a message or fail if no message is available immediately", + result: None, + }, + ] + } +} + +fn recv_without_time_limit( + mailbox: &mut Mailbox, + tag: Option, + signals: &Signals, + span: Span, +) -> Result { + loop { + if signals.interrupted() { + return Err(ShellError::Interrupted { span }); + } + match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) { + Ok(value) => return Ok(value), + Err(RecvTimeoutError::Timeout) => {} // try again + Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }), + } + } +} + +fn recv_instantly( + mailbox: &mut Mailbox, + tag: Option, + span: Span, +) -> Result { + match mailbox.try_recv(tag) { + Ok(value) => Ok(value), + Err(TryRecvError::Empty) => Err(ShellError::RecvTimeout { span }), + Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }), + } +} + +fn recv_with_time_limit( + mailbox: &mut Mailbox, + tag: Option, + signals: &Signals, + span: Span, + timeout: Duration, +) -> Result { + let deadline = Instant::now() + timeout; + + loop { + if signals.interrupted() { + return Err(ShellError::Interrupted { span }); + } + + let time_until_deadline = deadline.saturating_duration_since(Instant::now()); + + let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL); + + match mailbox.recv_timeout(tag, time_to_sleep) { + Ok(value) => return Ok(value), + Err(RecvTimeoutError::Timeout) => {} // try again + Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }), + } + + if time_until_deadline.is_zero() { + return Err(ShellError::RecvTimeout { span }); + } + } +} diff --git a/crates/nu-command/src/experimental/job_send.rs b/crates/nu-command/src/experimental/job_send.rs new file mode 100644 index 0000000000..08495d5261 --- /dev/null +++ b/crates/nu-command/src/experimental/job_send.rs @@ -0,0 +1,112 @@ +use nu_engine::command_prelude::*; +use nu_protocol::{engine::FilterTag, JobId}; + +#[derive(Clone)] +pub struct JobSend; + +impl Command for JobSend { + fn name(&self) -> &str { + "job send" + } + + fn description(&self) -> &str { + "Send a message to the mailbox of a job." + } + + fn extra_description(&self) -> &str { + r#" +This command sends a message to a background job, which can then read sent messages +in a first-in-first-out fashion with `job recv`. When it does so, it may additionally specify a numeric filter tag, +in which case it will only read messages sent with the exact same filter tag. +In particular, the id 0 refers to the main/initial nushell thread. + +A message can be any nushell value, and streams are always collected before being sent. + +This command never blocks. +"# + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("job send") + .category(Category::Experimental) + .required( + "id", + SyntaxShape::Int, + "The id of the job to send the message to.", + ) + .named("tag", SyntaxShape::Int, "A tag for the message", None) + .input_output_types(vec![(Type::Any, Type::Nothing)]) + .allow_variants_without_examples(true) + } + + fn search_terms(&self) -> Vec<&str> { + vec![] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + let head = call.head; + + let id_arg: Spanned = call.req(engine_state, stack, 0)?; + let tag_arg: Option> = call.get_flag(engine_state, stack, "tag")?; + + let id = id_arg.item; + + if id < 0 { + return Err(ShellError::NeedsPositiveValue { span: id_arg.span }); + } + + if let Some(tag) = tag_arg { + if tag.item < 0 { + return Err(ShellError::NeedsPositiveValue { span: tag.span }); + } + } + + let tag = tag_arg.map(|it| it.item as FilterTag); + + if id == 0 { + engine_state + .root_job_sender + .send((tag, input)) + .expect("this should NEVER happen."); + } else { + let jobs = engine_state.jobs.lock().expect("failed to acquire lock"); + + if let Some(job) = jobs.lookup(JobId::new(id as usize)) { + match job { + nu_protocol::engine::Job::Thread(thread_job) => { + // it is ok to send this value while holding the lock, because + // mail channels are always unbounded, so this send never blocks + let _ = thread_job.sender.send((tag, input)); + } + nu_protocol::engine::Job::Frozen(_) => { + return Err(ShellError::JobIsFrozen { + id: id as usize, + span: id_arg.span, + }); + } + } + } else { + return Err(ShellError::JobNotFound { + id: id as usize, + span: id_arg.span, + }); + } + } + + Ok(Value::nothing(head).into_pipeline_data()) + } + + fn examples(&self) -> Vec { + vec![Example { + example: "let id = job spawn { job recv | save sent.txt }; 'hi' | job send $id", + description: "Send a message to a newly spawned job", + result: None, + }] + } +} diff --git a/crates/nu-command/src/experimental/job_spawn.rs b/crates/nu-command/src/experimental/job_spawn.rs index 37203aafb9..09ff929a8a 100644 --- a/crates/nu-command/src/experimental/job_spawn.rs +++ b/crates/nu-command/src/experimental/job_spawn.rs @@ -1,14 +1,14 @@ use std::{ sync::{ atomic::{AtomicBool, AtomicU32}, - Arc, + mpsc, Arc, Mutex, }, thread, }; use nu_engine::{command_prelude::*, ClosureEvalOnce}; use nu_protocol::{ - engine::{Closure, Job, Redirection, ThreadJob}, + engine::{Closure, CurrentJob, Job, Mailbox, Redirection, ThreadJob}, report_shell_error, OutDest, Signals, }; @@ -57,12 +57,11 @@ impl Command for JobSpawn { let closure: Closure = call.req(engine_state, stack, 0)?; let tag: Option = call.get_flag(engine_state, stack, "tag")?; + let job_stack = stack.clone(); let mut job_state = engine_state.clone(); job_state.is_interactive = false; - let job_stack = stack.clone(); - // the new job should have its ctrl-c independent of foreground let job_signals = Signals::new(Arc::new(AtomicBool::new(false))); job_state.set_signals(job_signals.clone()); @@ -75,10 +74,20 @@ impl Command for JobSpawn { let jobs = job_state.jobs.clone(); let mut jobs = jobs.lock().expect("jobs lock is poisoned!"); + let (send, recv) = mpsc::channel(); + let id = { - let thread_job = ThreadJob::new(job_signals, tag); - job_state.current_thread_job = Some(thread_job.clone()); - jobs.add_job(Job::Thread(thread_job)) + let thread_job = ThreadJob::new(job_signals, tag, send); + + let id = jobs.add_job(Job::Thread(thread_job.clone())); + + job_state.current_job = CurrentJob { + id, + background_thread_job: Some(thread_job), + mailbox: Arc::new(Mutex::new(Mailbox::new(recv))), + }; + + id }; let result = thread::Builder::new() diff --git a/crates/nu-command/src/experimental/job_unfreeze.rs b/crates/nu-command/src/experimental/job_unfreeze.rs index 67fb3c96a1..3143b31184 100644 --- a/crates/nu-command/src/experimental/job_unfreeze.rs +++ b/crates/nu-command/src/experimental/job_unfreeze.rs @@ -118,7 +118,7 @@ fn unfreeze_job( }) => { let pid = handle.pid(); - if let Some(thread_job) = &state.current_thread_job { + if let Some(thread_job) = &state.current_thread_job() { if !thread_job.try_add_pid(pid) { kill_by_pid(pid.into()).map_err(|err| { ShellError::Io(IoError::new_internal( @@ -136,7 +136,7 @@ fn unfreeze_job( .then(|| state.pipeline_externals_state.clone()), ); - if let Some(thread_job) = &state.current_thread_job { + if let Some(thread_job) = &state.current_thread_job() { thread_job.remove_pid(pid); } diff --git a/crates/nu-command/src/experimental/mod.rs b/crates/nu-command/src/experimental/mod.rs index f98d123218..9c695d9116 100644 --- a/crates/nu-command/src/experimental/mod.rs +++ b/crates/nu-command/src/experimental/mod.rs @@ -1,5 +1,6 @@ mod is_admin; mod job; +mod job_id; mod job_kill; mod job_list; mod job_spawn; @@ -8,12 +9,27 @@ mod job_tag; #[cfg(all(unix, feature = "os"))] mod job_unfreeze; +#[cfg(not(target_family = "wasm"))] +mod job_flush; +#[cfg(not(target_family = "wasm"))] +mod job_recv; +#[cfg(not(target_family = "wasm"))] +mod job_send; + pub use is_admin::IsAdmin; pub use job::Job; +pub use job_id::JobId; pub use job_kill::JobKill; pub use job_list::JobList; pub use job_spawn::JobSpawn; pub use job_tag::JobTag; +#[cfg(not(target_family = "wasm"))] +pub use job_flush::JobFlush; +#[cfg(not(target_family = "wasm"))] +pub use job_recv::JobRecv; +#[cfg(not(target_family = "wasm"))] +pub use job_send::JobSend; + #[cfg(all(unix, feature = "os"))] pub use job_unfreeze::JobUnfreeze; diff --git a/crates/nu-command/src/system/run_external.rs b/crates/nu-command/src/system/run_external.rs index 8331c9f284..180238f3f2 100644 --- a/crates/nu-command/src/system/run_external.rs +++ b/crates/nu-command/src/system/run_external.rs @@ -285,7 +285,7 @@ impl Command for External { ) })?; - if let Some(thread_job) = &engine_state.current_thread_job { + if let Some(thread_job) = engine_state.current_thread_job() { if !thread_job.try_add_pid(child.pid()) { kill_by_pid(child.pid().into()).map_err(|err| { ShellError::Io(IoError::new_internal( diff --git a/crates/nu-command/tests/commands/job.rs b/crates/nu-command/tests/commands/job.rs index 10b8346edd..608ca318c8 100644 --- a/crates/nu-command/tests/commands/job.rs +++ b/crates/nu-command/tests/commands/job.rs @@ -1,22 +1,188 @@ -use nu_test_support::{nu, playground::Playground}; +use nu_test_support::nu; #[test] -fn jobs_do_run() { - Playground::setup("job_test_1", |dirs, sandbox| { - sandbox.with_files(&[]); +fn job_send_root_job_works() { + let actual = nu!(r#" + job spawn { 'beep' | job send 0 } + job recv --timeout 10sec"#); - let actual = nu!( - cwd: dirs.root(), - r#" - rm -f a.txt; - job spawn { sleep 200ms; 'a' | save a.txt }; - let before = 'a.txt' | path exists; - sleep 400ms; - let after = 'a.txt' | path exists; - [$before, $after] | to nuon"# - ); - assert_eq!(actual.out, "[false, true]"); - }) + assert_eq!(actual.out, "beep"); +} + +#[test] +fn job_send_background_job_works() { + let actual = nu!(r#" + let job = job spawn { job recv | job send 0 } + 'boop' | job send $job + job recv --timeout 10sec"#); + + assert_eq!(actual.out, "boop"); +} + +#[test] +fn job_send_to_self_works() { + let actual = nu!(r#" + "meep" | job send 0 + job recv"#); + + assert_eq!(actual.out, "meep"); +} + +#[test] +fn job_send_to_self_from_background_works() { + let actual = nu!(r#" + job spawn { + 'beep' | job send (job id) + job recv | job send 0 + } + + job recv --timeout 10sec"#); + + assert_eq!(actual.out, "beep"); +} + +#[test] +fn job_id_of_root_job_is_zero() { + let actual = nu!(r#"job id"#); + + assert_eq!(actual.out, "0"); +} + +#[test] +fn job_id_of_background_jobs_works() { + let actual = nu!(r#" + let job1 = job spawn { job id | job send 0 } + let id1 = job recv --timeout 5sec + + let job2 = job spawn { job id | job send 0 } + let id2 = job recv --timeout 5sec + + let job3 = job spawn { job id | job send 0 } + let id3 = job recv --timeout 5sec + + [($job1 == $id1) ($job2 == $id2) ($job3 == $id3)] | to nuon + + "#); + + assert_eq!(actual.out, "[true, true, true]"); +} + +#[test] +fn untagged_job_recv_accepts_tagged_messages() { + let actual = nu!(r#" + job spawn { "boop" | job send 0 --tag 123 } + job recv --timeout 10sec + "#); + + assert_eq!(actual.out, "boop"); +} + +#[test] +fn tagged_job_recv_filters_untagged_messages() { + let actual = nu!(r#" + job spawn { "boop" | job send 0 } + job recv --tag 123 --timeout 1sec + "#); + + assert_eq!(actual.out, ""); + assert!(actual.err.contains("timeout")); +} + +#[test] +fn tagged_job_recv_filters_badly_tagged_messages() { + let actual = nu!(r#" + job spawn { "boop" | job send 0 --tag 321 } + job recv --tag 123 --timeout 1sec + "#); + + assert_eq!(actual.out, ""); + assert!(actual.err.contains("timeout")); +} + +#[test] +fn tagged_job_recv_accepts_properly_tagged_messages() { + let actual = nu!(r#" + job spawn { "boop" | job send 0 --tag 123 } + job recv --tag 123 --timeout 5sec + "#); + + assert_eq!(actual.out, "boop"); +} + +#[test] +fn filtered_messages_are_not_erased() { + let actual = nu!(r#" + "msg1" | job send 0 --tag 123 + "msg2" | job send 0 --tag 456 + "msg3" | job send 0 --tag 789 + + let first = job recv --tag 789 --timeout 5sec + let second = job recv --timeout 1sec + let third = job recv --timeout 1sec + + + [($first) ($second) ($third)] | to nuon + "#); + + assert_eq!(actual.out, r#"["msg3", "msg1", "msg2"]"#); +} + +#[test] +fn job_recv_timeout_works() { + let actual = nu!(r#" + job spawn { + sleep 2sec + "boop" | job send 0 + } + + job recv --timeout 1sec + "#); + + assert_eq!(actual.out, ""); + assert!(actual.err.contains("timeout")); +} + +#[test] +fn job_recv_timeout_zero_works() { + let actual = nu!(r#" + "hi there" | job send 0 + job recv --timeout 0sec + "#); + + assert_eq!(actual.out, "hi there"); +} + +#[test] +fn job_flush_clears_messages() { + let actual = nu!(r#" + "SALE!!!" | job send 0 + "[HYPERLINK BLOCKED]" | job send 0 + + job flush + + job recv --timeout 1sec + "#); + + assert_eq!(actual.out, ""); + assert!(actual.err.contains("timeout")); +} + +#[test] +fn job_flush_clears_filtered_messages() { + let actual = nu!(r#" + "msg1" | job send 0 --tag 123 + "msg2" | job send 0 --tag 456 + "msg3" | job send 0 --tag 789 + + job recv --tag 789 --timeout 1sec + + job flush + + job recv --timeout 1sec + "#); + + assert_eq!(actual.out, ""); + assert!(actual.err.contains("timeout")); } #[test] @@ -31,11 +197,11 @@ fn job_list_adds_jobs_correctly() { let actual = nu!(format!( r#" let list0 = job list | get id; - let job1 = job spawn {{ sleep 20ms }}; + let job1 = job spawn {{ job recv }}; let list1 = job list | get id; - let job2 = job spawn {{ sleep 20ms }}; + let job2 = job spawn {{ job recv }}; let list2 = job list | get id; - let job3 = job spawn {{ sleep 20ms }}; + let job3 = job spawn {{ job recv }}; let list3 = job list | get id; [({}), ({}), ({}), ({})] | to nuon "#, @@ -52,11 +218,13 @@ fn job_list_adds_jobs_correctly() { fn jobs_get_removed_from_list_after_termination() { let actual = nu!(format!( r#" - let job = job spawn {{ sleep 0.5sec }}; + let job = job spawn {{ job recv }}; let list0 = job list | get id; - sleep 1sec + "die!" | job send $job + + sleep 0.2sec let list1 = job list | get id; @@ -68,6 +236,8 @@ fn jobs_get_removed_from_list_after_termination() { assert_eq!(actual.out, "[true, true]"); } +// TODO: find way to communicate between process in windows +// so these tests can fail less often #[test] fn job_list_shows_pids() { let actual = nu!(format!( @@ -89,9 +259,9 @@ fn job_list_shows_pids() { fn killing_job_removes_it_from_table() { let actual = nu!(format!( r#" - let job1 = job spawn {{ sleep 100ms }} - let job2 = job spawn {{ sleep 100ms }} - let job3 = job spawn {{ sleep 100ms }} + let job1 = job spawn {{ job recv }} + let job2 = job spawn {{ job recv }} + let job3 = job spawn {{ job recv }} let list_before = job list | get id diff --git a/crates/nu-protocol/src/engine/engine_state.rs b/crates/nu-protocol/src/engine/engine_state.rs index 89572c9a85..3099b33f20 100644 --- a/crates/nu-protocol/src/engine/engine_state.rs +++ b/crates/nu-protocol/src/engine/engine_state.rs @@ -8,9 +8,9 @@ use crate::{ }, eval_const::create_nu_constant, shell_error::io::IoError, - BlockId, Category, Config, DeclId, FileId, GetSpan, Handlers, HistoryConfig, Module, ModuleId, - OverlayId, ShellError, SignalAction, Signals, Signature, Span, SpanId, Type, Value, VarId, - VirtualPathId, + BlockId, Category, Config, DeclId, FileId, GetSpan, Handlers, HistoryConfig, JobId, Module, + ModuleId, OverlayId, ShellError, SignalAction, Signals, Signature, Span, SpanId, Type, Value, + VarId, VirtualPathId, }; use fancy_regex::Regex; use lru::LruCache; @@ -22,6 +22,8 @@ use std::{ path::PathBuf, sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, + mpsc::channel, + mpsc::Sender, Arc, Mutex, MutexGuard, PoisonError, }, }; @@ -31,7 +33,7 @@ type PoisonDebuggerError<'a> = PoisonError>>; #[cfg(feature = "plugin")] use crate::{PluginRegistryFile, PluginRegistryItem, RegisteredPlugin}; -use super::{Jobs, ThreadJob}; +use super::{CurrentJob, Jobs, Mail, Mailbox, ThreadJob}; #[derive(Clone, Debug)] pub enum VirtualPath { @@ -117,7 +119,9 @@ pub struct EngineState { pub jobs: Arc>, // The job being executed with this engine state, or None if main thread - pub current_thread_job: Option, + pub current_job: CurrentJob, + + pub root_job_sender: Sender, // When there are background jobs running, the interactive behavior of `exit` changes depending on // the value of this flag: @@ -141,6 +145,8 @@ pub const UNKNOWN_SPAN_ID: SpanId = SpanId::new(0); impl EngineState { pub fn new() -> Self { + let (send, recv) = channel::(); + Self { files: vec![], virtual_paths: vec![], @@ -196,7 +202,12 @@ impl EngineState { is_debugging: IsDebugging::new(false), debugger: Arc::new(Mutex::new(Box::new(NoopDebugger))), jobs: Arc::new(Mutex::new(Jobs::default())), - current_thread_job: None, + current_job: CurrentJob { + id: JobId::new(0), + background_thread_job: None, + mailbox: Arc::new(Mutex::new(Mailbox::new(recv))), + }, + root_job_sender: send, exit_warning_given: Arc::new(AtomicBool::new(false)), } } @@ -1081,7 +1092,12 @@ impl EngineState { // Determines whether the current state is being held by a background job pub fn is_background_job(&self) -> bool { - self.current_thread_job.is_some() + self.current_job.background_thread_job.is_some() + } + + // Gets the thread job entry + pub fn current_thread_job(&self) -> Option<&ThreadJob> { + self.current_job.background_thread_job.as_ref() } } diff --git a/crates/nu-protocol/src/engine/jobs.rs b/crates/nu-protocol/src/engine/jobs.rs index 8e64e46f7f..71c18a4c83 100644 --- a/crates/nu-protocol/src/engine/jobs.rs +++ b/crates/nu-protocol/src/engine/jobs.rs @@ -1,11 +1,17 @@ use std::{ - collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + sync::{ + mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError}, + Arc, Mutex, + }, }; +#[cfg(not(target_family = "wasm"))] +use std::time::{Duration, Instant}; + use nu_system::{kill_by_pid, UnfreezeHandle}; -use crate::Signals; +use crate::{PipelineData, Signals}; use crate::JobId; @@ -139,13 +145,15 @@ pub struct ThreadJob { signals: Signals, pids: Arc>>, tag: Option, + pub sender: Sender, } impl ThreadJob { - pub fn new(signals: Signals, tag: Option) -> Self { + pub fn new(signals: Signals, tag: Option, sender: Sender) -> Self { ThreadJob { signals, pids: Arc::new(Mutex::new(HashSet::default())), + sender, tag, } } @@ -238,3 +246,160 @@ impl FrozenJob { } } } + +/// Stores the information about the background job currently being executed by this thread, if any +#[derive(Clone)] +pub struct CurrentJob { + pub id: JobId, + + // The background thread job associated with this thread. + // If None, it indicates this thread is currently the main job + pub background_thread_job: Option, + + // note: although the mailbox is Mutex'd, it is only ever accessed + // by the current job's threads + pub mailbox: Arc>, +} + +// The storage for unread messages +// +// Messages are initially sent over a mpsc channel, +// and may then be stored in a IgnoredMail struct when +// filtered out by a tag. +pub struct Mailbox { + receiver: Receiver, + ignored_mail: IgnoredMail, +} + +impl Mailbox { + pub fn new(receiver: Receiver) -> Self { + Mailbox { + receiver, + ignored_mail: IgnoredMail::default(), + } + } + + #[cfg(not(target_family = "wasm"))] + pub fn recv_timeout( + &mut self, + filter_tag: Option, + timeout: Duration, + ) -> Result { + if let Some(value) = self.ignored_mail.pop(filter_tag) { + Ok(value) + } else { + let mut waited_so_far = Duration::ZERO; + let mut before = Instant::now(); + + while waited_so_far < timeout { + let (tag, value) = self.receiver.recv_timeout(timeout - waited_so_far)?; + + if filter_tag.is_none() || filter_tag == tag { + return Ok(value); + } else { + self.ignored_mail.add((tag, value)); + let now = Instant::now(); + waited_so_far += now - before; + before = now; + } + } + + Err(RecvTimeoutError::Timeout) + } + } + + #[cfg(not(target_family = "wasm"))] + pub fn try_recv( + &mut self, + filter_tag: Option, + ) -> Result { + if let Some(value) = self.ignored_mail.pop(filter_tag) { + Ok(value) + } else { + loop { + let (tag, value) = self.receiver.try_recv()?; + + if filter_tag.is_none() || filter_tag == tag { + return Ok(value); + } else { + self.ignored_mail.add((tag, value)); + } + } + } + } + + pub fn clear(&mut self) { + self.ignored_mail.clear(); + + while self.receiver.try_recv().is_ok() {} + } +} + +// A data structure used to store messages which were received, but currently ignored by a tag filter +// messages are added and popped in a first-in-first-out matter. +#[derive(Default)] +struct IgnoredMail { + next_id: usize, + messages: BTreeMap, + by_tag: HashMap>, +} + +pub type FilterTag = u64; +pub type Mail = (Option, PipelineData); + +impl IgnoredMail { + pub fn add(&mut self, (tag, value): Mail) { + let id = self.next_id; + self.next_id += 1; + + self.messages.insert(id, (tag, value)); + + if let Some(tag) = tag { + self.by_tag.entry(tag).or_default().insert(id); + } + } + + pub fn pop(&mut self, tag: Option) -> Option { + if let Some(tag) = tag { + self.pop_oldest_with_tag(tag) + } else { + self.pop_oldest() + } + } + + pub fn clear(&mut self) { + self.messages.clear(); + self.by_tag.clear(); + } + + fn pop_oldest(&mut self) -> Option { + let (id, (tag, value)) = self.messages.pop_first()?; + + if let Some(tag) = tag { + let needs_cleanup = if let Some(ids) = self.by_tag.get_mut(&tag) { + ids.remove(&id); + ids.is_empty() + } else { + false + }; + + if needs_cleanup { + self.by_tag.remove(&tag); + } + } + + Some(value) + } + + fn pop_oldest_with_tag(&mut self, tag: FilterTag) -> Option { + let ids = self.by_tag.get_mut(&tag)?; + + let id = ids.pop_first()?; + + if ids.is_empty() { + self.by_tag.remove(&tag); + } + + Some(self.messages.remove(&id)?.1) + } +} diff --git a/crates/nu-protocol/src/errors/shell_error/mod.rs b/crates/nu-protocol/src/errors/shell_error/mod.rs index 84c23cd208..9ff49f36d8 100644 --- a/crates/nu-protocol/src/errors/shell_error/mod.rs +++ b/crates/nu-protocol/src/errors/shell_error/mod.rs @@ -1370,7 +1370,7 @@ On Windows, this would be %USERPROFILE%\AppData\Roaming"# #[error("Job {id} is not frozen")] #[diagnostic( - code(nu::shell::os_disabled), + code(nu::shell::job_not_frozen), help("You tried to unfreeze a job which is not frozen") )] JobNotFrozen { @@ -1379,6 +1379,27 @@ On Windows, this would be %USERPROFILE%\AppData\Roaming"# span: Span, }, + #[error("The job {id} is frozen")] + #[diagnostic( + code(nu::shell::job_is_frozen), + help("This operation cannot be performed because the job is frozen") + )] + JobIsFrozen { + id: usize, + #[label = "This job is frozen"] + span: Span, + }, + + #[error("No message was received in the requested time interval")] + #[diagnostic( + code(nu::shell::recv_timeout), + help("No message arrived within the specified time limit") + )] + RecvTimeout { + #[label = "timeout"] + span: Span, + }, + #[error(transparent)] #[diagnostic(transparent)] ChainedError(ChainedError), diff --git a/crates/nu-protocol/src/process/child.rs b/crates/nu-protocol/src/process/child.rs index 069cb71204..b431a11df8 100644 --- a/crates/nu-protocol/src/process/child.rs +++ b/crates/nu-protocol/src/process/child.rs @@ -194,7 +194,7 @@ impl PostWaitCallback { child_pid: Option, tag: Option, ) -> Self { - let this_job = engine_state.current_thread_job.clone(); + let this_job = engine_state.current_thread_job().cloned(); let jobs = engine_state.jobs.clone(); let is_interactive = engine_state.is_interactive;