From 3ed97e68dc88c334aeb96ca1fd69bc66262f008c Mon Sep 17 00:00:00 2001 From: cosineblast <55855728+cosineblast@users.noreply.github.com> Date: Tue, 4 Mar 2025 11:31:37 -0300 Subject: [PATCH] Begin mailbox implementation This commit adds the data structures for the mail messaging system --- crates/nu-command/src/experimental/job_id.rs | 6 +- .../nu-command/src/experimental/job_spawn.rs | 21 ++- crates/nu-protocol/src/engine/engine_state.rs | 21 ++- crates/nu-protocol/src/engine/jobs.rs | 150 +++++++++++++++++- 4 files changed, 177 insertions(+), 21 deletions(-) diff --git a/crates/nu-command/src/experimental/job_id.rs b/crates/nu-command/src/experimental/job_id.rs index 93bdbdd1ae..6f3be8a9eb 100644 --- a/crates/nu-command/src/experimental/job_id.rs +++ b/crates/nu-command/src/experimental/job_id.rs @@ -37,11 +37,7 @@ was instead spawned by main nushell execution thread." ) -> Result { let head = call.head; - if let Some((id, _)) = &engine_state.thread_job_entry { - Ok(Value::int(id.get() as i64, head).into_pipeline_data()) - } else { - Ok(Value::int(0, head).into_pipeline_data()) - } + Ok(Value::int(engine_state.current_job.id.get() as i64, head).into_pipeline_data()) } fn examples(&self) -> Vec { diff --git a/crates/nu-command/src/experimental/job_spawn.rs b/crates/nu-command/src/experimental/job_spawn.rs index 44d03b7a6e..6eff290a29 100644 --- a/crates/nu-command/src/experimental/job_spawn.rs +++ b/crates/nu-command/src/experimental/job_spawn.rs @@ -1,15 +1,15 @@ use std::{ sync::{ atomic::{AtomicBool, AtomicU32}, - Arc, + mpsc, Arc, Mutex, }, thread, }; use nu_engine::{command_prelude::*, ClosureEvalOnce}; use nu_protocol::{ - engine::{Closure, Job, Redirection, ThreadJob}, - report_shell_error, OutDest, Signals, + engine::{Closure, CurrentJob, Redirection, Job, Mailbox, ThreadJob}, + report_shell_error, OutDest, Signals, }; #[derive(Clone)] @@ -50,11 +50,11 @@ impl Command for JobSpawn { let closure: Closure = call.req(engine_state, stack, 0)?; + let job_stack = stack.clone(); + let mut job_state = engine_state.clone(); job_state.is_interactive = false; - let job_stack = stack.clone(); - // the new job should have its ctrl-c independent of foreground let job_signals = Signals::new(Arc::new(AtomicBool::new(false))); job_state.set_signals(job_signals.clone()); @@ -67,11 +67,18 @@ impl Command for JobSpawn { let jobs = job_state.jobs.clone(); let mut jobs = jobs.lock().expect("jobs lock is poisoned!"); + let (send, recv) = mpsc::channel(); + let id = { - let thread_job = ThreadJob::new(job_signals); + let thread_job = ThreadJob::new(job_signals, send); + let id = jobs.add_job(Job::Thread(thread_job.clone())); - job_state.thread_job_entry = Some((id, thread_job)); + job_state.current_job = CurrentJob { + id, + background_thread_job: Some(thread_job), + mailbox: Arc::new(Mutex::new(Mailbox::new(recv))), + }; id }; diff --git a/crates/nu-protocol/src/engine/engine_state.rs b/crates/nu-protocol/src/engine/engine_state.rs index dd3faeff70..46ae9e46ef 100644 --- a/crates/nu-protocol/src/engine/engine_state.rs +++ b/crates/nu-protocol/src/engine/engine_state.rs @@ -22,6 +22,8 @@ use std::{ path::PathBuf, sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, + mpsc::channel, + mpsc::Sender, Arc, Mutex, MutexGuard, PoisonError, }, }; @@ -31,7 +33,7 @@ type PoisonDebuggerError<'a> = PoisonError>>; #[cfg(feature = "plugin")] use crate::{PluginRegistryFile, PluginRegistryItem, RegisteredPlugin}; -use super::{Jobs, ThreadJob}; +use super::{CurrentJob, Jobs, Mail, Mailbox, ThreadJob}; #[derive(Clone, Debug)] pub enum VirtualPath { @@ -117,7 +119,9 @@ pub struct EngineState { pub jobs: Arc>, // The job being executed with this engine state, or None if main thread - pub thread_job_entry: Option<(JobId, ThreadJob)>, + pub current_job: CurrentJob, + + pub root_job_sender: Sender, // When there are background jobs running, the interactive behavior of `exit` changes depending on // the value of this flag: @@ -141,6 +145,8 @@ pub const UNKNOWN_SPAN_ID: SpanId = SpanId::new(0); impl EngineState { pub fn new() -> Self { + let (send, recv) = channel::(); + Self { files: vec![], virtual_paths: vec![], @@ -196,7 +202,12 @@ impl EngineState { is_debugging: IsDebugging::new(false), debugger: Arc::new(Mutex::new(Box::new(NoopDebugger))), jobs: Arc::new(Mutex::new(Jobs::default())), - thread_job_entry: None, + current_job: CurrentJob { + id: JobId::new(0), + background_thread_job: None, + mailbox: Arc::new(Mutex::new(Mailbox::new(recv))), + }, + root_job_sender: send, exit_warning_given: Arc::new(AtomicBool::new(false)), } } @@ -1080,12 +1091,12 @@ impl EngineState { // Determines whether the current state is being held by a background job pub fn is_background_job(&self) -> bool { - self.thread_job_entry.is_some() + self.current_job.background_thread_job.is_some() } // Gets the thread job entry pub fn current_thread_job(&self) -> Option<&ThreadJob> { - self.thread_job_entry.as_ref().map(|(_, job)| job) + self.current_job.background_thread_job.as_ref() } } diff --git a/crates/nu-protocol/src/engine/jobs.rs b/crates/nu-protocol/src/engine/jobs.rs index 7d0b4c4b56..f4655833c9 100644 --- a/crates/nu-protocol/src/engine/jobs.rs +++ b/crates/nu-protocol/src/engine/jobs.rs @@ -1,11 +1,15 @@ use std::{ - collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + sync::{ + mpsc::{Receiver, RecvTimeoutError, Sender}, + Arc, Mutex, + }, + time::{Duration, Instant}, }; use nu_system::{kill_by_pid, UnfreezeHandle}; -use crate::Signals; +use crate::{Signals, Value}; use crate::JobId; @@ -134,13 +138,15 @@ pub enum Job { pub struct ThreadJob { signals: Signals, pids: Arc>>, + pub sender: Sender, } impl ThreadJob { - pub fn new(signals: Signals) -> Self { + pub fn new(signals: Signals, sender: Sender) -> Self { ThreadJob { signals, pids: Arc::new(Mutex::new(HashSet::default())), + sender, } } @@ -217,3 +223,139 @@ impl FrozenJob { } } } + +/// Stores the information about the background job currently being executed by this thread, if any +#[derive(Clone)] +pub struct CurrentJob { + pub id: JobId, + + // The backgorund thread job associated with this thread. + // If None, it indicates this thread is currently the main job + pub background_thread_job: Option, + + // note: altough the mailbox is Mutex'd, it is only ever accessed + // by the current job's threads + pub mailbox: Arc>, +} + +// The storage for unread messages +// +// Messages are initially sent over a mpsc channel, +// and may then be stored in a IgnoredMail struct when +// filtered out by a tag. +pub struct Mailbox { + receiver: Receiver, + ignored_mail: IgnoredMail, +} + +impl Mailbox { + pub fn new(receiver: Receiver) -> Self { + Mailbox { + receiver, + ignored_mail: IgnoredMail::default(), + } + } + + pub fn recv_timeout( + &mut self, + filter_tag: Option, + timeout: Duration, + ) -> Result { + if let Some(value) = self.ignored_mail.pop(filter_tag) { + Ok(value) + } else { + let mut waited_so_far = Duration::ZERO; + let mut before = Instant::now(); + + while waited_so_far < timeout { + let (tag, value) = self.receiver.recv_timeout(timeout - waited_so_far)?; + + if filter_tag.is_some() && filter_tag == tag { + return Ok(value); + } else { + self.ignored_mail.add((tag, value)); + let now = Instant::now(); + waited_so_far += now - before; + before = now; + } + } + + Err(RecvTimeoutError::Timeout) + } + } + + pub fn clear(&mut self) { + self.ignored_mail.clear(); + + while self.receiver.try_recv().is_ok() {} + } +} + +// A data structure used to store messages which were received, but currently ignored by a tag filter +// messages are added and popped in a first-in-first-out matter. +#[derive(Default)] +struct IgnoredMail { + next_id: usize, + messages: BTreeMap, + by_tag: HashMap>, +} + +pub type Tag = u64; +pub type Mail = (Option, Value); + +impl IgnoredMail { + pub fn add(&mut self, (tag, value): Mail) { + let id = self.next_id; + self.next_id += 1; + + self.messages.insert(id, (tag, value)); + + if let Some(tag) = tag { + self.by_tag.entry(tag).or_default().insert(id); + } + } + + pub fn pop(&mut self, tag: Option) -> Option { + if let Some(tag) = tag { + self.pop_oldest_with_tag(tag) + } else { + self.pop_oldest() + } + } + + pub fn clear(&mut self) { + self.messages.clear(); + self.by_tag.clear(); + } + + fn pop_oldest(&mut self) -> Option { + let (id, (tag, value)) = self.messages.pop_first()?; + + if let Some(tag) = tag { + let needs_cleanup = if let Some(ids) = self.by_tag.get_mut(&tag) { + ids.remove(&id); + ids.is_empty() + } else { + false + }; + + if needs_cleanup { + self.by_tag.remove(&tag); + } + } + + Some(value) + } + + fn pop_oldest_with_tag(&mut self, tag: Tag) -> Option { + let ids = self.by_tag.get_mut(&tag)?; + + let id = ids.pop_first()?; + + if ids.is_empty() { + self.by_tag.remove(&tag); + } + + Some(self.messages.remove(&id)?.1) + } +}