Replace ExternalStream with new ByteStream type (#12774)

# Description
This PR introduces a `ByteStream` type which is a `Read`-able stream of
bytes. Internally, it has an enum over three different byte stream
sources:
```rust
pub enum ByteStreamSource {
    Read(Box<dyn Read + Send + 'static>),
    File(File),
    Child(ChildProcess),
}
```

This is in comparison to the current `RawStream` type, which is an
`Iterator<Item = Vec<u8>>` and has to allocate for each read chunk.

Currently, `PipelineData::ExternalStream` serves a weird dual role where
it is either external command output or a wrapper around `RawStream`.
`ByteStream` makes this distinction clearer (via `ByteStreamSource`)
and replaces `PipelineData::ExternalStream` in this PR:
```rust
pub enum PipelineData {
    Empty,
    Value(Value, Option<PipelineMetadata>),
    ListStream(ListStream, Option<PipelineMetadata>),
    ByteStream(ByteStream, Option<PipelineMetadata>),
}
```

The PR is relatively large, but a decent amount of it is just repetitive
changes.

This PR fixes #7017, fixes #10763, and fixes #12369.

This PR also improves performance when piping external commands. Nushell
should, in most cases, have competitive pipeline throughput compared to,
e.g., bash.
| Command | Before (MB/s) | After (MB/s) | Bash (MB/s) |
| -------------------------------------------------- | -------------:| ------------:| -----------:|
| `throughput \| rg 'x'` | 3059 | 3744 | 3739 |
| `throughput \| nu --testbin relay o> /dev/null` | 3508 | 8087 | 8136 |

# User-Facing Changes
- This is a breaking change for the plugin communication protocol,
because the `ExternalStreamInfo` was replaced with `ByteStreamInfo`.
Plugins now only have to deal with a single input stream, as opposed to
the previous three streams: stdout, stderr, and exit code.
- The output of `describe` has been changed for external/byte streams.
- Temporary breaking change: `bytes starts-with` no longer works with
byte streams. This is to keep the PR smaller, and `bytes ends-with`
already does not work on byte streams.
- If a process core dumped, then instead of having a `Value::Error` in
the `exit_code` column of the output returned from `complete`, it now is
a `Value::Int` with the negation of the signal number.

# After Submitting
- Update docs and book as necessary
- Release notes (e.g., plugin protocol changes)
- Adapt/convert commands to work with byte streams (high priority is
`str length`, `bytes starts-with`, and maybe `bytes ends-with`).
- Refactor the `tee` code; Devyn has already done some work on this.

---------

Co-authored-by: Devyn Cairns <devyn.cairns@gmail.com>
This commit is contained in:
Ian Manske
2024-05-16 14:11:18 +00:00
committed by GitHub
parent 1b8eb23785
commit 6fd854ed9f
210 changed files with 3955 additions and 4012 deletions

View File

@ -1,8 +1,8 @@
use super::util::get_rest_for_glob_pattern;
#[allow(deprecated)]
use nu_engine::{command_prelude::*, current_dir, get_eval_block};
use nu_protocol::{BufferedReader, DataSource, NuGlob, PipelineMetadata, RawStream};
use std::{io::BufReader, path::Path};
use nu_protocol::{ByteStream, DataSource, NuGlob, PipelineMetadata};
use std::path::Path;
#[cfg(feature = "sqlite")]
use crate::database::SQLiteDatabase;
@ -143,23 +143,13 @@ impl Command for Open {
}
};
let buf_reader = BufReader::new(file);
let file_contents = PipelineData::ExternalStream {
stdout: Some(RawStream::new(
Box::new(BufferedReader::new(buf_reader)),
ctrlc.clone(),
call_span,
None,
)),
stderr: None,
exit_code: None,
span: call_span,
metadata: Some(PipelineMetadata {
let stream = PipelineData::ByteStream(
ByteStream::file(file, call_span, ctrlc.clone()),
Some(PipelineMetadata {
data_source: DataSource::FilePath(path.to_path_buf()),
}),
trim_end_newline: false,
};
);
let exts_opt: Option<Vec<String>> = if raw {
None
} else {
@ -184,9 +174,9 @@ impl Command for Open {
let decl = engine_state.get_decl(converter_id);
let command_output = if let Some(block_id) = decl.get_block_id() {
let block = engine_state.get_block(block_id);
eval_block(engine_state, stack, block, file_contents)
eval_block(engine_state, stack, block, stream)
} else {
decl.run(engine_state, stack, &Call::new(call_span), file_contents)
decl.run(engine_state, stack, &Call::new(call_span), stream)
};
output.push(command_output.map_err(|inner| {
ShellError::GenericError{
@ -198,7 +188,7 @@ impl Command for Open {
}
})?);
}
None => output.push(file_contents),
None => output.push(stream),
}
}
}

View File

@ -5,12 +5,15 @@ use nu_engine::{command_prelude::*, current_dir};
use nu_path::expand_path_with;
use nu_protocol::{
ast::{Expr, Expression},
DataSource, OutDest, PipelineMetadata, RawStream,
byte_stream::copy_with_interrupt,
process::ChildPipe,
ByteStreamSource, DataSource, OutDest, PipelineMetadata,
};
use std::{
fs::File,
io::Write,
io::{self, BufRead, BufReader, Read, Write},
path::{Path, PathBuf},
sync::{atomic::AtomicBool, Arc},
thread,
};
@ -104,12 +107,7 @@ impl Command for Save {
});
match input {
PipelineData::ExternalStream {
stdout,
stderr,
metadata,
..
} => {
PipelineData::ByteStream(stream, metadata) => {
check_saving_to_source_file(metadata.as_ref(), &path, stderr_path.as_ref())?;
let (file, stderr_file) = get_files(
@ -121,40 +119,97 @@ impl Command for Save {
force,
)?;
match (stdout, stderr) {
(Some(stdout), stderr) => {
// delegate a thread to redirect stderr to result.
let handler = stderr
.map(|stderr| match stderr_file {
Some(stderr_file) => thread::Builder::new()
.name("stderr redirector".to_string())
.spawn(move || {
stream_to_file(stderr, stderr_file, span, progress)
}),
None => thread::Builder::new()
.name("stderr redirector".to_string())
.spawn(move || stderr.drain()),
})
.transpose()
.err_span(span)?;
let size = stream.known_size();
let ctrlc = engine_state.ctrlc.clone();
let res = stream_to_file(stdout, file, span, progress);
if let Some(h) = handler {
h.join().map_err(|err| ShellError::ExternalCommand {
label: "Fail to receive external commands stderr message"
.to_string(),
help: format!("{err:?}"),
span,
})??;
}
res?;
match stream.into_source() {
ByteStreamSource::Read(read) => {
stream_to_file(read, size, ctrlc, file, span, progress)?;
}
(None, Some(stderr)) => match stderr_file {
Some(stderr_file) => stream_to_file(stderr, stderr_file, span, progress)?,
None => stderr.drain()?,
},
(None, None) => {}
};
ByteStreamSource::File(source) => {
stream_to_file(source, size, ctrlc, file, span, progress)?;
}
ByteStreamSource::Child(mut child) => {
fn write_or_consume_stderr(
stderr: ChildPipe,
file: Option<File>,
span: Span,
ctrlc: Option<Arc<AtomicBool>>,
progress: bool,
) -> Result<(), ShellError> {
if let Some(file) = file {
match stderr {
ChildPipe::Pipe(pipe) => {
stream_to_file(pipe, None, ctrlc, file, span, progress)
}
ChildPipe::Tee(tee) => {
stream_to_file(tee, None, ctrlc, file, span, progress)
}
}?
} else {
match stderr {
ChildPipe::Pipe(mut pipe) => {
io::copy(&mut pipe, &mut io::sink())
}
ChildPipe::Tee(mut tee) => io::copy(&mut tee, &mut io::sink()),
}
.err_span(span)?;
}
Ok(())
}
match (child.stdout.take(), child.stderr.take()) {
(Some(stdout), stderr) => {
// delegate a thread to redirect stderr to result.
let handler = stderr
.map(|stderr| {
let ctrlc = ctrlc.clone();
thread::Builder::new().name("stderr saver".into()).spawn(
move || {
write_or_consume_stderr(
stderr,
stderr_file,
span,
ctrlc,
progress,
)
},
)
})
.transpose()
.err_span(span)?;
let res = match stdout {
ChildPipe::Pipe(pipe) => {
stream_to_file(pipe, None, ctrlc, file, span, progress)
}
ChildPipe::Tee(tee) => {
stream_to_file(tee, None, ctrlc, file, span, progress)
}
};
if let Some(h) = handler {
h.join().map_err(|err| ShellError::ExternalCommand {
label: "Fail to receive external commands stderr message"
.to_string(),
help: format!("{err:?}"),
span,
})??;
}
res?;
}
(None, Some(stderr)) => {
write_or_consume_stderr(
stderr,
stderr_file,
span,
ctrlc,
progress,
)?;
}
(None, None) => {}
};
}
}
Ok(PipelineData::Empty)
}
@ -302,8 +357,7 @@ fn input_to_bytes(
) -> Result<Vec<u8>, ShellError> {
let ext = if raw {
None
// if is extern stream , in other words , not value
} else if let PipelineData::ExternalStream { .. } = input {
} else if let PipelineData::ByteStream(..) = input {
None
} else if let PipelineData::Value(Value::String { .. }, ..) = input {
None
@ -318,7 +372,7 @@ fn input_to_bytes(
input
};
value_to_bytes(input.into_value(span))
value_to_bytes(input.into_value(span)?)
}
/// Convert given data into content of file of specified extension if
@ -448,84 +502,54 @@ fn get_files(
}
fn stream_to_file(
mut stream: RawStream,
mut source: impl Read,
known_size: Option<u64>,
ctrlc: Option<Arc<AtomicBool>>,
mut file: File,
span: Span,
progress: bool,
) -> Result<(), ShellError> {
// https://github.com/nushell/nushell/pull/9377 contains the reason
// for not using BufWriter<File>
let writer = &mut file;
// https://github.com/nushell/nushell/pull/9377 contains the reason for not using `BufWriter`
if progress {
let mut bytes_processed = 0;
let mut bytes_processed: u64 = 0;
let bytes_processed_p = &mut bytes_processed;
let file_total_size = stream.known_size;
let mut process_failed = false;
let process_failed_p = &mut process_failed;
let mut bar = progress_bar::NuProgressBar::new(known_size);
// Create the progress bar
// It looks a bit messy but I am doing it this way to avoid
// creating the bar when is not needed
let (mut bar_opt, bar_opt_clone) = if progress {
let tmp_bar = progress_bar::NuProgressBar::new(file_total_size);
let tmp_bar_clone = tmp_bar.clone();
// TODO: reduce the number of progress bar updates?
(Some(tmp_bar), Some(tmp_bar_clone))
} else {
(None, None)
};
let mut reader = BufReader::new(source);
stream.try_for_each(move |result| {
let buf = match result {
Ok(v) => match v {
Value::String { val, .. } => val.into_bytes(),
Value::Binary { val, .. } => val,
// Propagate errors by explicitly matching them before the final case.
Value::Error { error, .. } => return Err(*error),
other => {
return Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "string or binary".into(),
wrong_type: other.get_type().to_string(),
dst_span: span,
src_span: other.span(),
});
let res = loop {
if nu_utils::ctrl_c::was_pressed(&ctrlc) {
bar.abandoned_msg("# Cancelled #".to_owned());
return Ok(());
}
match reader.fill_buf() {
Ok(&[]) => break Ok(()),
Ok(buf) => {
file.write_all(buf).err_span(span)?;
let len = buf.len();
reader.consume(len);
bytes_processed += len as u64;
bar.update_bar(bytes_processed);
}
},
Err(err) => {
*process_failed_p = true;
return Err(err);
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => break Err(e),
}
};
// If the `progress` flag is set then
if progress {
// Update the total amount of bytes that has been saved and then print the progress bar
*bytes_processed_p += buf.len() as u64;
if let Some(bar) = &mut bar_opt {
bar.update_bar(*bytes_processed_p);
}
}
if let Err(err) = writer.write_all(&buf) {
*process_failed_p = true;
return Err(ShellError::IOError {
msg: err.to_string(),
});
}
Ok(())
})?;
// If the `progress` flag is set then
if progress {
// If the process failed, stop the progress bar with an error message.
if process_failed {
if let Some(bar) = bar_opt_clone {
bar.abandoned_msg("# Error while saving #".to_owned());
}
if let Err(err) = res {
let _ = file.flush();
bar.abandoned_msg("# Error while saving #".to_owned());
Err(err.into_spanned(span).into())
} else {
file.flush().err_span(span)?;
Ok(())
}
} else {
copy_with_interrupt(&mut source, &mut file, span, ctrlc.as_deref())?;
Ok(())
}
file.flush()?;
Ok(())
}