From ab08328a30c13bd6fd80e5cd6cdf5af76a53752d Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Wed, 28 Feb 2024 18:41:22 -0800 Subject: [PATCH] Add Goodbye message to ensure plugins exit when they are no longer needed (#12014) # Description This fixes a race condition where all interfaces to a plugin might have been dropped, but both sides are still expecting input, and the `PluginInterfaceManager` doesn't get a chance to see that the interfaces have been dropped and stop trying to consume input. As the manager needs to hold on to a writer, we can't automatically close the stream, but we also can't interrupt it if it's in a waiting to read. So the best solution is to send a message to the plugin that we are no longer going to be sending it any plugin calls, so that it knows that it can exit when it's done. This race condition is a little bit tricky to trigger as-is, but can be more noticeable when running plugins in a tight loop. If too many plugin processes are spawned at one time, Nushell can start to encounter "too many open files" errors, and not be very useful. # User-Facing Changes # Tests + Formatting - :green_circle: `toolkit fmt` - :green_circle: `toolkit clippy` - :green_circle: `toolkit test` - :green_circle: `toolkit test stdlib` # After Submitting I will need to add `Goodbye` to the protocol docs --- .../nu-plugin/src/plugin/interface/engine.rs | 15 ++++++++--- .../src/plugin/interface/engine/tests.rs | 21 ++++++++++++++++ .../nu-plugin/src/plugin/interface/plugin.rs | 25 +++++++++++++++++++ .../src/plugin/interface/plugin/tests.rs | 17 +++++++++++++ crates/nu-plugin/src/protocol/mod.rs | 3 +++ .../nu_plugin_python_example.py | 2 ++ 6 files changed, 80 insertions(+), 3 deletions(-) diff --git a/crates/nu-plugin/src/plugin/interface/engine.rs b/crates/nu-plugin/src/plugin/interface/engine.rs index 643ce6c6db..7d5012ed11 100644 --- a/crates/nu-plugin/src/plugin/interface/engine.rs +++ b/crates/nu-plugin/src/plugin/interface/engine.rs @@ -66,8 +66,8 @@ impl std::fmt::Debug for EngineInterfaceState { pub(crate) struct EngineInterfaceManager { /// Shared state state: Arc, - /// Channel to send received PluginCalls to - plugin_call_sender: mpsc::Sender, + /// Channel to send received PluginCalls to. This is removed after `Goodbye` is received. + plugin_call_sender: Option>, /// Receiver for PluginCalls. This is usually taken after initialization plugin_call_receiver: Option>, /// Manages stream messages and state @@ -85,7 +85,7 @@ impl EngineInterfaceManager { stream_id_sequence: Sequence::default(), writer: Box::new(writer), }), - plugin_call_sender: plug_tx, + plugin_call_sender: Some(plug_tx), plugin_call_receiver: Some(plug_rx), stream_manager: StreamManager::new(), protocol_info: None, @@ -112,6 +112,10 @@ impl EngineInterfaceManager { /// Send a [`ReceivedPluginCall`] to the channel fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> { self.plugin_call_sender + .as_ref() + .ok_or_else(|| ShellError::PluginFailedToDecode { + msg: "Received a plugin call after Goodbye".into(), + })? .send(plugin_call) .map_err(|_| ShellError::NushellFailed { msg: "Received a plugin call, but there's nowhere to send it".into(), @@ -230,6 +234,11 @@ impl InterfaceManager for EngineInterfaceManager { }) } }, + PluginInput::Goodbye => { + // Remove the plugin call sender so it hangs up + drop(self.plugin_call_sender.take()); + Ok(()) + } } } diff --git a/crates/nu-plugin/src/plugin/interface/engine/tests.rs b/crates/nu-plugin/src/plugin/interface/engine/tests.rs index 4946ee239f..c6be0e5374 100644 --- a/crates/nu-plugin/src/plugin/interface/engine/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/engine/tests.rs @@ -1,3 +1,5 @@ +use std::sync::mpsc::TryRecvError; + use nu_protocol::{ CustomValue, IntoInterruptiblePipelineData, PipelineData, PluginSignature, ShellError, Span, Spanned, Value, @@ -215,6 +217,25 @@ fn manager_consume_errors_on_sending_other_messages_before_hello() -> Result<(), Ok(()) } +#[test] +fn manager_consume_goodbye_closes_plugin_call_channel() -> Result<(), ShellError> { + let mut manager = TestCase::new().engine(); + manager.protocol_info = Some(ProtocolInfo::default()); + + let rx = manager + .take_plugin_call_receiver() + .expect("plugin call receiver missing"); + + manager.consume(PluginInput::Goodbye)?; + + match rx.try_recv() { + Err(TryRecvError::Disconnected) => (), + _ => panic!("receiver was not disconnected"), + } + + Ok(()) +} + #[test] fn manager_consume_call_signature_forwards_to_receiver_with_context() -> Result<(), ShellError> { let mut manager = TestCase::new().engine(); diff --git a/crates/nu-plugin/src/plugin/interface/plugin.rs b/crates/nu-plugin/src/plugin/interface/plugin.rs index 79951ed9c7..2a2e83c740 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin.rs @@ -318,6 +318,16 @@ impl PluginInterface { self.flush() } + /// Tell the plugin it should not expect any more plugin calls and should terminate after it has + /// finished processing the ones it has already received. + /// + /// Note that this is automatically called when the last existing `PluginInterface` is dropped. + /// You probably do not need to call this manually. + pub(crate) fn goodbye(&self) -> Result<(), ShellError> { + self.write(PluginInput::Goodbye)?; + self.flush() + } + /// Write a plugin call message. Returns the writer for the stream, and the receiver for /// messages (e.g. response) related to the plugin call fn write_plugin_call( @@ -502,3 +512,18 @@ impl Interface for PluginInterface { } } } + +impl Drop for PluginInterface { + fn drop(&mut self) { + // Automatically send `Goodbye` if there are no more interfaces. In that case there would be + // only two copies of the state, one of which we hold, and one of which the manager holds. + // + // Our copy is about to be dropped, so there would only be one left, the manager. The + // manager will never send any plugin calls, so we should let the plugin know that. + if Arc::strong_count(&self.state) < 3 { + if let Err(err) = self.goodbye() { + log::warn!("Error during plugin Goodbye: {err}"); + } + } + } +} diff --git a/crates/nu-plugin/src/plugin/interface/plugin/tests.rs b/crates/nu-plugin/src/plugin/interface/plugin/tests.rs index 93e1df1c97..57b306f86a 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin/tests.rs @@ -415,6 +415,23 @@ fn interface_hello_sends_protocol_info() -> Result<(), ShellError> { Ok(()) } +#[test] +fn interface_goodbye() -> Result<(), ShellError> { + let test = TestCase::new(); + let interface = test.plugin("test").get_interface(); + interface.goodbye()?; + + let written = test.next_written().expect("nothing written"); + + assert!( + matches!(written, PluginInput::Goodbye), + "not goodbye: {written:?}" + ); + + assert!(!test.has_unconsumed_write()); + Ok(()) +} + #[test] fn interface_write_plugin_call_registers_subscription() -> Result<(), ShellError> { let mut manager = TestCase::new().plugin("test"); diff --git a/crates/nu-plugin/src/protocol/mod.rs b/crates/nu-plugin/src/protocol/mod.rs index da736ad5d3..a5e054ed15 100644 --- a/crates/nu-plugin/src/protocol/mod.rs +++ b/crates/nu-plugin/src/protocol/mod.rs @@ -114,6 +114,9 @@ pub enum PluginInput { /// Execute a [`PluginCall`], such as `Run` or `Signature`. The ID should not have been used /// before. Call(PluginCallId, PluginCall), + /// Don't expect any more plugin calls. Exit after all currently executing plugin calls are + /// finished. + Goodbye, /// Stream control or data message. Untagged to keep them as small as possible. /// /// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}` diff --git a/crates/nu_plugin_python/nu_plugin_python_example.py b/crates/nu_plugin_python/nu_plugin_python_example.py index 74733491cd..bd339895d9 100755 --- a/crates/nu_plugin_python/nu_plugin_python_example.py +++ b/crates/nu_plugin_python/nu_plugin_python_example.py @@ -217,6 +217,8 @@ def write_error(id, msg, span=None): def handle_input(input): if "Hello" in input: return + elif input == "Goodbye": + return elif "Call" in input: [id, plugin_call] = input["Call"] if "Signature" in plugin_call: