Keep plugins persistently running in the background (#12064)

# Description
This PR uses the new plugin protocol to intelligently keep plugin
processes running in the background for further plugin calls.

Running plugins can be seen by running the new `plugin list` command,
and stopped by running the new `plugin stop` command.

This is an enhancement for the performance of plugins, as starting new
plugin processes has overhead, especially for plugins in languages that
take a significant amount of time on startup. It also enables plugins
that have persistent state between commands, making the migration of
features like dataframes and `stor` to plugins possible.

Plugins are automatically stopped by the new plugin garbage collector,
configurable with `$env.config.plugin_gc`:

```nushell
  $env.config.plugin_gc = {
      # Configuration for plugin garbage collection
      default: {
          enabled: true # true to enable stopping of inactive plugins
          stop_after: 10sec # how long to wait after a plugin is inactive to stop it
      }
      plugins: {
          # alternate configuration for specific plugins, by name, for example:
          #
          # gstat: {
          #     enabled: false
          # }
      }
  }
```

If garbage collection is enabled, plugins will be stopped after
`stop_after` passes after they were last active. Plugins are counted as
inactive if they have no running plugin calls. Reading the stream from
the response of a plugin call is still considered to be activity, but if
a plugin holds on to a stream but the call ends without an active
streaming response, it is not counted as active even if it is reading
it. Plugins can explicitly disable the GC as appropriate with
`engine.set_gc_disabled(true)`.

The `version` command now lists plugin names rather than plugin
commands. The list of plugin commands is accessible via `plugin list`.

Recommend doing this together with #12029, because it will likely force
plugin developers to do the right thing with mutability and lead to less
unexpected behavior when running plugins nested / in parallel.

# User-Facing Changes
- new command: `plugin list`
- new command: `plugin stop`
- changed command: `version` (now lists plugin names, rather than
commands)
- new config: `$env.config.plugin_gc`
- Plugins will keep running and be reused, at least for the configured
GC period
- Plugins that used mutable state in weird ways like `inc` did might
misbehave until fixed
- Plugins can disable GC if they need to
- Had to change plugin signature to accept `&EngineInterface` so that
the GC disable feature works. #12029 does this anyway, and I'm expecting
(resolvable) conflicts with that

# Tests + Formatting
- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`

Because there is some specific OS behavior required for plugins to not
respond to Ctrl-C directly, I've developed against and tested on both
Linux and Windows to ensure that works properly.

# After Submitting
I think this probably needs to be in the book somewhere
This commit is contained in:
Devyn Cairns
2024-03-09 15:10:22 -08:00
committed by GitHub
parent 430fb1fcb6
commit bc19be25b1
44 changed files with 2131 additions and 304 deletions

View File

@ -16,7 +16,7 @@
//! invoked by Nushell.
//!
//! ```rust,no_run
//! use nu_plugin::*;
//! use nu_plugin::{EvaluatedCall, LabeledError, MsgPackSerializer, Plugin, EngineInterface, serve_plugin};
//! use nu_protocol::{PluginSignature, Value};
//!
//! struct MyPlugin;
@ -55,7 +55,7 @@ pub use serializers::{json::JsonSerializer, msgpack::MsgPackSerializer};
// Used by other nu crates.
#[doc(hidden)]
pub use plugin::{get_signature, PluginDeclaration};
pub use plugin::{get_signature, PersistentPlugin, PluginDeclaration};
#[doc(hidden)]
pub use serializers::EncodingType;

View File

@ -4,11 +4,9 @@ use nu_engine::get_eval_block_with_early_return;
use nu_protocol::{
ast::Call,
engine::{Closure, EngineState, Stack},
Config, PipelineData, ShellError, Span, Spanned, Value,
Config, PipelineData, PluginIdentity, ShellError, Span, Spanned, Value,
};
use super::PluginIdentity;
/// Object safe trait for abstracting operations required of the plugin context.
pub(crate) trait PluginExecutionContext: Send + Sync {
/// The [Span] for the command execution (`call.head`)
@ -81,7 +79,7 @@ impl PluginExecutionContext for PluginExecutionCommandContext {
Ok(self
.get_config()?
.plugins
.get(&self.identity.plugin_name)
.get(self.identity.name())
.cloned()
.map(|value| {
let span = value.span();

View File

@ -1,28 +1,27 @@
use super::{PluginExecutionCommandContext, PluginIdentity};
use super::{PersistentPlugin, PluginExecutionCommandContext, PluginSource};
use crate::protocol::{CallInfo, EvaluatedCall};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use nu_engine::get_eval_expression;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{ast::Call, PluginSignature, Signature};
use nu_protocol::{Example, PipelineData, ShellError};
use nu_protocol::{Example, PipelineData, PluginIdentity, RegisteredPlugin, ShellError};
#[doc(hidden)] // Note: not for plugin authors / only used in nu-parser
#[derive(Clone)]
pub struct PluginDeclaration {
name: String,
signature: PluginSignature,
identity: Arc<PluginIdentity>,
source: PluginSource,
}
impl PluginDeclaration {
pub fn new(filename: PathBuf, signature: PluginSignature, shell: Option<PathBuf>) -> Self {
pub fn new(plugin: &Arc<PersistentPlugin>, signature: PluginSignature) -> Self {
Self {
name: signature.sig.name.clone(),
signature,
identity: Arc::new(PluginIdentity::new(filename, shell)),
source: PluginSource::new(plugin),
}
}
}
@ -79,25 +78,37 @@ impl Command for PluginDeclaration {
let evaluated_call =
EvaluatedCall::try_from_call(call, engine_state, stack, eval_expression)?;
// We need the current environment variables for `python` based plugins
// Or we'll likely have a problem when a plugin is implemented in a virtual Python environment.
let current_envs = nu_engine::env::env_to_strings(engine_state, stack).unwrap_or_default();
// Get the engine config
let engine_config = nu_engine::get_config(engine_state, stack);
// Start the plugin
let plugin = self.identity.clone().spawn(current_envs).map_err(|err| {
let decl = engine_state.get_decl(call.decl_id);
ShellError::GenericError {
error: format!("Unable to spawn plugin for `{}`", decl.name()),
msg: err.to_string(),
span: Some(call.head),
help: None,
inner: vec![],
}
})?;
// Get, or start, the plugin.
let plugin = self
.source
.persistent(None)
.and_then(|p| {
// Set the garbage collector config from the local config before running
p.set_gc_config(engine_config.plugin_gc.get(p.identity().name()));
p.get(|| {
// We need the current environment variables for `python` based plugins. Or
// we'll likely have a problem when a plugin is implemented in a virtual Python
// environment.
nu_engine::env::env_to_strings(engine_state, stack)
})
})
.map_err(|err| {
let decl = engine_state.get_decl(call.decl_id);
ShellError::GenericError {
error: format!("Unable to spawn plugin for `{}`", decl.name()),
msg: err.to_string(),
span: Some(call.head),
help: None,
inner: vec![],
}
})?;
// Create the context to execute in - this supports engine calls and custom values
let context = Arc::new(PluginExecutionCommandContext::new(
self.identity.clone(),
self.source.identity.clone(),
engine_state,
stack,
call,
@ -113,7 +124,11 @@ impl Command for PluginDeclaration {
)
}
fn is_plugin(&self) -> Option<(&Path, Option<&Path>)> {
Some((&self.identity.filename, self.identity.shell.as_deref()))
fn is_plugin(&self) -> bool {
true
}
fn plugin_identity(&self) -> Option<&PluginIdentity> {
Some(&self.source.identity)
}
}

View File

@ -0,0 +1,290 @@
use std::{
sync::{mpsc, Arc, Weak},
thread,
time::{Duration, Instant},
};
use nu_protocol::{PluginGcConfig, RegisteredPlugin};
use crate::PersistentPlugin;
/// Plugin garbage collector
///
/// Many users don't want all of their plugins to stay running indefinitely after using them, so
/// this runs a thread that monitors the plugin's usage and stops it automatically if it meets
/// certain conditions of inactivity.
#[derive(Debug, Clone)]
pub struct PluginGc {
sender: mpsc::Sender<PluginGcMsg>,
}
impl PluginGc {
/// Start a new plugin garbage collector. Returns an error if the thread failed to spawn.
pub fn new(
config: PluginGcConfig,
plugin: &Arc<PersistentPlugin>,
) -> std::io::Result<PluginGc> {
let (sender, receiver) = mpsc::channel();
let mut state = PluginGcState {
config,
last_update: None,
locks: 0,
disabled: false,
plugin: Arc::downgrade(plugin),
name: plugin.identity().name().to_owned(),
};
thread::Builder::new()
.name(format!("plugin gc ({})", plugin.identity().name()))
.spawn(move || state.run(receiver))?;
Ok(PluginGc { sender })
}
/// Update the garbage collector config
pub fn set_config(&self, config: PluginGcConfig) {
let _ = self.sender.send(PluginGcMsg::SetConfig(config));
}
/// Increment the number of locks held by the plugin
pub fn increment_locks(&self, amount: i64) {
let _ = self.sender.send(PluginGcMsg::AddLocks(amount));
}
/// Decrement the number of locks held by the plugin
pub fn decrement_locks(&self, amount: i64) {
let _ = self.sender.send(PluginGcMsg::AddLocks(-amount));
}
/// Set whether the GC is disabled by explicit request from the plugin. This is separate from
/// the `enabled` option in the config, and overrides that option.
pub fn set_disabled(&self, disabled: bool) {
let _ = self.sender.send(PluginGcMsg::SetDisabled(disabled));
}
/// Tell the GC to stop tracking the plugin. The plugin will not be stopped. The GC cannot be
/// reactivated after this request - a new one must be created instead.
pub fn stop_tracking(&self) {
let _ = self.sender.send(PluginGcMsg::StopTracking);
}
/// Tell the GC that the plugin exited so that it can remove it from the persistent plugin.
///
/// The reason the plugin tells the GC rather than just stopping itself via `source` is that
/// it can't guarantee that the plugin currently pointed to by `source` is itself, but if the
/// GC is still running, it hasn't received [`.stop_tracking()`] yet, which means it should be
/// the right plugin.
pub fn exited(&self) {
let _ = self.sender.send(PluginGcMsg::Exited);
}
}
#[derive(Debug)]
enum PluginGcMsg {
SetConfig(PluginGcConfig),
AddLocks(i64),
SetDisabled(bool),
StopTracking,
Exited,
}
#[derive(Debug)]
struct PluginGcState {
config: PluginGcConfig,
last_update: Option<Instant>,
locks: i64,
disabled: bool,
plugin: Weak<PersistentPlugin>,
name: String,
}
impl PluginGcState {
fn next_timeout(&self, now: Instant) -> Option<Duration> {
if self.locks <= 0 && !self.disabled {
self.last_update
.zip(self.config.enabled.then_some(self.config.stop_after))
.map(|(last_update, stop_after)| {
// If configured to stop, and used at some point, calculate the difference
let stop_after_duration = Duration::from_nanos(stop_after.max(0) as u64);
let duration_since_last_update = now.duration_since(last_update);
stop_after_duration.saturating_sub(duration_since_last_update)
})
} else {
// Don't timeout if there are locks set, or disabled
None
}
}
// returns `Some()` if the GC should not continue to operate, with `true` if it should stop the
// plugin, or `false` if it should not
fn handle_message(&mut self, msg: PluginGcMsg) -> Option<bool> {
match msg {
PluginGcMsg::SetConfig(config) => {
self.config = config;
}
PluginGcMsg::AddLocks(amount) => {
self.locks += amount;
if self.locks < 0 {
log::warn!(
"Plugin GC ({name}) problem: locks count below zero after adding \
{amount}: locks={locks}",
name = self.name,
locks = self.locks,
);
}
// Any time locks are modified, that counts as activity
self.last_update = Some(Instant::now());
}
PluginGcMsg::SetDisabled(disabled) => {
self.disabled = disabled;
}
PluginGcMsg::StopTracking => {
// Immediately exit without stopping the plugin
return Some(false);
}
PluginGcMsg::Exited => {
// Exit and stop the plugin
return Some(true);
}
}
None
}
fn run(&mut self, receiver: mpsc::Receiver<PluginGcMsg>) {
let mut always_stop = false;
loop {
let Some(msg) = (match self.next_timeout(Instant::now()) {
Some(duration) => receiver.recv_timeout(duration).ok(),
None => receiver.recv().ok(),
}) else {
// If the timeout was reached, or the channel is disconnected, break the loop
break;
};
log::trace!("Plugin GC ({name}) message: {msg:?}", name = self.name);
if let Some(should_stop) = self.handle_message(msg) {
// Exit the GC
if should_stop {
// If should_stop = true, attempt to stop the plugin
always_stop = true;
break;
} else {
// Don't stop the plugin
return;
}
}
}
// Upon exiting the loop, if the timeout reached zero, or we are exiting due to an Exited
// message, stop the plugin
if always_stop
|| self
.next_timeout(Instant::now())
.is_some_and(|t| t.is_zero())
{
// We only hold a weak reference, and it's not an error if we fail to upgrade it -
// that just means the plugin is definitely stopped anyway.
if let Some(plugin) = self.plugin.upgrade() {
let name = &self.name;
if let Err(err) = plugin.stop() {
log::warn!("Plugin `{name}` failed to be stopped by GC: {err}");
} else {
log::debug!("Plugin `{name}` successfully stopped by GC");
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_state() -> PluginGcState {
PluginGcState {
config: PluginGcConfig::default(),
last_update: None,
locks: 0,
disabled: false,
plugin: Weak::new(),
name: "test".into(),
}
}
#[test]
fn timeout_configured_as_zero() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.config.stop_after = 0;
state.last_update = Some(now);
assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
}
#[test]
fn timeout_past_deadline() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.config.stop_after = Duration::from_secs(1).as_nanos() as i64;
state.last_update = Some(now - Duration::from_secs(2));
assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
}
#[test]
fn timeout_with_deadline_in_future() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.config.stop_after = Duration::from_secs(1).as_nanos() as i64;
state.last_update = Some(now);
assert_eq!(Some(Duration::from_secs(1)), state.next_timeout(now));
}
#[test]
fn no_timeout_if_disabled_by_config() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = false;
state.last_update = Some(now);
assert_eq!(None, state.next_timeout(now));
}
#[test]
fn no_timeout_if_disabled_by_plugin() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.disabled = true;
state.last_update = Some(now);
assert_eq!(None, state.next_timeout(now));
}
#[test]
fn no_timeout_if_locks_count_over_zero() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.locks = 1;
state.last_update = Some(now);
assert_eq!(None, state.next_timeout(now));
}
#[test]
fn adding_locks_changes_last_update() {
let mut state = test_state();
let original_last_update = Some(Instant::now() - Duration::from_secs(1));
state.last_update = original_last_update;
state.handle_message(PluginGcMsg::AddLocks(1));
assert_ne!(original_last_update, state.last_update, "not updated");
}
}

View File

@ -1,110 +0,0 @@
use std::{
ffi::OsStr,
path::{Path, PathBuf},
sync::Arc,
};
use nu_protocol::ShellError;
use super::{create_command, make_plugin_interface, PluginInterface};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginIdentity {
/// The filename used to start the plugin
pub(crate) filename: PathBuf,
/// The shell used to start the plugin, if required
pub(crate) shell: Option<PathBuf>,
/// The friendly name of the plugin (e.g. `inc` for `C:\nu_plugin_inc.exe`)
pub(crate) plugin_name: String,
}
impl PluginIdentity {
pub(crate) fn new(filename: impl Into<PathBuf>, shell: Option<PathBuf>) -> PluginIdentity {
let filename = filename.into();
// `C:\nu_plugin_inc.exe` becomes `inc`
// `/home/nu/.cargo/bin/nu_plugin_inc` becomes `inc`
// any other path, including if it doesn't start with nu_plugin_, becomes
// `<invalid plugin name>`
let plugin_name = filename
.file_stem()
.map(|stem| stem.to_string_lossy().into_owned())
.and_then(|stem| stem.strip_prefix("nu_plugin_").map(|s| s.to_owned()))
.unwrap_or_else(|| {
log::warn!(
"filename `{}` is not a valid plugin name, must start with nu_plugin_",
filename.display()
);
"<invalid plugin name>".into()
});
PluginIdentity {
filename,
shell,
plugin_name,
}
}
#[cfg(all(test, windows))]
pub(crate) fn new_fake(name: &str) -> Arc<PluginIdentity> {
Arc::new(PluginIdentity::new(
format!(r"C:\fake\path\nu_plugin_{name}.exe"),
None,
))
}
#[cfg(all(test, not(windows)))]
pub(crate) fn new_fake(name: &str) -> Arc<PluginIdentity> {
Arc::new(PluginIdentity::new(
format!(r"/fake/path/nu_plugin_{name}"),
None,
))
}
/// Run the plugin command stored in this [`PluginIdentity`], then set up and return the
/// [`PluginInterface`] attached to it.
pub(crate) fn spawn(
self: Arc<Self>,
envs: impl IntoIterator<Item = (impl AsRef<OsStr>, impl AsRef<OsStr>)>,
) -> Result<PluginInterface, ShellError> {
let source_file = Path::new(&self.filename);
let mut plugin_cmd = create_command(source_file, self.shell.as_deref());
// We need the current environment variables for `python` based plugins
// Or we'll likely have a problem when a plugin is implemented in a virtual Python environment.
plugin_cmd.envs(envs);
let program_name = plugin_cmd.get_program().to_os_string().into_string();
// Run the plugin command
let child = plugin_cmd.spawn().map_err(|err| {
let error_msg = match err.kind() {
std::io::ErrorKind::NotFound => match program_name {
Ok(prog_name) => {
format!("Can't find {prog_name}, please make sure that {prog_name} is in PATH.")
}
_ => {
format!("Error spawning child process: {err}")
}
},
_ => {
format!("Error spawning child process: {err}")
}
};
ShellError::PluginFailedToLoad { msg: error_msg }
})?;
make_plugin_interface(child, self)
}
}
#[test]
fn parses_name_from_path() {
assert_eq!("test", PluginIdentity::new_fake("test").plugin_name);
assert_eq!(
"<invalid plugin name>",
PluginIdentity::new("other", None).plugin_name
);
assert_eq!(
"<invalid plugin name>",
PluginIdentity::new("", None).plugin_name
);
}

View File

@ -13,7 +13,8 @@ use nu_protocol::{
use crate::{
protocol::{
CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, PluginCall,
PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, ProtocolInfo,
PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
ProtocolInfo,
},
LabeledError, PluginOutput,
};
@ -670,6 +671,18 @@ impl EngineInterface {
value => Ok(value),
}
}
/// Tell the engine whether to disable garbage collection for this plugin.
///
/// The garbage collector is enabled by default, but plugins can turn it off (ideally
/// temporarily) as necessary to implement functionality that requires the plugin to stay
/// running for longer than the engine can automatically determine.
///
/// The user can still stop the plugin if they want to with the `plugin stop` command.
pub fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
self.write(PluginOutput::Option(PluginOption::GcDisabled(disabled)))?;
self.flush()
}
}
impl Interface for EngineInterface {

View File

@ -2,7 +2,7 @@
use std::{
collections::{btree_map, BTreeMap},
sync::{mpsc, Arc},
sync::{mpsc, Arc, OnceLock},
};
use nu_protocol::{
@ -11,11 +11,11 @@ use nu_protocol::{
};
use crate::{
plugin::{context::PluginExecutionContext, PluginIdentity},
plugin::{context::PluginExecutionContext, gc::PluginGc, PluginSource},
protocol::{
CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, PluginCall,
PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOutput,
ProtocolInfo, StreamId, StreamMessage,
PluginCallId, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption,
PluginOutput, ProtocolInfo, StreamId, StreamMessage,
},
sequence::Sequence,
};
@ -63,14 +63,16 @@ impl std::ops::Deref for Context {
/// Internal shared state between the manager and each interface.
struct PluginInterfaceState {
/// The identity of the plugin being interfaced with
identity: Arc<PluginIdentity>,
/// The source to be used for custom values coming from / going to the plugin
source: Arc<PluginSource>,
/// Sequence for generating plugin call ids
plugin_call_id_sequence: Sequence,
/// Sequence for generating stream ids
stream_id_sequence: Sequence,
/// Sender to subscribe to a plugin call response
plugin_call_subscription_sender: mpsc::Sender<(PluginCallId, PluginCallSubscription)>,
/// An error that should be propagated to further plugin calls
error: OnceLock<ShellError>,
/// The synchronized output writer
writer: Box<dyn PluginWrite<PluginInput>>,
}
@ -78,7 +80,7 @@ struct PluginInterfaceState {
impl std::fmt::Debug for PluginInterfaceState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PluginInterfaceState")
.field("identity", &self.identity)
.field("source", &self.source)
.field("plugin_call_id_sequence", &self.plugin_call_id_sequence)
.field("stream_id_sequence", &self.stream_id_sequence)
.field(
@ -118,21 +120,24 @@ pub(crate) struct PluginInterfaceManager {
///
/// This is necessary so we know when we can remove context for plugin calls
plugin_call_input_streams: BTreeMap<StreamId, PluginCallId>,
/// Garbage collector handle, to notify about the state of the plugin
gc: Option<PluginGc>,
}
impl PluginInterfaceManager {
pub(crate) fn new(
identity: Arc<PluginIdentity>,
source: Arc<PluginSource>,
writer: impl PluginWrite<PluginInput> + 'static,
) -> PluginInterfaceManager {
let (subscription_tx, subscription_rx) = mpsc::channel();
PluginInterfaceManager {
state: Arc::new(PluginInterfaceState {
identity,
source,
plugin_call_id_sequence: Sequence::default(),
stream_id_sequence: Sequence::default(),
plugin_call_subscription_sender: subscription_tx,
error: OnceLock::new(),
writer: Box::new(writer),
}),
stream_manager: StreamManager::new(),
@ -140,9 +145,17 @@ impl PluginInterfaceManager {
plugin_call_subscriptions: BTreeMap::new(),
plugin_call_subscription_receiver: subscription_rx,
plugin_call_input_streams: BTreeMap::new(),
gc: None,
}
}
/// Add a garbage collector to this plugin. The manager will notify the garbage collector about
/// the state of the plugin so that it can be automatically cleaned up if the plugin is
/// inactive.
pub(crate) fn set_garbage_collector(&mut self, gc: Option<PluginGc>) {
self.gc = gc;
}
/// Consume pending messages in the `plugin_call_subscription_receiver`
fn receive_plugin_call_subscriptions(&mut self) {
while let Ok((id, subscription)) = self.plugin_call_subscription_receiver.try_recv() {
@ -154,16 +167,21 @@ impl PluginInterfaceManager {
}
}
/// Track the start of stream(s)
/// Track the start of incoming stream(s)
fn recv_stream_started(&mut self, call_id: PluginCallId, stream_id: StreamId) {
self.plugin_call_input_streams.insert(stream_id, call_id);
// Increment the number of streams on the subscription so context stays alive
self.receive_plugin_call_subscriptions();
if let Some(sub) = self.plugin_call_subscriptions.get_mut(&call_id) {
self.plugin_call_input_streams.insert(stream_id, call_id);
sub.remaining_streams_to_read += 1;
}
// Add a lock to the garbage collector for each stream
if let Some(ref gc) = self.gc {
gc.increment_locks(1);
}
}
/// Track the end of a stream
/// Track the end of an incoming stream
fn recv_stream_ended(&mut self, stream_id: StreamId) {
if let Some(call_id) = self.plugin_call_input_streams.remove(&stream_id) {
if let btree_map::Entry::Occupied(mut e) = self.plugin_call_subscriptions.entry(call_id)
@ -174,6 +192,11 @@ impl PluginInterfaceManager {
e.remove();
}
}
// Streams read from the plugin are tracked with locks on the GC so plugins don't get
// stopped if they have active streams
if let Some(ref gc) = self.gc {
gc.decrement_locks(1);
}
}
}
@ -336,12 +359,17 @@ impl PluginInterfaceManager {
&mut self,
mut reader: impl PluginRead<PluginOutput>,
) -> Result<(), ShellError> {
let mut result = Ok(());
while let Some(msg) = reader.read().transpose() {
if self.is_finished() {
break;
}
// We assume an error here is unrecoverable (at least, without restarting the plugin)
if let Err(err) = msg.and_then(|msg| self.consume(msg)) {
// Put the error in the state so that new calls see it
let _ = self.state.error.set(err.clone());
// Error to streams
let _ = self.stream_manager.broadcast_read_error(err.clone());
// Error to call waiters
@ -354,10 +382,16 @@ impl PluginInterfaceManager {
.as_ref()
.map(|s| s.send(ReceivedPluginCallMessage::Error(err.clone())));
}
return Err(err);
result = Err(err);
break;
}
}
Ok(())
// Tell the GC we are exiting so that the plugin doesn't get stuck open
if let Some(ref gc) = self.gc {
gc.exited();
}
result
}
}
@ -369,6 +403,7 @@ impl InterfaceManager for PluginInterfaceManager {
PluginInterface {
state: self.state.clone(),
stream_manager_handle: self.stream_manager.get_handle(),
gc: self.gc.clone(),
}
}
@ -387,7 +422,9 @@ impl InterfaceManager for PluginInterfaceManager {
msg: format!(
"Plugin `{}` is compiled for nushell version {}, \
which is not compatible with version {}",
self.state.identity.plugin_name, info.version, local_info.version,
self.state.source.name(),
info.version,
local_info.version,
),
})
}
@ -398,11 +435,20 @@ impl InterfaceManager for PluginInterfaceManager {
msg: format!(
"Failed to receive initial Hello message from `{}`. \
This plugin might be too old",
self.state.identity.plugin_name
self.state.source.name()
),
})
}
PluginOutput::Stream(message) => self.consume_stream_message(message),
PluginOutput::Option(option) => match option {
PluginOption::GcDisabled(disabled) => {
// Turn garbage collection off/on.
if let Some(ref gc) = self.gc {
gc.set_disabled(disabled);
}
Ok(())
}
},
PluginOutput::CallResponse(id, response) => {
// Handle reading the pipeline data, if any
let response = match response {
@ -413,17 +459,26 @@ impl InterfaceManager for PluginInterfaceManager {
// error response, but send it anyway
let exec_context = self.get_context(id)?;
let ctrlc = exec_context.as_ref().and_then(|c| c.0.ctrlc());
// Register the streams in the response
for stream_id in data.stream_ids() {
self.recv_stream_started(id, stream_id);
}
match self.read_pipeline_data(data, ctrlc) {
Ok(data) => PluginCallResponse::PipelineData(data),
Err(err) => PluginCallResponse::Error(err.into()),
}
}
};
self.send_plugin_call_response(id, response)
let result = self.send_plugin_call_response(id, response);
if result.is_ok() {
// When a call ends, it releases a lock on the GC
if let Some(ref gc) = self.gc {
gc.decrement_locks(1);
}
}
result
}
PluginOutput::EngineCall { context, id, call } => {
// Handle reading the pipeline data, if any
@ -441,7 +496,7 @@ impl InterfaceManager for PluginInterfaceManager {
} => {
// Add source to any plugin custom values in the arguments
for arg in positional.iter_mut() {
PluginCustomValue::add_source(arg, &self.state.identity);
PluginCustomValue::add_source(arg, &self.state.source);
}
self.read_pipeline_data(input, ctrlc)
.map(|input| EngineCall::EvalClosure {
@ -472,14 +527,14 @@ impl InterfaceManager for PluginInterfaceManager {
// Add source to any values
match data {
PipelineData::Value(ref mut value, _) => {
PluginCustomValue::add_source(value, &self.state.identity);
PluginCustomValue::add_source(value, &self.state.source);
Ok(data)
}
PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => {
let identity = self.state.identity.clone();
let source = self.state.source.clone();
Ok(stream
.map(move |mut value| {
PluginCustomValue::add_source(&mut value, &identity);
PluginCustomValue::add_source(&mut value, &source);
value
})
.into_pipeline_data_with_metadata(meta, ctrlc))
@ -489,7 +544,7 @@ impl InterfaceManager for PluginInterfaceManager {
}
fn consume_stream_message(&mut self, message: StreamMessage) -> Result<(), ShellError> {
// Keep track of streams that end so we know if we don't need the context anymore
// Keep track of streams that end
if let StreamMessage::End(id) = message {
self.recv_stream_ended(id);
}
@ -504,6 +559,8 @@ pub(crate) struct PluginInterface {
state: Arc<PluginInterfaceState>,
/// Handle to stream manager
stream_manager_handle: StreamManagerHandle,
/// Handle to plugin garbage collector
gc: Option<PluginGc>,
}
impl PluginInterface {
@ -580,7 +637,7 @@ impl PluginInterface {
mut call,
input,
}) => {
verify_call_args(&mut call, &self.state.identity)?;
verify_call_args(&mut call, &self.state.source)?;
let (header, writer) = self.init_write_pipeline_data(input)?;
(
PluginCall::Run(CallInfo {
@ -604,9 +661,28 @@ impl PluginInterface {
remaining_streams_to_read: 0,
},
))
.map_err(|_| ShellError::NushellFailed {
msg: "PluginInterfaceManager hung up and is no longer accepting plugin calls"
.into(),
.map_err(|_| ShellError::GenericError {
error: format!("Plugin `{}` closed unexpectedly", self.state.source.name()),
msg: "can't complete this operation because the plugin is closed".into(),
span: match &call {
PluginCall::CustomValueOp(value, _) => Some(value.span),
PluginCall::Run(info) => Some(info.call.head),
_ => None,
},
help: Some(format!(
"the plugin may have experienced an error. Try registering the plugin again \
with `{}`",
if let Some(shell) = self.state.source.shell() {
format!(
"register --shell '{}' '{}'",
shell.display(),
self.state.source.filename().display(),
)
} else {
format!("register '{}'", self.state.source.filename().display())
}
)),
inner: vec![],
})?;
// Write request
@ -681,6 +757,18 @@ impl PluginInterface {
call: PluginCall<PipelineData>,
context: &Option<Context>,
) -> Result<PluginCallResponse<PipelineData>, ShellError> {
// Check for an error in the state first, and return it if set.
if let Some(error) = self.state.error.get() {
return Err(error.clone());
}
// Starting a plugin call adds a lock on the GC. Locks are not added for streams being read
// by the plugin, so the plugin would have to explicitly tell us if it expects to stay alive
// while reading streams in the background after the response ends.
if let Some(ref gc) = self.gc {
gc.increment_locks(1);
}
let (writer, rx) = self.write_plugin_call(call, context.clone())?;
// Finish writing stream in the background
@ -737,7 +825,7 @@ impl PluginInterface {
/// Check that custom values in call arguments come from the right source
fn verify_call_args(
call: &mut crate::EvaluatedCall,
source: &Arc<PluginIdentity>,
source: &Arc<PluginSource>,
) -> Result<(), ShellError> {
for arg in call.positional.iter_mut() {
PluginCustomValue::verify_source(arg, source)?;
@ -772,14 +860,14 @@ impl Interface for PluginInterface {
// Validate the destination of values in the pipeline data
match data {
PipelineData::Value(mut value, meta) => {
PluginCustomValue::verify_source(&mut value, &self.state.identity)?;
PluginCustomValue::verify_source(&mut value, &self.state.source)?;
Ok(PipelineData::Value(value, meta))
}
PipelineData::ListStream(ListStream { stream, ctrlc, .. }, meta) => {
let identity = self.state.identity.clone();
let source = self.state.source.clone();
Ok(stream
.map(move |mut value| {
match PluginCustomValue::verify_source(&mut value, &identity) {
match PluginCustomValue::verify_source(&mut value, &source) {
Ok(()) => value,
// Put the error in the stream instead
Err(err) => Value::error(err, value.span()),

View File

@ -12,7 +12,7 @@ use crate::{
plugin::{
context::PluginExecutionBogusContext,
interface::{test_util::TestCase, Interface, InterfaceManager},
PluginIdentity,
PluginSource,
},
protocol::{
test_util::{expected_test_custom_value, test_plugin_custom_value},
@ -214,17 +214,22 @@ fn manager_consume_all_propagates_io_error_to_plugin_calls() -> Result<(), Shell
.consume_all(&mut test)
.expect_err("consume_all did not error");
// We have to hold interface until now otherwise consume_all won't try to process the message
drop(interface);
let message = rx.try_recv().expect("failed to get plugin call message");
match message {
ReceivedPluginCallMessage::Error(error) => {
check_test_io_error(&error);
Ok(())
}
_ => panic!("received something other than an error: {message:?}"),
}
// Check that further calls also cause the error
match interface.get_signature() {
Ok(_) => panic!("plugin call after exit did not cause error somehow"),
Err(err) => {
check_test_io_error(&err);
Ok(())
}
}
}
#[test]
@ -242,17 +247,22 @@ fn manager_consume_all_propagates_message_error_to_plugin_calls() -> Result<(),
.consume_all(&mut test)
.expect_err("consume_all did not error");
// We have to hold interface until now otherwise consume_all won't try to process the message
drop(interface);
let message = rx.try_recv().expect("failed to get plugin call message");
match message {
ReceivedPluginCallMessage::Error(error) => {
check_invalid_output_error(&error);
Ok(())
}
_ => panic!("received something other than an error: {message:?}"),
}
// Check that further calls also cause the error
match interface.get_signature() {
Ok(_) => panic!("plugin call after exit did not cause error somehow"),
Err(err) => {
check_invalid_output_error(&err);
Ok(())
}
}
}
#[test]
@ -640,7 +650,7 @@ fn manager_prepare_pipeline_data_adds_source_to_values() -> Result<(), ShellErro
.expect("custom value is not a PluginCustomValue");
if let Some(source) = &custom_value.source {
assert_eq!("test", source.plugin_name);
assert_eq!("test", source.name());
} else {
panic!("source was not set");
}
@ -670,7 +680,7 @@ fn manager_prepare_pipeline_data_adds_source_to_list_streams() -> Result<(), She
.expect("custom value is not a PluginCustomValue");
if let Some(source) = &custom_value.source {
assert_eq!("test", source.plugin_name);
assert_eq!("test", source.name());
} else {
panic!("source was not set");
}
@ -1086,7 +1096,7 @@ fn normal_values(interface: &PluginInterface) -> Vec<Value> {
name: "SomeTest".into(),
data: vec![1, 2, 3],
// Has the same source, so it should be accepted
source: Some(interface.state.identity.clone()),
source: Some(interface.state.source.clone()),
})),
]
}
@ -1144,7 +1154,7 @@ fn bad_custom_values() -> Vec<Value> {
Value::test_custom_value(Box::new(PluginCustomValue {
name: "SomeTest".into(),
data: vec![1, 2, 3],
source: Some(PluginIdentity::new_fake("pluto")),
source: Some(PluginSource::new_fake("pluto").into()),
})),
]
}

View File

@ -5,7 +5,7 @@ use std::{
use nu_protocol::ShellError;
use crate::{plugin::PluginIdentity, protocol::PluginInput, PluginOutput};
use crate::{plugin::PluginSource, protocol::PluginInput, PluginOutput};
use super::{EngineInterfaceManager, PluginInterfaceManager, PluginRead, PluginWrite};
@ -131,7 +131,7 @@ impl<I, O> TestCase<I, O> {
impl TestCase<PluginOutput, PluginInput> {
/// Create a new [`PluginInterfaceManager`] that writes to this test case.
pub(crate) fn plugin(&self, name: &str) -> PluginInterfaceManager {
PluginInterfaceManager::new(PluginIdentity::new_fake(name), self.clone())
PluginInterfaceManager::new(PluginSource::new_fake(name).into(), self.clone())
}
}

View File

@ -1,7 +1,5 @@
mod declaration;
pub use declaration::PluginDeclaration;
use nu_engine::documentation::get_flags_section;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc, Mutex};
@ -17,22 +15,35 @@ use std::path::Path;
use std::process::{Child, ChildStdout, Command as CommandSys, Stdio};
use std::{env, thread};
#[cfg(unix)]
use std::os::unix::process::CommandExt;
#[cfg(windows)]
use std::os::windows::process::CommandExt;
use nu_protocol::{PipelineData, PluginSignature, ShellError, Spanned, Value};
mod interface;
pub use interface::EngineInterface;
pub(crate) use interface::PluginInterface;
mod context;
pub(crate) use context::PluginExecutionCommandContext;
mod identity;
pub(crate) use identity::PluginIdentity;
use self::interface::{InterfaceManager, PluginInterfaceManager};
use self::gc::PluginGc;
use super::EvaluatedCall;
mod context;
mod declaration;
mod gc;
mod interface;
mod persistent;
mod source;
pub use declaration::PluginDeclaration;
pub use interface::EngineInterface;
pub use persistent::PersistentPlugin;
pub(crate) use context::PluginExecutionCommandContext;
pub(crate) use interface::PluginInterface;
pub(crate) use source::PluginSource;
use interface::{InterfaceManager, PluginInterfaceManager};
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 8192;
/// Encoder for a specific message type. Usually implemented on [`PluginInput`]
@ -119,12 +130,19 @@ fn create_command(path: &Path, shell: Option<&Path>) -> CommandSys {
// Both stdout and stdin are piped so we can receive information from the plugin
process.stdout(Stdio::piped()).stdin(Stdio::piped());
// The plugin should be run in a new process group to prevent Ctrl-C from stopping it
#[cfg(unix)]
process.process_group(0);
#[cfg(windows)]
process.creation_flags(windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP.0);
process
}
fn make_plugin_interface(
mut child: Child,
identity: Arc<PluginIdentity>,
source: Arc<PluginSource>,
gc: Option<PluginGc>,
) -> Result<PluginInterface, ShellError> {
let stdin = child
.stdin
@ -144,7 +162,9 @@ fn make_plugin_interface(
let reader = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout);
let mut manager = PluginInterfaceManager::new(identity, (Mutex::new(stdin), encoder));
let mut manager = PluginInterfaceManager::new(source.clone(), (Mutex::new(stdin), encoder));
manager.set_garbage_collector(gc);
let interface = manager.get_interface();
interface.hello()?;
@ -152,7 +172,10 @@ fn make_plugin_interface(
// we write, because we are expected to be able to handle multiple messages coming in from the
// plugin at any time, including stream messages like `Drop`.
std::thread::Builder::new()
.name("plugin interface reader".into())
.name(format!(
"plugin interface reader ({})",
source.identity.name()
))
.spawn(move || {
if let Err(err) = manager.consume_all((reader, encoder)) {
log::warn!("Error in PluginInterfaceManager: {err}");
@ -170,14 +193,16 @@ fn make_plugin_interface(
}
#[doc(hidden)] // Note: not for plugin authors / only used in nu-parser
pub fn get_signature(
path: &Path,
shell: Option<&Path>,
current_envs: &HashMap<String, String>,
) -> Result<Vec<PluginSignature>, ShellError> {
Arc::new(PluginIdentity::new(path, shell.map(|s| s.to_owned())))
.spawn(current_envs)?
.get_signature()
pub fn get_signature<E, K, V>(
plugin: Arc<PersistentPlugin>,
envs: impl FnOnce() -> Result<E, ShellError>,
) -> Result<Vec<PluginSignature>, ShellError>
where
E: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
plugin.get(envs)?.get_signature()
}
/// The basic API for a Nushell plugin

View File

@ -0,0 +1,186 @@
use std::{
ffi::OsStr,
sync::{Arc, Mutex},
};
use nu_protocol::{PluginGcConfig, PluginIdentity, RegisteredPlugin, ShellError};
use super::{create_command, gc::PluginGc, make_plugin_interface, PluginInterface, PluginSource};
/// A box that can keep a plugin that was spawned persistent for further uses. The plugin may or
/// may not be currently running. [`.get()`] gets the currently running plugin, or spawns it if it's
/// not running.
///
/// Note: used in the parser, not for plugin authors
#[doc(hidden)]
#[derive(Debug)]
pub struct PersistentPlugin {
/// Identity (filename, shell, name) of the plugin
identity: PluginIdentity,
/// Reference to the plugin if running
running: Mutex<Option<RunningPlugin>>,
/// Garbage collector config
gc_config: Mutex<PluginGcConfig>,
}
#[derive(Debug)]
struct RunningPlugin {
/// Process ID of the running plugin
pid: u32,
/// Interface (which can be cloned) to the running plugin
interface: PluginInterface,
/// Garbage collector for the plugin
gc: PluginGc,
}
impl PersistentPlugin {
/// Create a new persistent plugin. The plugin will not be spawned immediately.
pub fn new(identity: PluginIdentity, gc_config: PluginGcConfig) -> PersistentPlugin {
PersistentPlugin {
identity,
running: Mutex::new(None),
gc_config: Mutex::new(gc_config),
}
}
/// Get the plugin interface of the running plugin, or spawn it if it's not currently running.
///
/// Will call `envs` to get environment variables to spawn the plugin if the plugin needs to be
/// spawned.
pub(crate) fn get<E, K, V>(
self: Arc<Self>,
envs: impl FnOnce() -> Result<E, ShellError>,
) -> Result<PluginInterface, ShellError>
where
E: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
let mut running = self.running.lock().map_err(|_| ShellError::NushellFailed {
msg: format!(
"plugin `{}` running mutex poisoned, probably panic during spawn",
self.identity.name()
),
})?;
if let Some(ref running) = *running {
// It exists, so just clone the interface
Ok(running.interface.clone())
} else {
// Try to spawn, and then store the spawned plugin if we were successful.
//
// We hold the lock the whole time to prevent others from trying to spawn and ending
// up with duplicate plugins
let new_running = self.clone().spawn(envs()?)?;
let interface = new_running.interface.clone();
*running = Some(new_running);
Ok(interface)
}
}
/// Run the plugin command, then set up and return [`RunningPlugin`].
fn spawn(
self: Arc<Self>,
envs: impl IntoIterator<Item = (impl AsRef<OsStr>, impl AsRef<OsStr>)>,
) -> Result<RunningPlugin, ShellError> {
let source_file = self.identity.filename();
let mut plugin_cmd = create_command(source_file, self.identity.shell());
// We need the current environment variables for `python` based plugins
// Or we'll likely have a problem when a plugin is implemented in a virtual Python environment.
plugin_cmd.envs(envs);
let program_name = plugin_cmd.get_program().to_os_string().into_string();
// Run the plugin command
let child = plugin_cmd.spawn().map_err(|err| {
let error_msg = match err.kind() {
std::io::ErrorKind::NotFound => match program_name {
Ok(prog_name) => {
format!("Can't find {prog_name}, please make sure that {prog_name} is in PATH.")
}
_ => {
format!("Error spawning child process: {err}")
}
},
_ => {
format!("Error spawning child process: {err}")
}
};
ShellError::PluginFailedToLoad { msg: error_msg }
})?;
// Start the plugin garbage collector
let gc_config =
self.gc_config
.lock()
.map(|c| c.clone())
.map_err(|_| ShellError::NushellFailed {
msg: "plugin gc mutex poisoned".into(),
})?;
let gc = PluginGc::new(gc_config, &self)?;
let pid = child.id();
let interface =
make_plugin_interface(child, Arc::new(PluginSource::new(&self)), Some(gc.clone()))?;
Ok(RunningPlugin { pid, interface, gc })
}
}
impl RegisteredPlugin for PersistentPlugin {
fn identity(&self) -> &PluginIdentity {
&self.identity
}
fn is_running(&self) -> bool {
// If the lock is poisoned, we return false here. That may not be correct, but this is a
// failure state anyway that would be noticed at some point
self.running.lock().map(|r| r.is_some()).unwrap_or(false)
}
fn pid(&self) -> Option<u32> {
// Again, we return None for a poisoned lock.
self.running
.lock()
.ok()
.and_then(|r| r.as_ref().map(|r| r.pid))
}
fn stop(&self) -> Result<(), ShellError> {
let mut running = self.running.lock().map_err(|_| ShellError::NushellFailed {
msg: format!(
"plugin `{}` running mutex poisoned, probably panic during spawn",
self.identity.name()
),
})?;
// If the plugin is running, stop its GC, so that the GC doesn't accidentally try to stop
// a future plugin
if let Some(running) = running.as_ref() {
running.gc.stop_tracking();
}
// We don't try to kill the process or anything, we just drop the RunningPlugin. It should
// exit soon after
*running = None;
Ok(())
}
fn set_gc_config(&self, gc_config: &PluginGcConfig) {
if let Ok(mut conf) = self.gc_config.lock() {
// Save the new config for future calls
*conf = gc_config.clone();
}
if let Ok(running) = self.running.lock() {
if let Some(running) = running.as_ref() {
// If the plugin is already running, propagate the config change to the running GC
running.gc.set_config(gc_config.clone());
}
}
}
fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync> {
self
}
}

View File

@ -0,0 +1,67 @@
use std::sync::{Arc, Weak};
use nu_protocol::{PluginIdentity, RegisteredPlugin, ShellError, Span};
use super::PersistentPlugin;
#[derive(Debug, Clone)]
pub(crate) struct PluginSource {
/// The identity of the plugin
pub(crate) identity: Arc<PluginIdentity>,
/// A weak reference to the persistent plugin that might hold an interface to the plugin.
///
/// This is weak to avoid cyclic references, but it does mean we might fail to upgrade if
/// the engine state lost the [`PersistentPlugin`] at some point.
pub(crate) persistent: Weak<PersistentPlugin>,
}
impl PluginSource {
/// Create from an `Arc<PersistentPlugin>`
pub(crate) fn new(plugin: &Arc<PersistentPlugin>) -> PluginSource {
PluginSource {
identity: plugin.identity().clone().into(),
persistent: Arc::downgrade(plugin),
}
}
/// Create a new fake source with a fake identity, for testing
///
/// Warning: [`.persistent()`] will always return an error.
#[cfg(test)]
pub(crate) fn new_fake(name: &str) -> PluginSource {
PluginSource {
identity: PluginIdentity::new_fake(name).into(),
persistent: Weak::new(),
}
}
/// Try to upgrade the persistent reference, and return an error referencing `span` as the
/// object that referenced it otherwise
pub(crate) fn persistent(
&self,
span: Option<Span>,
) -> Result<Arc<PersistentPlugin>, ShellError> {
self.persistent
.upgrade()
.ok_or_else(|| ShellError::GenericError {
error: format!("The `{}` plugin is no longer present", self.identity.name()),
msg: "removed since this object was created".into(),
span,
help: Some("try recreating the object that came from the plugin".into()),
inner: vec![],
})
}
/// Sources are compatible if their identities are equal
pub(crate) fn is_compatible(&self, other: &PluginSource) -> bool {
self.identity == other.identity
}
}
impl std::ops::Deref for PluginSource {
type Target = PluginIdentity;
fn deref(&self) -> &PluginIdentity {
&self.identity
}
}

View File

@ -320,6 +320,16 @@ impl PluginCallResponse<PipelineDataHeader> {
}
}
/// Options that can be changed to affect how the engine treats the plugin
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum PluginOption {
/// Send `GcDisabled(true)` to stop the plugin from being automatically garbage collected, or
/// `GcDisabled(false)` to enable it again.
///
/// See [`EngineInterface::set_gc_disabled`] for more information.
GcDisabled(bool),
}
/// Information received from the plugin
///
/// Note: exported for internal use, not public.
@ -328,6 +338,8 @@ impl PluginCallResponse<PipelineDataHeader> {
pub enum PluginOutput {
/// This must be the first message. Indicates supported protocol
Hello(ProtocolInfo),
/// Set option. No response expected
Option(PluginOption),
/// A response to a [`PluginCall`]. The ID should be the same sent with the plugin call this
/// is a response to
CallResponse(PluginCallId, PluginCallResponse<PipelineDataHeader>),

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use nu_protocol::{CustomValue, ShellError, Span, Spanned, Value};
use serde::{Deserialize, Serialize};
use crate::plugin::PluginIdentity;
use crate::plugin::PluginSource;
#[cfg(test)]
mod tests;
@ -17,7 +17,7 @@ mod tests;
/// that local plugin custom values are converted to and from [`PluginCustomData`] on the boundary.
///
/// [`PluginInterface`](crate::interface::PluginInterface) is responsible for adding the
/// appropriate [`PluginIdentity`](crate::plugin::PluginIdentity), ensuring that only
/// appropriate [`PluginSource`](crate::plugin::PluginSource), ensuring that only
/// [`PluginCustomData`] is contained within any values sent, and that the `source` of any
/// values sent matches the plugin it is being sent to.
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -30,7 +30,7 @@ pub struct PluginCustomValue {
/// Which plugin the custom value came from. This is not defined on the plugin side. The engine
/// side is responsible for maintaining it, and it is not sent over the serialization boundary.
#[serde(skip, default)]
pub source: Option<Arc<PluginIdentity>>,
pub(crate) source: Option<Arc<PluginSource>>,
}
#[typetag::serde]
@ -52,7 +52,7 @@ impl CustomValue for PluginCustomValue {
"Unable to spawn plugin `{}` to get base value",
self.source
.as_ref()
.map(|s| s.plugin_name.as_str())
.map(|s| s.name())
.unwrap_or("<unknown>")
),
msg: err.to_string(),
@ -61,14 +61,18 @@ impl CustomValue for PluginCustomValue {
inner: vec![err],
};
let identity = self.source.clone().ok_or_else(|| {
let source = self.source.clone().ok_or_else(|| {
wrap_err(ShellError::NushellFailed {
msg: "The plugin source for the custom value was not set".into(),
})
})?;
let empty_env: Option<(String, String)> = None;
let plugin = identity.spawn(empty_env).map_err(wrap_err)?;
// Envs probably should be passed here, but it's likely that the plugin is already running
let empty_envs = std::iter::empty::<(&str, &str)>();
let plugin = source
.persistent(Some(span))
.and_then(|p| p.get(|| Ok(empty_envs)))
.map_err(wrap_err)?;
plugin
.custom_value_to_base_value(Spanned {
@ -117,8 +121,8 @@ impl PluginCustomValue {
})
}
/// Add a [`PluginIdentity`] to all [`PluginCustomValue`]s within a value, recursively.
pub(crate) fn add_source(value: &mut Value, source: &Arc<PluginIdentity>) {
/// Add a [`PluginSource`] to all [`PluginCustomValue`]s within a value, recursively.
pub(crate) fn add_source(value: &mut Value, source: &Arc<PluginSource>) {
let span = value.span();
match value {
// Set source on custom value
@ -179,21 +183,26 @@ impl PluginCustomValue {
/// since `LazyRecord` could return something different the next time it is called.
pub(crate) fn verify_source(
value: &mut Value,
source: &PluginIdentity,
source: &PluginSource,
) -> Result<(), ShellError> {
let span = value.span();
match value {
// Set source on custom value
Value::CustomValue { val, .. } => {
if let Some(custom_value) = val.as_any().downcast_ref::<PluginCustomValue>() {
if custom_value.source.as_deref() == Some(source) {
if custom_value
.source
.as_ref()
.map(|s| s.is_compatible(source))
.unwrap_or(false)
{
Ok(())
} else {
Err(ShellError::CustomValueIncorrectForPlugin {
name: custom_value.name.clone(),
span,
dest_plugin: source.plugin_name.clone(),
src_plugin: custom_value.source.as_ref().map(|s| s.plugin_name.clone()),
dest_plugin: source.name().to_owned(),
src_plugin: custom_value.source.as_ref().map(|s| s.name().to_owned()),
})
}
} else {
@ -201,7 +210,7 @@ impl PluginCustomValue {
Err(ShellError::CustomValueIncorrectForPlugin {
name: val.value_string(),
span,
dest_plugin: source.plugin_name.clone(),
dest_plugin: source.name().to_owned(),
src_plugin: None,
})
}

View File

@ -1,9 +1,11 @@
use std::sync::Arc;
use nu_protocol::{
ast::RangeInclusion, engine::Closure, record, CustomValue, Range, ShellError, Span, Value,
};
use crate::{
plugin::PluginIdentity,
plugin::PluginSource,
protocol::test_util::{
expected_test_custom_value, test_plugin_custom_value, test_plugin_custom_value_with_source,
TestCustomValue,
@ -45,7 +47,7 @@ fn expected_serialize_output() -> Result<(), ShellError> {
#[test]
fn add_source_at_root() -> Result<(), ShellError> {
let mut val = Value::test_custom_value(Box::new(test_plugin_custom_value()));
let source = PluginIdentity::new_fake("foo");
let source = Arc::new(PluginSource::new_fake("foo"));
PluginCustomValue::add_source(&mut val, &source);
let custom_value = val.as_custom_value()?;
@ -53,7 +55,10 @@ fn add_source_at_root() -> Result<(), ShellError> {
.as_any()
.downcast_ref()
.expect("not PluginCustomValue");
assert_eq!(Some(source), plugin_custom_value.source);
assert_eq!(
Some(Arc::as_ptr(&source)),
plugin_custom_value.source.as_ref().map(Arc::as_ptr)
);
Ok(())
}
@ -84,7 +89,7 @@ fn add_source_nested_range() -> Result<(), ShellError> {
to: orig_custom_val.clone(),
inclusion: RangeInclusion::Inclusive,
});
let source = PluginIdentity::new_fake("foo");
let source = Arc::new(PluginSource::new_fake("foo"));
PluginCustomValue::add_source(&mut val, &source);
check_range_custom_values(&val, |name, custom_value| {
@ -93,8 +98,8 @@ fn add_source_nested_range() -> Result<(), ShellError> {
.downcast_ref()
.unwrap_or_else(|| panic!("{name} not PluginCustomValue"));
assert_eq!(
Some(&source),
plugin_custom_value.source.as_ref(),
Some(Arc::as_ptr(&source)),
plugin_custom_value.source.as_ref().map(Arc::as_ptr),
"{name} source not set correctly"
);
Ok(())
@ -126,7 +131,7 @@ fn add_source_nested_record() -> Result<(), ShellError> {
"foo" => orig_custom_val.clone(),
"bar" => orig_custom_val.clone(),
});
let source = PluginIdentity::new_fake("foo");
let source = Arc::new(PluginSource::new_fake("foo"));
PluginCustomValue::add_source(&mut val, &source);
check_record_custom_values(&val, &["foo", "bar"], |key, custom_value| {
@ -135,8 +140,8 @@ fn add_source_nested_record() -> Result<(), ShellError> {
.downcast_ref()
.unwrap_or_else(|| panic!("'{key}' not PluginCustomValue"));
assert_eq!(
Some(&source),
plugin_custom_value.source.as_ref(),
Some(Arc::as_ptr(&source)),
plugin_custom_value.source.as_ref().map(Arc::as_ptr),
"'{key}' source not set correctly"
);
Ok(())
@ -165,7 +170,7 @@ fn check_list_custom_values(
fn add_source_nested_list() -> Result<(), ShellError> {
let orig_custom_val = Value::test_custom_value(Box::new(test_plugin_custom_value()));
let mut val = Value::test_list(vec![orig_custom_val.clone(), orig_custom_val.clone()]);
let source = PluginIdentity::new_fake("foo");
let source = Arc::new(PluginSource::new_fake("foo"));
PluginCustomValue::add_source(&mut val, &source);
check_list_custom_values(&val, 0..=1, |index, custom_value| {
@ -174,8 +179,8 @@ fn add_source_nested_list() -> Result<(), ShellError> {
.downcast_ref()
.unwrap_or_else(|| panic!("[{index}] not PluginCustomValue"));
assert_eq!(
Some(&source),
plugin_custom_value.source.as_ref(),
Some(Arc::as_ptr(&source)),
plugin_custom_value.source.as_ref().map(Arc::as_ptr),
"[{index}] source not set correctly"
);
Ok(())
@ -209,7 +214,7 @@ fn add_source_nested_closure() -> Result<(), ShellError> {
block_id: 0,
captures: vec![(0, orig_custom_val.clone()), (1, orig_custom_val.clone())],
});
let source = PluginIdentity::new_fake("foo");
let source = Arc::new(PluginSource::new_fake("foo"));
PluginCustomValue::add_source(&mut val, &source);
check_closure_custom_values(&val, 0..=1, |index, custom_value| {
@ -218,8 +223,8 @@ fn add_source_nested_closure() -> Result<(), ShellError> {
.downcast_ref()
.unwrap_or_else(|| panic!("[{index}] not PluginCustomValue"));
assert_eq!(
Some(&source),
plugin_custom_value.source.as_ref(),
Some(Arc::as_ptr(&source)),
plugin_custom_value.source.as_ref().map(Arc::as_ptr),
"[{index}] source not set correctly"
);
Ok(())
@ -233,10 +238,10 @@ fn verify_source_error_message() -> Result<(), ShellError> {
let mut native_val = Value::custom_value(Box::new(TestCustomValue(32)), span);
let mut foreign_val = {
let mut val = test_plugin_custom_value();
val.source = Some(PluginIdentity::new_fake("other"));
val.source = Some(Arc::new(PluginSource::new_fake("other")));
Value::custom_value(Box::new(val), span)
};
let source = PluginIdentity::new_fake("test");
let source = PluginSource::new_fake("test");
PluginCustomValue::verify_source(&mut ok_val, &source).expect("ok_val should be verified ok");
@ -266,7 +271,7 @@ fn verify_source_error_message() -> Result<(), ShellError> {
#[test]
fn verify_source_nested_range() -> Result<(), ShellError> {
let native_val = Value::test_custom_value(Box::new(TestCustomValue(32)));
let source = PluginIdentity::new_fake("test");
let source = PluginSource::new_fake("test");
for (name, mut val) in [
(
"from",
@ -315,7 +320,7 @@ fn verify_source_nested_range() -> Result<(), ShellError> {
#[test]
fn verify_source_nested_record() -> Result<(), ShellError> {
let native_val = Value::test_custom_value(Box::new(TestCustomValue(32)));
let source = PluginIdentity::new_fake("test");
let source = PluginSource::new_fake("test");
for (name, mut val) in [
(
"first element foo",
@ -346,7 +351,7 @@ fn verify_source_nested_record() -> Result<(), ShellError> {
#[test]
fn verify_source_nested_list() -> Result<(), ShellError> {
let native_val = Value::test_custom_value(Box::new(TestCustomValue(32)));
let source = PluginIdentity::new_fake("test");
let source = PluginSource::new_fake("test");
for (name, mut val) in [
(
"first element",
@ -371,7 +376,7 @@ fn verify_source_nested_list() -> Result<(), ShellError> {
#[test]
fn verify_source_nested_closure() -> Result<(), ShellError> {
let native_val = Value::test_custom_value(Box::new(TestCustomValue(32)));
let source = PluginIdentity::new_fake("test");
let source = PluginSource::new_fake("test");
for (name, mut val) in [
(
"first capture",

View File

@ -1,7 +1,7 @@
use nu_protocol::{CustomValue, ShellError, Span, Value};
use serde::{Deserialize, Serialize};
use crate::plugin::PluginIdentity;
use crate::plugin::PluginSource;
use super::PluginCustomValue;
@ -44,7 +44,7 @@ pub(crate) fn expected_test_custom_value() -> TestCustomValue {
pub(crate) fn test_plugin_custom_value_with_source() -> PluginCustomValue {
PluginCustomValue {
source: Some(PluginIdentity::new_fake("test")),
source: Some(PluginSource::new_fake("test").into()),
..test_plugin_custom_value()
}
}

View File

@ -2,8 +2,8 @@ macro_rules! generate_tests {
($encoder:expr) => {
use crate::protocol::{
CallInfo, CustomValueOp, EvaluatedCall, LabeledError, PipelineDataHeader, PluginCall,
PluginCallResponse, PluginCustomValue, PluginInput, PluginOutput, StreamData,
StreamMessage,
PluginCallResponse, PluginCustomValue, PluginInput, PluginOption, PluginOutput,
StreamData, StreamMessage,
};
use nu_protocol::{PluginSignature, Span, Spanned, SyntaxShape, Value};
@ -531,6 +531,28 @@ macro_rules! generate_tests {
_ => panic!("decoded into wrong value: {returned:?}"),
}
}
#[test]
fn output_round_trip_option() {
let plugin_output = PluginOutput::Option(PluginOption::GcDisabled(true));
let encoder = $encoder;
let mut buffer: Vec<u8> = Vec::new();
encoder
.encode(&plugin_output, &mut buffer)
.expect("unable to serialize message");
let returned = encoder
.decode(&mut buffer.as_slice())
.expect("unable to deserialize message")
.expect("eof");
match returned {
PluginOutput::Option(PluginOption::GcDisabled(disabled)) => {
assert!(disabled);
}
_ => panic!("decoded into wrong value: {returned:?}"),
}
}
};
}