Begin mailbox implementation

This commit adds the data structures for the mail messaging system
This commit is contained in:
cosineblast 2025-03-04 11:31:37 -03:00
parent bffa9d3278
commit 3ed97e68dc
4 changed files with 177 additions and 21 deletions

View File

@ -37,11 +37,7 @@ was instead spawned by main nushell execution thread."
) -> Result<PipelineData, ShellError> {
let head = call.head;
if let Some((id, _)) = &engine_state.thread_job_entry {
Ok(Value::int(id.get() as i64, head).into_pipeline_data())
} else {
Ok(Value::int(0, head).into_pipeline_data())
}
Ok(Value::int(engine_state.current_job.id.get() as i64, head).into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {

View File

@ -1,15 +1,15 @@
use std::{
sync::{
atomic::{AtomicBool, AtomicU32},
Arc,
mpsc, Arc, Mutex,
},
thread,
};
use nu_engine::{command_prelude::*, ClosureEvalOnce};
use nu_protocol::{
engine::{Closure, Job, Redirection, ThreadJob},
report_shell_error, OutDest, Signals,
engine::{Closure, CurrentJob, Redirection, Job, Mailbox, ThreadJob},
report_shell_error, OutDest, Signals,
};
#[derive(Clone)]
@ -50,11 +50,11 @@ impl Command for JobSpawn {
let closure: Closure = call.req(engine_state, stack, 0)?;
let job_stack = stack.clone();
let mut job_state = engine_state.clone();
job_state.is_interactive = false;
let job_stack = stack.clone();
// the new job should have its ctrl-c independent of foreground
let job_signals = Signals::new(Arc::new(AtomicBool::new(false)));
job_state.set_signals(job_signals.clone());
@ -67,11 +67,18 @@ impl Command for JobSpawn {
let jobs = job_state.jobs.clone();
let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
let (send, recv) = mpsc::channel();
let id = {
let thread_job = ThreadJob::new(job_signals);
let thread_job = ThreadJob::new(job_signals, send);
let id = jobs.add_job(Job::Thread(thread_job.clone()));
job_state.thread_job_entry = Some((id, thread_job));
job_state.current_job = CurrentJob {
id,
background_thread_job: Some(thread_job),
mailbox: Arc::new(Mutex::new(Mailbox::new(recv))),
};
id
};

View File

@ -22,6 +22,8 @@ use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
mpsc::channel,
mpsc::Sender,
Arc, Mutex, MutexGuard, PoisonError,
},
};
@ -31,7 +33,7 @@ type PoisonDebuggerError<'a> = PoisonError<MutexGuard<'a, Box<dyn Debugger>>>;
#[cfg(feature = "plugin")]
use crate::{PluginRegistryFile, PluginRegistryItem, RegisteredPlugin};
use super::{Jobs, ThreadJob};
use super::{CurrentJob, Jobs, Mail, Mailbox, ThreadJob};
#[derive(Clone, Debug)]
pub enum VirtualPath {
@ -117,7 +119,9 @@ pub struct EngineState {
pub jobs: Arc<Mutex<Jobs>>,
// The job being executed with this engine state, or None if main thread
pub thread_job_entry: Option<(JobId, ThreadJob)>,
pub current_job: CurrentJob,
pub root_job_sender: Sender<Mail>,
// When there are background jobs running, the interactive behavior of `exit` changes depending on
// the value of this flag:
@ -141,6 +145,8 @@ pub const UNKNOWN_SPAN_ID: SpanId = SpanId::new(0);
impl EngineState {
pub fn new() -> Self {
let (send, recv) = channel::<Mail>();
Self {
files: vec![],
virtual_paths: vec![],
@ -196,7 +202,12 @@ impl EngineState {
is_debugging: IsDebugging::new(false),
debugger: Arc::new(Mutex::new(Box::new(NoopDebugger))),
jobs: Arc::new(Mutex::new(Jobs::default())),
thread_job_entry: None,
current_job: CurrentJob {
id: JobId::new(0),
background_thread_job: None,
mailbox: Arc::new(Mutex::new(Mailbox::new(recv))),
},
root_job_sender: send,
exit_warning_given: Arc::new(AtomicBool::new(false)),
}
}
@ -1080,12 +1091,12 @@ impl EngineState {
// Determines whether the current state is being held by a background job
pub fn is_background_job(&self) -> bool {
self.thread_job_entry.is_some()
self.current_job.background_thread_job.is_some()
}
// Gets the thread job entry
pub fn current_thread_job(&self) -> Option<&ThreadJob> {
self.thread_job_entry.as_ref().map(|(_, job)| job)
self.current_job.background_thread_job.as_ref()
}
}

View File

@ -1,11 +1,15 @@
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::{
mpsc::{Receiver, RecvTimeoutError, Sender},
Arc, Mutex,
},
time::{Duration, Instant},
};
use nu_system::{kill_by_pid, UnfreezeHandle};
use crate::Signals;
use crate::{Signals, Value};
use crate::JobId;
@ -134,13 +138,15 @@ pub enum Job {
pub struct ThreadJob {
signals: Signals,
pids: Arc<Mutex<HashSet<u32>>>,
pub sender: Sender<Mail>,
}
impl ThreadJob {
pub fn new(signals: Signals) -> Self {
pub fn new(signals: Signals, sender: Sender<Mail>) -> Self {
ThreadJob {
signals,
pids: Arc::new(Mutex::new(HashSet::default())),
sender,
}
}
@ -217,3 +223,139 @@ impl FrozenJob {
}
}
}
/// Stores the information about the background job currently being executed by this thread, if any
#[derive(Clone)]
pub struct CurrentJob {
pub id: JobId,
// The backgorund thread job associated with this thread.
// If None, it indicates this thread is currently the main job
pub background_thread_job: Option<ThreadJob>,
// note: altough the mailbox is Mutex'd, it is only ever accessed
// by the current job's threads
pub mailbox: Arc<Mutex<Mailbox>>,
}
// The storage for unread messages
//
// Messages are initially sent over a mpsc channel,
// and may then be stored in a IgnoredMail struct when
// filtered out by a tag.
pub struct Mailbox {
receiver: Receiver<Mail>,
ignored_mail: IgnoredMail,
}
impl Mailbox {
pub fn new(receiver: Receiver<Mail>) -> Self {
Mailbox {
receiver,
ignored_mail: IgnoredMail::default(),
}
}
pub fn recv_timeout(
&mut self,
filter_tag: Option<Tag>,
timeout: Duration,
) -> Result<Value, RecvTimeoutError> {
if let Some(value) = self.ignored_mail.pop(filter_tag) {
Ok(value)
} else {
let mut waited_so_far = Duration::ZERO;
let mut before = Instant::now();
while waited_so_far < timeout {
let (tag, value) = self.receiver.recv_timeout(timeout - waited_so_far)?;
if filter_tag.is_some() && filter_tag == tag {
return Ok(value);
} else {
self.ignored_mail.add((tag, value));
let now = Instant::now();
waited_so_far += now - before;
before = now;
}
}
Err(RecvTimeoutError::Timeout)
}
}
pub fn clear(&mut self) {
self.ignored_mail.clear();
while self.receiver.try_recv().is_ok() {}
}
}
// A data structure used to store messages which were received, but currently ignored by a tag filter
// messages are added and popped in a first-in-first-out matter.
#[derive(Default)]
struct IgnoredMail {
next_id: usize,
messages: BTreeMap<usize, Mail>,
by_tag: HashMap<Tag, BTreeSet<usize>>,
}
pub type Tag = u64;
pub type Mail = (Option<Tag>, Value);
impl IgnoredMail {
pub fn add(&mut self, (tag, value): Mail) {
let id = self.next_id;
self.next_id += 1;
self.messages.insert(id, (tag, value));
if let Some(tag) = tag {
self.by_tag.entry(tag).or_default().insert(id);
}
}
pub fn pop(&mut self, tag: Option<Tag>) -> Option<Value> {
if let Some(tag) = tag {
self.pop_oldest_with_tag(tag)
} else {
self.pop_oldest()
}
}
pub fn clear(&mut self) {
self.messages.clear();
self.by_tag.clear();
}
fn pop_oldest(&mut self) -> Option<Value> {
let (id, (tag, value)) = self.messages.pop_first()?;
if let Some(tag) = tag {
let needs_cleanup = if let Some(ids) = self.by_tag.get_mut(&tag) {
ids.remove(&id);
ids.is_empty()
} else {
false
};
if needs_cleanup {
self.by_tag.remove(&tag);
}
}
Some(value)
}
fn pop_oldest_with_tag(&mut self, tag: Tag) -> Option<Value> {
let ids = self.by_tag.get_mut(&tag)?;
let id = ids.pop_first()?;
if ids.is_empty() {
self.by_tag.remove(&tag);
}
Some(self.messages.remove(&id)?.1)
}
}