Add Goodbye message to ensure plugins exit when they are no longer needed (#12014)

# Description

This fixes a race condition where all interfaces to a plugin might have
been dropped, but both sides are still expecting input, and the
`PluginInterfaceManager` doesn't get a chance to see that the interfaces
have been dropped and stop trying to consume input.

As the manager needs to hold on to a writer, we can't automatically
close the stream, but we also can't interrupt it if it's in a waiting to
read. So the best solution is to send a message to the plugin that we
are no longer going to be sending it any plugin calls, so that it knows
that it can exit when it's done.

This race condition is a little bit tricky to trigger as-is, but can be
more noticeable when running plugins in a tight loop. If too many plugin
processes are spawned at one time, Nushell can start to encounter "too
many open files" errors, and not be very useful.


# User-Facing Changes


# Tests + Formatting
- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`

# After Submitting

I will need to add `Goodbye` to the protocol docs
This commit is contained in:
Devyn Cairns 2024-02-28 18:41:22 -08:00 committed by GitHub
parent 345edbbe10
commit ab08328a30
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 80 additions and 3 deletions

View File

@ -66,8 +66,8 @@ impl std::fmt::Debug for EngineInterfaceState {
pub(crate) struct EngineInterfaceManager { pub(crate) struct EngineInterfaceManager {
/// Shared state /// Shared state
state: Arc<EngineInterfaceState>, state: Arc<EngineInterfaceState>,
/// Channel to send received PluginCalls to /// Channel to send received PluginCalls to. This is removed after `Goodbye` is received.
plugin_call_sender: mpsc::Sender<ReceivedPluginCall>, plugin_call_sender: Option<mpsc::Sender<ReceivedPluginCall>>,
/// Receiver for PluginCalls. This is usually taken after initialization /// Receiver for PluginCalls. This is usually taken after initialization
plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>, plugin_call_receiver: Option<mpsc::Receiver<ReceivedPluginCall>>,
/// Manages stream messages and state /// Manages stream messages and state
@ -85,7 +85,7 @@ impl EngineInterfaceManager {
stream_id_sequence: Sequence::default(), stream_id_sequence: Sequence::default(),
writer: Box::new(writer), writer: Box::new(writer),
}), }),
plugin_call_sender: plug_tx, plugin_call_sender: Some(plug_tx),
plugin_call_receiver: Some(plug_rx), plugin_call_receiver: Some(plug_rx),
stream_manager: StreamManager::new(), stream_manager: StreamManager::new(),
protocol_info: None, protocol_info: None,
@ -112,6 +112,10 @@ impl EngineInterfaceManager {
/// Send a [`ReceivedPluginCall`] to the channel /// Send a [`ReceivedPluginCall`] to the channel
fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> { fn send_plugin_call(&self, plugin_call: ReceivedPluginCall) -> Result<(), ShellError> {
self.plugin_call_sender self.plugin_call_sender
.as_ref()
.ok_or_else(|| ShellError::PluginFailedToDecode {
msg: "Received a plugin call after Goodbye".into(),
})?
.send(plugin_call) .send(plugin_call)
.map_err(|_| ShellError::NushellFailed { .map_err(|_| ShellError::NushellFailed {
msg: "Received a plugin call, but there's nowhere to send it".into(), msg: "Received a plugin call, but there's nowhere to send it".into(),
@ -230,6 +234,11 @@ impl InterfaceManager for EngineInterfaceManager {
}) })
} }
}, },
PluginInput::Goodbye => {
// Remove the plugin call sender so it hangs up
drop(self.plugin_call_sender.take());
Ok(())
}
} }
} }

View File

@ -1,3 +1,5 @@
use std::sync::mpsc::TryRecvError;
use nu_protocol::{ use nu_protocol::{
CustomValue, IntoInterruptiblePipelineData, PipelineData, PluginSignature, ShellError, Span, CustomValue, IntoInterruptiblePipelineData, PipelineData, PluginSignature, ShellError, Span,
Spanned, Value, Spanned, Value,
@ -215,6 +217,25 @@ fn manager_consume_errors_on_sending_other_messages_before_hello() -> Result<(),
Ok(()) Ok(())
} }
#[test]
fn manager_consume_goodbye_closes_plugin_call_channel() -> Result<(), ShellError> {
let mut manager = TestCase::new().engine();
manager.protocol_info = Some(ProtocolInfo::default());
let rx = manager
.take_plugin_call_receiver()
.expect("plugin call receiver missing");
manager.consume(PluginInput::Goodbye)?;
match rx.try_recv() {
Err(TryRecvError::Disconnected) => (),
_ => panic!("receiver was not disconnected"),
}
Ok(())
}
#[test] #[test]
fn manager_consume_call_signature_forwards_to_receiver_with_context() -> Result<(), ShellError> { fn manager_consume_call_signature_forwards_to_receiver_with_context() -> Result<(), ShellError> {
let mut manager = TestCase::new().engine(); let mut manager = TestCase::new().engine();

View File

@ -318,6 +318,16 @@ impl PluginInterface {
self.flush() self.flush()
} }
/// Tell the plugin it should not expect any more plugin calls and should terminate after it has
/// finished processing the ones it has already received.
///
/// Note that this is automatically called when the last existing `PluginInterface` is dropped.
/// You probably do not need to call this manually.
pub(crate) fn goodbye(&self) -> Result<(), ShellError> {
self.write(PluginInput::Goodbye)?;
self.flush()
}
/// Write a plugin call message. Returns the writer for the stream, and the receiver for /// Write a plugin call message. Returns the writer for the stream, and the receiver for
/// messages (e.g. response) related to the plugin call /// messages (e.g. response) related to the plugin call
fn write_plugin_call( fn write_plugin_call(
@ -502,3 +512,18 @@ impl Interface for PluginInterface {
} }
} }
} }
impl Drop for PluginInterface {
fn drop(&mut self) {
// Automatically send `Goodbye` if there are no more interfaces. In that case there would be
// only two copies of the state, one of which we hold, and one of which the manager holds.
//
// Our copy is about to be dropped, so there would only be one left, the manager. The
// manager will never send any plugin calls, so we should let the plugin know that.
if Arc::strong_count(&self.state) < 3 {
if let Err(err) = self.goodbye() {
log::warn!("Error during plugin Goodbye: {err}");
}
}
}
}

View File

@ -415,6 +415,23 @@ fn interface_hello_sends_protocol_info() -> Result<(), ShellError> {
Ok(()) Ok(())
} }
#[test]
fn interface_goodbye() -> Result<(), ShellError> {
let test = TestCase::new();
let interface = test.plugin("test").get_interface();
interface.goodbye()?;
let written = test.next_written().expect("nothing written");
assert!(
matches!(written, PluginInput::Goodbye),
"not goodbye: {written:?}"
);
assert!(!test.has_unconsumed_write());
Ok(())
}
#[test] #[test]
fn interface_write_plugin_call_registers_subscription() -> Result<(), ShellError> { fn interface_write_plugin_call_registers_subscription() -> Result<(), ShellError> {
let mut manager = TestCase::new().plugin("test"); let mut manager = TestCase::new().plugin("test");

View File

@ -114,6 +114,9 @@ pub enum PluginInput {
/// Execute a [`PluginCall`], such as `Run` or `Signature`. The ID should not have been used /// Execute a [`PluginCall`], such as `Run` or `Signature`. The ID should not have been used
/// before. /// before.
Call(PluginCallId, PluginCall<PipelineDataHeader>), Call(PluginCallId, PluginCall<PipelineDataHeader>),
/// Don't expect any more plugin calls. Exit after all currently executing plugin calls are
/// finished.
Goodbye,
/// Stream control or data message. Untagged to keep them as small as possible. /// Stream control or data message. Untagged to keep them as small as possible.
/// ///
/// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}` /// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}`

View File

@ -217,6 +217,8 @@ def write_error(id, msg, span=None):
def handle_input(input): def handle_input(input):
if "Hello" in input: if "Hello" in input:
return return
elif input == "Goodbye":
return
elif "Call" in input: elif "Call" in input:
[id, plugin_call] = input["Call"] [id, plugin_call] = input["Call"]
if "Signature" in plugin_call: if "Signature" in plugin_call: