nushell/crates/nu-plugin-engine/src/init.rs
Ian Manske 6fd854ed9f
Replace ExternalStream with new ByteStream type (#12774)
# Description
This PR introduces a `ByteStream` type which is a `Read`-able stream of
bytes. Internally, it has an enum over three different byte stream
sources:
```rust
pub enum ByteStreamSource {
    Read(Box<dyn Read + Send + 'static>),
    File(File),
    Child(ChildProcess),
}
```

This is in comparison to the current `RawStream` type, which is an
`Iterator<Item = Vec<u8>>` and has to allocate for each read chunk.

Currently, `PipelineData::ExternalStream` serves a weird dual role where
it is either external command output or a wrapper around `RawStream`.
`ByteStream` makes this distinction more clear (via `ByteStreamSource`)
and replaces `PipelineData::ExternalStream` in this PR:
```rust
pub enum PipelineData {
    Empty,
    Value(Value, Option<PipelineMetadata>),
    ListStream(ListStream, Option<PipelineMetadata>),
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
```

The PR is relatively large, but a decent amount of it is just repetitive
changes.

This PR fixes #7017, fixes #10763, and fixes #12369.

This PR also improves performance when piping external commands. Nushell
should, in most cases, have competitive pipeline throughput compared to,
e.g., bash.
| Command | Before (MB/s) | After (MB/s) | Bash (MB/s) |
| -------------------------------------------------- | -------------:|
------------:| -----------:|
| `throughput \| rg 'x'` | 3059 | 3744 | 3739 |
| `throughput \| nu --testbin relay o> /dev/null` | 3508 | 8087 | 8136 |

# User-Facing Changes
- This is a breaking change for the plugin communication protocol,
because the `ExternalStreamInfo` was replaced with `ByteStreamInfo`.
Plugins now only have to deal with a single input stream, as opposed to
the previous three streams: stdout, stderr, and exit code.
- The output of `describe` has been changed for external/byte streams.
- Temporary breaking change: `bytes starts-with` no longer works with
byte streams. This is to keep the PR smaller, and `bytes ends-with`
already does not work on byte streams.
- If a process core dumped, then instead of having a `Value::Error` in
the `exit_code` column of the output returned from `complete`, it now is
a `Value::Int` with the negation of the signal number.

# After Submitting
- Update docs and book as necessary
- Release notes (e.g., plugin protocol changes)
- Adapt/convert commands to work with byte streams (high priority is
`str length`, `bytes starts-with`, and maybe `bytes ends-with`).
- Refactor the `tee` code, Devyn has already done some work on this.

---------

Co-authored-by: Devyn Cairns <devyn.cairns@gmail.com>
2024-05-16 07:11:18 -07:00

310 lines
11 KiB
Rust

use std::{
io::{BufReader, BufWriter},
path::Path,
process::Child,
sync::{Arc, Mutex},
};
#[cfg(unix)]
use std::os::unix::process::CommandExt;
#[cfg(windows)]
use std::os::windows::process::CommandExt;
use nu_plugin_core::{
CommunicationMode, EncodingType, InterfaceManager, PreparedServerCommunication,
ServerCommunicationIo,
};
use nu_protocol::{
engine::StateWorkingSet, report_error_new, PluginIdentity, PluginRegistryFile,
PluginRegistryItem, PluginRegistryItemData, RegisteredPlugin, ShellError, Span,
};
use crate::{
PersistentPlugin, PluginDeclaration, PluginGc, PluginInterface, PluginInterfaceManager,
PluginSource,
};
/// This should be larger than the largest commonly sent message to avoid excessive fragmentation.
///
/// The buffers coming from byte streams are typically each 8192 bytes, so double that.
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384;
/// Spawn the command for a plugin, in the given `mode`. After spawning, it can be passed to
/// [`make_plugin_interface()`] to get a [`PluginInterface`].
pub fn create_command(
path: &Path,
mut shell: Option<&Path>,
mode: &CommunicationMode,
) -> std::process::Command {
log::trace!("Starting plugin: {path:?}, shell = {shell:?}, mode = {mode:?}");
let mut shell_args = vec![];
if shell.is_none() {
// We only have to do this for things that are not executable by Rust's Command API on
// Windows. They do handle bat/cmd files for us, helpfully.
//
// Also include anything that wouldn't be executable with a shebang, like JAR files.
shell = match path.extension().and_then(|e| e.to_str()) {
Some("sh") => {
if cfg!(unix) {
// We don't want to override what might be in the shebang if this is Unix, since
// some scripts will have a shebang specifying bash even if they're .sh
None
} else {
Some(Path::new("sh"))
}
}
Some("nu") => {
shell_args.push("--stdin");
Some(Path::new("nu"))
}
Some("py") => Some(Path::new("python")),
Some("rb") => Some(Path::new("ruby")),
Some("jar") => {
shell_args.push("-jar");
Some(Path::new("java"))
}
_ => None,
};
}
let mut process = if let Some(shell) = shell {
let mut process = std::process::Command::new(shell);
process.args(shell_args);
process.arg(path);
process
} else {
std::process::Command::new(path)
};
process.args(mode.args());
// Setup I/O according to the communication mode
mode.setup_command_io(&mut process);
// The plugin should be run in a new process group to prevent Ctrl-C from stopping it
#[cfg(unix)]
process.process_group(0);
#[cfg(windows)]
process.creation_flags(windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP.0);
// In order to make bugs with improper use of filesystem without getting the engine current
// directory more obvious, the plugin always starts in the directory of its executable
if let Some(dirname) = path.parent() {
process.current_dir(dirname);
}
process
}
/// Create a plugin interface from a spawned child process.
///
/// `comm` determines the communication type the process was spawned with, and whether stdio will
/// be taken from the child.
pub fn make_plugin_interface(
mut child: Child,
comm: PreparedServerCommunication,
source: Arc<PluginSource>,
pid: Option<u32>,
gc: Option<PluginGc>,
) -> Result<PluginInterface, ShellError> {
match comm.connect(&mut child)? {
ServerCommunicationIo::Stdio(stdin, stdout) => make_plugin_interface_with_streams(
stdout,
stdin,
move || {
let _ = child.wait();
},
source,
pid,
gc,
),
#[cfg(feature = "local-socket")]
ServerCommunicationIo::LocalSocket { read_out, write_in } => {
make_plugin_interface_with_streams(
read_out,
write_in,
move || {
let _ = child.wait();
},
source,
pid,
gc,
)
}
}
}
/// Create a plugin interface from low-level components.
///
/// - `after_close` is called to clean up after the `reader` ends.
/// - `source` is required so that custom values produced by the plugin can spawn it.
/// - `pid` may be provided for process management (e.g. `EnterForeground`).
/// - `gc` may be provided for communication with the plugin's GC (e.g. `SetGcDisabled`).
pub fn make_plugin_interface_with_streams(
mut reader: impl std::io::Read + Send + 'static,
writer: impl std::io::Write + Send + 'static,
after_close: impl FnOnce() + Send + 'static,
source: Arc<PluginSource>,
pid: Option<u32>,
gc: Option<PluginGc>,
) -> Result<PluginInterface, ShellError> {
let encoder = get_plugin_encoding(&mut reader)?;
let reader = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader);
let writer = BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, writer);
let mut manager =
PluginInterfaceManager::new(source.clone(), pid, (Mutex::new(writer), encoder));
manager.set_garbage_collector(gc);
let interface = manager.get_interface();
interface.hello()?;
// Spawn the reader on a new thread. We need to be able to read messages at the same time that
// we write, because we are expected to be able to handle multiple messages coming in from the
// plugin at any time, including stream messages like `Drop`.
std::thread::Builder::new()
.name(format!(
"plugin interface reader ({})",
source.identity.name()
))
.spawn(move || {
if let Err(err) = manager.consume_all((reader, encoder)) {
log::warn!("Error in PluginInterfaceManager: {err}");
}
// If the loop has ended, drop the manager so everyone disconnects and then run
// after_close
drop(manager);
after_close();
})
.map_err(|err| ShellError::PluginFailedToLoad {
msg: format!("Failed to spawn thread for plugin: {err}"),
})?;
Ok(interface)
}
/// Determine the plugin's encoding from a freshly opened stream.
///
/// The plugin is expected to send a 1-byte length and either `json` or `msgpack`, so this reads
/// that and determines the right length.
pub fn get_plugin_encoding(
child_stdout: &mut impl std::io::Read,
) -> Result<EncodingType, ShellError> {
let mut length_buf = [0u8; 1];
child_stdout
.read_exact(&mut length_buf)
.map_err(|e| ShellError::PluginFailedToLoad {
msg: format!("unable to get encoding from plugin: {e}"),
})?;
let mut buf = vec![0u8; length_buf[0] as usize];
child_stdout
.read_exact(&mut buf)
.map_err(|e| ShellError::PluginFailedToLoad {
msg: format!("unable to get encoding from plugin: {e}"),
})?;
EncodingType::try_from_bytes(&buf).ok_or_else(|| {
let encoding_for_debug = String::from_utf8_lossy(&buf);
ShellError::PluginFailedToLoad {
msg: format!("get unsupported plugin encoding: {encoding_for_debug}"),
}
})
}
/// Load the definitions from the plugin file into the engine state
pub fn load_plugin_file(
working_set: &mut StateWorkingSet,
plugin_registry_file: &PluginRegistryFile,
span: Option<Span>,
) {
for plugin in &plugin_registry_file.plugins {
// Any errors encountered should just be logged.
if let Err(err) = load_plugin_registry_item(working_set, plugin, span) {
report_error_new(working_set.permanent_state, &err)
}
}
}
/// Load a definition from the plugin file into the engine state
pub fn load_plugin_registry_item(
working_set: &mut StateWorkingSet,
plugin: &PluginRegistryItem,
span: Option<Span>,
) -> Result<Arc<PersistentPlugin>, ShellError> {
let identity =
PluginIdentity::new(plugin.filename.clone(), plugin.shell.clone()).map_err(|_| {
ShellError::GenericError {
error: "Invalid plugin filename in plugin registry file".into(),
msg: "loaded from here".into(),
span,
help: Some(format!(
"the filename for `{}` is not a valid nushell plugin: {}",
plugin.name,
plugin.filename.display()
)),
inner: vec![],
}
})?;
match &plugin.data {
PluginRegistryItemData::Valid { commands } => {
let plugin = add_plugin_to_working_set(working_set, &identity)?;
// Ensure that the plugin is reset. We're going to load new signatures, so we want to
// make sure the running plugin reflects those new signatures, and it's possible that it
// doesn't.
plugin.reset()?;
// Create the declarations from the commands
for signature in commands {
let decl = PluginDeclaration::new(plugin.clone(), signature.clone());
working_set.add_decl(Box::new(decl));
}
Ok(plugin)
}
PluginRegistryItemData::Invalid => Err(ShellError::PluginRegistryDataInvalid {
plugin_name: identity.name().to_owned(),
span,
add_command: identity.add_command(),
}),
}
}
/// Find [`PersistentPlugin`] with the given `identity` in the `working_set`, or construct it
/// if it doesn't exist.
///
/// The garbage collection config is always found and set in either case.
pub fn add_plugin_to_working_set(
working_set: &mut StateWorkingSet,
identity: &PluginIdentity,
) -> Result<Arc<PersistentPlugin>, ShellError> {
// Find garbage collection config for the plugin
let gc_config = working_set
.get_config()
.plugin_gc
.get(identity.name())
.clone();
// Add it to / get it from the working set
let plugin = working_set.find_or_create_plugin(identity, || {
Arc::new(PersistentPlugin::new(identity.clone(), gc_config.clone()))
});
plugin.set_gc_config(&gc_config);
// Downcast the plugin to `PersistentPlugin` - we generally expect this to succeed.
// The trait object only exists so that nu-protocol can contain plugins without knowing
// anything about their implementation, but we only use `PersistentPlugin` in practice.
plugin
.as_any()
.downcast()
.map_err(|_| ShellError::NushellFailed {
msg: "encountered unexpected RegisteredPlugin type".into(),
})
}