diff --git a/crates/nu-plugin-core/src/interface/mod.rs b/crates/nu-plugin-core/src/interface/mod.rs index d282eddd8b..6ce33cae4c 100644 --- a/crates/nu-plugin-core/src/interface/mod.rs +++ b/crates/nu-plugin-core/src/interface/mod.rs @@ -1,7 +1,10 @@ //! Implements the stream multiplexing interface for both the plugin side and the engine side. use nu_plugin_protocol::{ByteStreamInfo, ListStreamInfo, PipelineDataHeader, StreamMessage}; -use nu_protocol::{ByteStream, IntoSpanned, ListStream, PipelineData, Reader, ShellError, Signals}; +use nu_protocol::{ + engine::Sequence, ByteStream, IntoSpanned, ListStream, PipelineData, Reader, ShellError, + Signals, +}; use std::{ io::{Read, Write}, sync::Mutex, @@ -10,7 +13,7 @@ use std::{ pub mod stream; -use crate::{util::Sequence, Encoder}; +use crate::Encoder; use self::stream::{StreamManager, StreamManagerHandle, StreamWriter, WriteStreamMessage}; diff --git a/crates/nu-plugin-core/src/interface/tests.rs b/crates/nu-plugin-core/src/interface/tests.rs index 456eb547b5..cb28692706 100644 --- a/crates/nu-plugin-core/src/interface/tests.rs +++ b/crates/nu-plugin-core/src/interface/tests.rs @@ -1,5 +1,3 @@ -use crate::util::Sequence; - use super::{ stream::{StreamManager, StreamManagerHandle}, test_util::TestCase, @@ -10,8 +8,8 @@ use nu_plugin_protocol::{ StreamMessage, }; use nu_protocol::{ - ByteStream, ByteStreamSource, ByteStreamType, DataSource, ListStream, PipelineData, - PipelineMetadata, ShellError, Signals, Span, Value, + engine::Sequence, ByteStream, ByteStreamSource, ByteStreamType, DataSource, ListStream, + PipelineData, PipelineMetadata, ShellError, Signals, Span, Value, }; use std::{path::Path, sync::Arc}; diff --git a/crates/nu-plugin-core/src/util/mod.rs b/crates/nu-plugin-core/src/util/mod.rs index e88b945831..1be1cfbef5 100644 --- a/crates/nu-plugin-core/src/util/mod.rs +++ b/crates/nu-plugin-core/src/util/mod.rs @@ -1,7 +1,5 @@ -mod sequence; mod waitable; mod with_custom_values_in; -pub use sequence::Sequence; pub use waitable::*; pub use with_custom_values_in::with_custom_values_in; diff --git a/crates/nu-plugin-engine/src/interface/mod.rs b/crates/nu-plugin-engine/src/interface/mod.rs index 680b3b123f..78ed966dd6 100644 --- a/crates/nu-plugin-engine/src/interface/mod.rs +++ b/crates/nu-plugin-engine/src/interface/mod.rs @@ -1,7 +1,7 @@ //! Interface used by the engine to communicate with the plugin. use nu_plugin_core::{ - util::{with_custom_values_in, Sequence, Waitable, WaitableMut}, + util::{with_custom_values_in, Waitable, WaitableMut}, Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager, StreamManagerHandle, }; @@ -11,8 +11,8 @@ use nu_plugin_protocol::{ PluginOutput, ProtocolInfo, StreamId, StreamMessage, }; use nu_protocol::{ - ast::Operator, CustomValue, IntoSpanned, PipelineData, PluginMetadata, PluginSignature, - ShellError, Signals, Span, Spanned, Value, + ast::Operator, engine::Sequence, CustomValue, IntoSpanned, PipelineData, PluginMetadata, + PluginSignature, ShellError, Signals, Span, Spanned, Value, }; use nu_utils::SharedCow; use std::{ @@ -664,6 +664,12 @@ impl PluginInterface { self.flush() } + /// Send the plugin a ctrl-c signal. + pub fn ctrlc(&self) -> Result<(), ShellError> { + self.write(PluginInput::Ctrlc)?; + self.flush() + } + /// Write an [`EngineCallResponse`]. Writes the full stream contained in any [`PipelineData`] /// before returning. pub fn write_engine_call_response( diff --git a/crates/nu-plugin-engine/src/persistent.rs b/crates/nu-plugin-engine/src/persistent.rs index 6a87aa1e6b..69f5c7f7a8 100644 --- a/crates/nu-plugin-engine/src/persistent.rs +++ b/crates/nu-plugin-engine/src/persistent.rs @@ -6,7 +6,7 @@ use crate::{ use super::{PluginInterface, PluginSource}; use nu_plugin_core::CommunicationMode; use nu_protocol::{ - engine::{EngineState, Stack}, + engine::{ctrlc, EngineState, Stack}, PluginGcConfig, PluginIdentity, PluginMetadata, RegisteredPlugin, ShellError, }; use std::{ @@ -37,6 +37,8 @@ struct MutableState { preferred_mode: Option, /// Garbage collector config gc_config: PluginGcConfig, + /// RAII guard for this plugin's ctrl-c handler + ctrlc_guard: Option, } #[derive(Debug, Clone, Copy)] @@ -64,6 +66,7 @@ impl PersistentPlugin { metadata: None, preferred_mode: None, gc_config, + ctrlc_guard: None, }), } } @@ -299,6 +302,34 @@ impl RegisteredPlugin for PersistentPlugin { fn as_any(self: Arc) -> Arc { self } + + fn configure_ctrlc_handler( + self: Arc, + handlers: &ctrlc::Handlers, + ) -> Result<(), ShellError> { + let guard = { + // We take a weakref to the plugin so that we don't create a cycle to the + // RAII guard that will be stored on the plugin. + let plugin = Arc::downgrade(&self); + handlers.register(Box::new(move || { + // write a Ctrl-C packet through the PluginInterface if the plugin is alive and + // running + if let Some(plugin) = plugin.upgrade() { + if let Ok(mutable) = plugin.mutable.lock() { + if let Some(ref running) = mutable.running { + let _ = running.interface.ctrlc(); + } + } + } + }))? + }; + + if let Ok(mut mutable) = self.mutable.lock() { + mutable.ctrlc_guard = Some(guard); + } + + Ok(()) + } } /// Anything that can produce a plugin interface. diff --git a/crates/nu-plugin-protocol/src/lib.rs b/crates/nu-plugin-protocol/src/lib.rs index a9196a2d8d..1a2bbd99ff 100644 --- a/crates/nu-plugin-protocol/src/lib.rs +++ b/crates/nu-plugin-protocol/src/lib.rs @@ -208,6 +208,8 @@ pub enum PluginInput { Drop(StreamId), /// See [`StreamMessage::Ack`]. Ack(StreamId), + /// Signal a ctrlc event + Ctrlc, } impl TryFrom for StreamMessage { diff --git a/crates/nu-plugin/src/plugin/interface/mod.rs b/crates/nu-plugin/src/plugin/interface/mod.rs index 10f787b222..daf2d5b783 100644 --- a/crates/nu-plugin/src/plugin/interface/mod.rs +++ b/crates/nu-plugin/src/plugin/interface/mod.rs @@ -1,7 +1,7 @@ //! Interface used by the plugin to communicate with the engine. use nu_plugin_core::{ - util::{Sequence, Waitable, WaitableMut}, + util::{Waitable, WaitableMut}, Interface, InterfaceManager, PipelineDataWriter, PluginRead, PluginWrite, StreamManager, StreamManagerHandle, }; @@ -11,13 +11,14 @@ use nu_plugin_protocol::{ PluginOutput, ProtocolInfo, }; use nu_protocol::{ - engine::Closure, Config, DeclId, LabeledError, PipelineData, PluginMetadata, PluginSignature, - ShellError, Signals, Span, Spanned, Value, + engine::{ctrlc, Closure, Sequence}, + Config, DeclId, LabeledError, PipelineData, PluginMetadata, PluginSignature, ShellError, + Signals, Span, Spanned, Value, }; use nu_utils::SharedCow; use std::{ collections::{btree_map, BTreeMap, HashMap}, - sync::{mpsc, Arc}, + sync::{atomic::AtomicBool, mpsc, Arc}, }; /// Plugin calls that are received by the [`EngineInterfaceManager`] for handling. @@ -63,6 +64,10 @@ struct EngineInterfaceState { mpsc::Sender<(EngineCallId, mpsc::Sender>)>, /// The synchronized output writer writer: Box>, + // Mirror signals from `EngineState` + signals: Signals, + /// Registered Ctrl-C handlers + ctrlc_handlers: ctrlc::Handlers, } impl std::fmt::Debug for EngineInterfaceState { @@ -116,6 +121,8 @@ impl EngineInterfaceManager { stream_id_sequence: Sequence::default(), engine_call_subscription_sender: subscription_tx, writer: Box::new(writer), + signals: Signals::new(Arc::new(AtomicBool::new(false))), + ctrlc_handlers: ctrlc::Handlers::new(), }), protocol_info_mut, plugin_call_sender: Some(plug_tx), @@ -235,7 +242,6 @@ impl InterfaceManager for EngineInterfaceManager { fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> { log::trace!("from engine: {:?}", input); - match input { PluginInput::Hello(info) => { let info = Arc::new(info); @@ -331,6 +337,11 @@ impl InterfaceManager for EngineInterfaceManager { }); self.send_engine_call_response(id, response) } + PluginInput::Ctrlc => { + self.state.signals.trigger(); + self.state.ctrlc_handlers.run(); + Ok(()) + } } } @@ -510,6 +521,15 @@ impl EngineInterface { self.state.writer.is_stdout() } + /// Register a closure which will be called when the engine receives a Ctrl-C signal. Returns a + /// RAII guard that will keep the closure alive until it is dropped. + pub fn register_ctrlc_handler( + &self, + handler: ctrlc::Handler, + ) -> Result { + self.state.ctrlc_handlers.register(handler) + } + /// Get the full shell configuration from the engine. As this is quite a large object, it is /// provided on request only. /// @@ -959,6 +979,10 @@ impl EngineInterface { self.write(PluginOutput::CallResponse(self.context()?, response))?; self.flush() } + + pub fn signals(&self) -> &Signals { + &self.state.signals + } } impl Interface for EngineInterface { diff --git a/crates/nu-protocol/src/engine/ctrlc.rs b/crates/nu-protocol/src/engine/ctrlc.rs new file mode 100644 index 0000000000..11121f7c9c --- /dev/null +++ b/crates/nu-protocol/src/engine/ctrlc.rs @@ -0,0 +1,139 @@ +use std::fmt::Debug; +use std::sync::{Arc, Mutex}; + +use crate::{engine::Sequence, ShellError}; + +/// Handler is a closure that can be sent across threads and shared. +pub type Handler = Box; + +/// Manages a collection of handlers. +#[derive(Clone)] +pub struct Handlers { + /// List of handler tuples containing an ID and the handler itself. + handlers: Arc>>, + /// Sequence generator for unique IDs. + next_id: Arc, +} + +impl Debug for Handlers { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Handlers") + .field("next_id", &self.next_id) + .finish() + } +} + +/// Guard that unregisters a handler when dropped. +#[derive(Clone)] +pub struct Guard { + /// Unique ID of the handler. + id: usize, + /// Reference to the handlers list. + handlers: Arc>>, +} + +impl Drop for Guard { + /// Drops the `Guard`, removing the associated handler from the list. + fn drop(&mut self) { + if let Ok(mut handlers) = self.handlers.lock() { + handlers.retain(|(id, _)| *id != self.id); + } + } +} + +impl Debug for Guard { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Guard").field("id", &self.id).finish() + } +} + +impl Handlers { + pub fn new() -> Handlers { + let handlers = Arc::new(Mutex::new(vec![])); + let next_id = Arc::new(Sequence::default()); + Handlers { handlers, next_id } + } + + /// Registers a new handler and returns an RAII guard which will unregister the handler when + /// dropped. + pub fn register(&self, handler: Handler) -> Result { + let id = self.next_id.next()?; + if let Ok(mut handlers) = self.handlers.lock() { + handlers.push((id, handler)); + } + + Ok(Guard { + id, + handlers: Arc::clone(&self.handlers), + }) + } + + /// Runs all registered handlers. + pub fn run(&self) { + if let Ok(handlers) = self.handlers.lock() { + for (_, handler) in handlers.iter() { + handler(); + } + } + } +} + +impl Default for Handlers { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicBool, Ordering}; + + #[test] + /// Tests registering and running multiple handlers. + fn test_multiple_handlers() { + let handlers = Handlers::new(); + let called1 = Arc::new(AtomicBool::new(false)); + let called2 = Arc::new(AtomicBool::new(false)); + + let called1_clone = Arc::clone(&called1); + let called2_clone = Arc::clone(&called2); + + let _guard1 = handlers.register(Box::new(move || { + called1_clone.store(true, Ordering::SeqCst); + })); + let _guard2 = handlers.register(Box::new(move || { + called2_clone.store(true, Ordering::SeqCst); + })); + + handlers.run(); + + assert!(called1.load(Ordering::SeqCst)); + assert!(called2.load(Ordering::SeqCst)); + } + + #[test] + /// Tests the dropping of a guard and ensuring the handler is unregistered. + fn test_guard_drop() { + let handlers = Handlers::new(); + let called = Arc::new(AtomicBool::new(false)); + let called_clone = Arc::clone(&called); + + let guard = handlers.register(Box::new(move || { + called_clone.store(true, Ordering::Relaxed); + })); + + // Ensure the handler is registered + assert_eq!(handlers.handlers.lock().unwrap().len(), 1); + + drop(guard); + + // Ensure the handler is removed after dropping the guard + assert_eq!(handlers.handlers.lock().unwrap().len(), 0); + + handlers.run(); + + // Ensure the handler is not called after being dropped + assert!(!called.load(Ordering::Relaxed)); + } +} diff --git a/crates/nu-protocol/src/engine/engine_state.rs b/crates/nu-protocol/src/engine/engine_state.rs index 8579295c32..e2409debe3 100644 --- a/crates/nu-protocol/src/engine/engine_state.rs +++ b/crates/nu-protocol/src/engine/engine_state.rs @@ -2,6 +2,7 @@ use crate::{ ast::Block, debugger::{Debugger, NoopDebugger}, engine::{ + ctrlc, usage::{build_usage, Usage}, CachedFile, Command, CommandType, EnvVars, OverlayFrame, ScopeFrame, Stack, StateDelta, Variable, Visibility, DEFAULT_OVERLAY_NAME, @@ -85,6 +86,7 @@ pub struct EngineState { pub spans: Vec, usage: Usage, pub scope: ScopeFrame, + pub ctrlc_handlers: Option, signals: Signals, pub env_vars: Arc, pub previous_env_vars: Arc>, @@ -145,6 +147,7 @@ impl EngineState { 0, false, ), + ctrlc_handlers: None, signals: Signals::empty(), env_vars: Arc::new( [(DEFAULT_OVERLAY_NAME.to_string(), HashMap::new())] @@ -268,8 +271,13 @@ impl EngineState { #[cfg(feature = "plugin")] if !delta.plugins.is_empty() { - // Replace plugins that overlap in identity. for plugin in std::mem::take(&mut delta.plugins) { + // Connect plugins to the ctrlc handlers + if let Some(handlers) = &self.ctrlc_handlers { + plugin.clone().configure_ctrlc_handler(handlers)?; + } + + // Replace plugins that overlap in identity. if let Some(existing) = self .plugins .iter_mut() diff --git a/crates/nu-protocol/src/engine/mod.rs b/crates/nu-protocol/src/engine/mod.rs index 39bb5a7ae3..24e4b15094 100644 --- a/crates/nu-protocol/src/engine/mod.rs +++ b/crates/nu-protocol/src/engine/mod.rs @@ -9,6 +9,7 @@ mod engine_state; mod error_handler; mod overlay; mod pattern_match; +mod sequence; mod stack; mod stack_out_dest; mod state_delta; @@ -27,8 +28,11 @@ pub use engine_state::*; pub use error_handler::*; pub use overlay::*; pub use pattern_match::*; +pub use sequence::*; pub use stack::*; pub use stack_out_dest::*; pub use state_delta::*; pub use state_working_set::*; pub use variable::*; + +pub mod ctrlc; diff --git a/crates/nu-plugin-core/src/util/sequence.rs b/crates/nu-protocol/src/engine/sequence.rs similarity index 98% rename from crates/nu-plugin-core/src/util/sequence.rs rename to crates/nu-protocol/src/engine/sequence.rs index 4f5c288c8f..34af8fc574 100644 --- a/crates/nu-plugin-core/src/util/sequence.rs +++ b/crates/nu-protocol/src/engine/sequence.rs @@ -1,4 +1,4 @@ -use nu_protocol::ShellError; +use crate::ShellError; use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; /// Implements an atomically incrementing sequential series of numbers diff --git a/crates/nu-protocol/src/pipeline/signals.rs b/crates/nu-protocol/src/pipeline/signals.rs index 06ce583c82..2057e27fe0 100644 --- a/crates/nu-protocol/src/pipeline/signals.rs +++ b/crates/nu-protocol/src/pipeline/signals.rs @@ -56,6 +56,13 @@ impl Signals { } } + /// Triggers an interrupt. + pub fn trigger(&self) { + if let Some(signals) = &self.signals { + signals.store(true, Ordering::Relaxed); + } + } + /// Returns whether an interrupt has been triggered. #[inline] pub fn interrupted(&self) -> bool { diff --git a/crates/nu-protocol/src/plugin/registered.rs b/crates/nu-protocol/src/plugin/registered.rs index 56d7f4079a..8749c24af9 100644 --- a/crates/nu-protocol/src/plugin/registered.rs +++ b/crates/nu-protocol/src/plugin/registered.rs @@ -1,6 +1,6 @@ use std::{any::Any, sync::Arc}; -use crate::{PluginGcConfig, PluginIdentity, PluginMetadata, ShellError}; +use crate::{engine::ctrlc, PluginGcConfig, PluginIdentity, PluginMetadata, ShellError}; /// Trait for plugins registered in the [`EngineState`](crate::engine::EngineState). pub trait RegisteredPlugin: Send + Sync { @@ -34,4 +34,12 @@ pub trait RegisteredPlugin: Send + Sync { /// This is necessary in order to allow `nu_plugin` to handle the implementation details of /// plugins. fn as_any(self: Arc) -> Arc; + + /// Give this plugin a chance to register for Ctrl-C signals. + fn configure_ctrlc_handler( + self: Arc, + _handler: &ctrlc::Handlers, + ) -> Result<(), ShellError> { + Ok(()) + } } diff --git a/crates/nu_plugin_example/src/commands/ctrlc.rs b/crates/nu_plugin_example/src/commands/ctrlc.rs new file mode 100644 index 0000000000..f9851a451a --- /dev/null +++ b/crates/nu_plugin_example/src/commands/ctrlc.rs @@ -0,0 +1,50 @@ +use std::sync::mpsc; + +use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; +use nu_protocol::{Category, LabeledError, PipelineData, Signature}; + +use crate::ExamplePlugin; + +/// `example ctrlc` +pub struct Ctrlc; + +impl PluginCommand for Ctrlc { + type Plugin = ExamplePlugin; + + fn name(&self) -> &str { + "example ctrlc" + } + + fn usage(&self) -> &str { + "Example command that demonstrates registering a ctrl-c handler" + } + + fn signature(&self) -> Signature { + Signature::build(self.name()).category(Category::Experimental) + } + + fn search_terms(&self) -> Vec<&str> { + vec!["example"] + } + + fn run( + &self, + _plugin: &ExamplePlugin, + engine: &EngineInterface, + _call: &EvaluatedCall, + _input: PipelineData, + ) -> Result { + let (sender, receiver) = mpsc::channel::<()>(); + let _guard = engine.register_ctrlc_handler(Box::new(move || { + let _ = sender.send(()); + })); + + eprintln!("interrupt status: {:?}", engine.signals().interrupted()); + eprintln!("waiting for ctrl-c signal..."); + receiver.recv().expect("handler went away"); + eprintln!("interrupt status: {:?}", engine.signals().interrupted()); + eprintln!("peace."); + + Ok(PipelineData::Empty) + } +} diff --git a/crates/nu_plugin_example/src/commands/mod.rs b/crates/nu_plugin_example/src/commands/mod.rs index b1703a3a47..48000677b8 100644 --- a/crates/nu_plugin_example/src/commands/mod.rs +++ b/crates/nu_plugin_example/src/commands/mod.rs @@ -15,12 +15,14 @@ pub use two::Two; // Engine interface demos mod call_decl; mod config; +mod ctrlc; mod disable_gc; mod env; mod view_span; pub use call_decl::CallDecl; pub use config::Config; +pub use ctrlc::Ctrlc; pub use disable_gc::DisableGc; pub use env::Env; pub use view_span::ViewSpan; diff --git a/crates/nu_plugin_example/src/lib.rs b/crates/nu_plugin_example/src/lib.rs index 3db97659bf..f0f4c71541 100644 --- a/crates/nu_plugin_example/src/lib.rs +++ b/crates/nu_plugin_example/src/lib.rs @@ -27,6 +27,7 @@ impl Plugin for ExamplePlugin { Box::new(Env), Box::new(ViewSpan), Box::new(DisableGc), + Box::new(Ctrlc), Box::new(CallDecl), // Stream demos Box::new(CollectBytes), diff --git a/src/signals.rs b/src/signals.rs index bb3539b95e..857f639843 100644 --- a/src/signals.rs +++ b/src/signals.rs @@ -1,4 +1,7 @@ -use nu_protocol::{engine::EngineState, Signals}; +use nu_protocol::{ + engine::{ctrlc::Handlers, EngineState}, + Signals, +}; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -7,8 +10,13 @@ use std::sync::{ pub(crate) fn ctrlc_protection(engine_state: &mut EngineState) { let interrupt = Arc::new(AtomicBool::new(false)); engine_state.set_signals(Signals::new(interrupt.clone())); + + let ctrlc_handlers = Handlers::new(); + engine_state.ctrlc_handlers = Some(ctrlc_handlers.clone()); + ctrlc::set_handler(move || { interrupt.store(true, Ordering::Relaxed); + ctrlc_handlers.run(); }) .expect("Error setting Ctrl-C handler"); }