mirror of
https://github.com/nushell/nushell.git
synced 2025-07-01 07:00:37 +02:00
IO and redirection overhaul (#11934)
# Description The PR overhauls how IO redirection is handled, allowing more explicit and fine-grain control over `stdout` and `stderr` output as well as more efficient IO and piping. To summarize the changes in this PR: - Added a new `IoStream` type to indicate the intended destination for a pipeline element's `stdout` and `stderr`. - The `stdout` and `stderr` `IoStream`s are stored in the `Stack` and to avoid adding 6 additional arguments to every eval function and `Command::run`. The `stdout` and `stderr` streams can be temporarily overwritten through functions on `Stack` and these functions will return a guard that restores the original `stdout` and `stderr` when dropped. - In the AST, redirections are now directly part of a `PipelineElement` as a `Option<Redirection>` field instead of having multiple different `PipelineElement` enum variants for each kind of redirection. This required changes to the parser, mainly in `lite_parser.rs`. - `Command`s can also set a `IoStream` override/redirection which will apply to the previous command in the pipeline. This is used, for example, in `ignore` to allow the previous external command to have its stdout redirected to `Stdio::null()` at spawn time. In contrast, the current implementation has to create an os pipe and manually consume the output on nushell's side. File and pipe redirections (`o>`, `e>`, `e>|`, etc.) have precedence over overrides from commands. This PR improves piping and IO speed, partially addressing #10763. Using the `throughput` command from that issue, this PR gives the following speedup on my setup for the commands below: | Command | Before (MB/s) | After (MB/s) | Bash (MB/s) | | --------------------------- | -------------:| ------------:| -----------:| | `throughput o> /dev/null` | 1169 | 52938 | 54305 | | `throughput \| ignore` | 840 | 55438 | N/A | | `throughput \| null` | Error | 53617 | N/A | | `throughput \| rg 'x'` | 1165 | 3049 | 3736 | | `(throughput) \| rg 'x'` | 810 | 3085 | 3815 | (Numbers above are the median samples for throughput) This PR also paves the way to refactor our `ExternalStream` handling in the various commands. For example, this PR already fixes the following code: ```nushell ^sh -c 'echo -n "hello "; sleep 0; echo "world"' | find "hello world" ``` This returns an empty list on 0.90.1 and returns a highlighted "hello world" on this PR. Since the `stdout` and `stderr` `IoStream`s are available to commands when they are run, then this unlocks the potential for more convenient behavior. E.g., the `find` command can disable its ansi highlighting if it detects that the output `IoStream` is not the terminal. Knowing the output streams will also allow background job output to be redirected more easily and efficiently. # User-Facing Changes - External commands returned from closures will be collected (in most cases): ```nushell 1..2 | each {|_| nu -c "print a" } ``` This gives `["a", "a"]` on this PR, whereas this used to print "a\na\n" and then return an empty list. ```nushell 1..2 | each {|_| nu -c "print -e a" } ``` This gives `["", ""]` and prints "a\na\n" to stderr, whereas this used to return an empty list and print "a\na\n" to stderr. - Trailing new lines are always trimmed for external commands when piping into internal commands or collecting it as a value. (Failure to decode the output as utf-8 will keep the trailing newline for the last binary value.) In the current nushell version, the following three code snippets differ only in parenthesis placement, but they all also have different outputs: 1. `1..2 | each { ^echo a }` ``` a a ╭────────────╮ │ empty list │ ╰────────────╯ ``` 2. `1..2 | each { (^echo a) }` ``` ╭───┬───╮ │ 0 │ a │ │ 1 │ a │ ╰───┴───╯ ``` 3. `1..2 | (each { ^echo a })` ``` ╭───┬───╮ │ 0 │ a │ │ │ │ │ 1 │ a │ │ │ │ ╰───┴───╯ ``` But in this PR, the above snippets will all have the same output: ``` ╭───┬───╮ │ 0 │ a │ │ 1 │ a │ ╰───┴───╯ ``` - All existing flags on `run-external` are now deprecated. - File redirections now apply to all commands inside a code block: ```nushell (nu -c "print -e a"; nu -c "print -e b") e> test.out ``` This gives "a\nb\n" in `test.out` and prints nothing. The same result would happen when printing to stdout and using a `o>` file redirection. - External command output will (almost) never be ignored, and ignoring output must be explicit now: ```nushell (^echo a; ^echo b) ``` This prints "a\nb\n", whereas this used to print only "b\n". This only applies to external commands; values and internal commands not in return position will not print anything (e.g., `(echo a; echo b)` still only prints "b"). - `complete` now always captures stderr (`do` is not necessary). # After Submitting The language guide and other documentation will need to be updated.
This commit is contained in:
@ -2,14 +2,12 @@ use nu_cmd_base::hook::eval_hook;
|
||||
use nu_engine::env_to_strings;
|
||||
use nu_engine::get_eval_expression;
|
||||
use nu_engine::CallExt;
|
||||
use nu_protocol::IntoSpanned;
|
||||
use nu_protocol::NuGlob;
|
||||
use nu_protocol::{
|
||||
ast::{Call, Expr},
|
||||
did_you_mean,
|
||||
engine::{Command, EngineState, Stack},
|
||||
Category, Example, ListStream, PipelineData, RawStream, ShellError, Signature, Span, Spanned,
|
||||
SyntaxShape, Type, Value,
|
||||
Category, Example, IntoSpanned, IoStream, ListStream, NuGlob, PipelineData, RawStream,
|
||||
ShellError, Signature, Span, Spanned, SyntaxShape, Type, Value,
|
||||
};
|
||||
use nu_system::ForegroundChild;
|
||||
use nu_utils::IgnoreCaseExt;
|
||||
@ -19,14 +17,9 @@ use std::collections::HashMap;
|
||||
use std::io::{BufRead, BufReader, Read, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::{Command as CommandSys, Stdio};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::mpsc::{self, SyncSender};
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
|
||||
const OUTPUT_BUFFER_SIZE: usize = 1024;
|
||||
const OUTPUT_BUFFERS_IN_FLIGHT: usize = 3;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct External;
|
||||
|
||||
@ -76,15 +69,61 @@ impl Command for External {
|
||||
});
|
||||
}
|
||||
|
||||
let command = create_external_command(
|
||||
engine_state,
|
||||
stack,
|
||||
call,
|
||||
redirect_stdout,
|
||||
redirect_stderr,
|
||||
redirect_combine,
|
||||
trim_end_newline,
|
||||
)?;
|
||||
if trim_end_newline {
|
||||
nu_protocol::report_error_new(
|
||||
engine_state,
|
||||
&ShellError::GenericError {
|
||||
error: "Deprecated flag".into(),
|
||||
msg: "`--trim-end-newline` is deprecated".into(),
|
||||
span: Some(call.arguments_span()),
|
||||
help: Some(
|
||||
"trailing new lines are now removed by default when collecting into a value"
|
||||
.into(),
|
||||
),
|
||||
inner: vec![],
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
if redirect_combine {
|
||||
nu_protocol::report_error_new(
|
||||
engine_state,
|
||||
&ShellError::GenericError {
|
||||
error: "Deprecated flag".into(),
|
||||
msg: "`--redirect-combine` is deprecated".into(),
|
||||
span: Some(call.arguments_span()),
|
||||
help: Some("use the `o+e>|` pipe redirection instead".into()),
|
||||
inner: vec![],
|
||||
},
|
||||
);
|
||||
} else if redirect_stdout {
|
||||
nu_protocol::report_error_new(
|
||||
engine_state,
|
||||
&ShellError::GenericError {
|
||||
error: "Deprecated flag".into(),
|
||||
msg: "`--redirect-stdout` is deprecated".into(),
|
||||
span: Some(call.arguments_span()),
|
||||
help: Some(
|
||||
"`run-external` will now always redirect stdout if there is a pipe `|` afterwards"
|
||||
.into(),
|
||||
),
|
||||
inner: vec![],
|
||||
},
|
||||
);
|
||||
} else if redirect_stderr {
|
||||
nu_protocol::report_error_new(
|
||||
engine_state,
|
||||
&ShellError::GenericError {
|
||||
error: "Deprecated flag".into(),
|
||||
msg: "`--redirect-stderr` is deprecated".into(),
|
||||
span: Some(call.arguments_span()),
|
||||
help: Some("use the `e>|` stderr pipe redirection instead".into()),
|
||||
inner: vec![],
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let command = create_external_command(engine_state, stack, call)?;
|
||||
|
||||
command.run_with_input(engine_state, stack, input, false)
|
||||
}
|
||||
@ -98,7 +137,12 @@ impl Command for External {
|
||||
},
|
||||
Example {
|
||||
description: "Redirect stdout from an external command into the pipeline",
|
||||
example: r#"run-external --redirect-stdout "echo" "-n" "hello" | split chars"#,
|
||||
example: r#"run-external "echo" "-n" "hello" | split chars"#,
|
||||
result: None,
|
||||
},
|
||||
Example {
|
||||
description: "Redirect stderr from an external command into the pipeline",
|
||||
example: r#"run-external "nu" "-c" "print -e hello" e>| split chars"#,
|
||||
result: None,
|
||||
},
|
||||
]
|
||||
@ -110,10 +154,6 @@ pub fn create_external_command(
|
||||
engine_state: &EngineState,
|
||||
stack: &mut Stack,
|
||||
call: &Call,
|
||||
redirect_stdout: bool,
|
||||
redirect_stderr: bool,
|
||||
redirect_combine: bool,
|
||||
trim_end_newline: bool,
|
||||
) -> Result<ExternalCommand, ShellError> {
|
||||
let name: Spanned<String> = call.req(engine_state, stack, 0)?;
|
||||
|
||||
@ -180,11 +220,9 @@ pub fn create_external_command(
|
||||
name,
|
||||
args: spanned_args,
|
||||
arg_keep_raw,
|
||||
redirect_stdout,
|
||||
redirect_stderr,
|
||||
redirect_combine,
|
||||
out: stack.stdout().clone(),
|
||||
err: stack.stderr().clone(),
|
||||
env_vars: env_vars_str,
|
||||
trim_end_newline,
|
||||
})
|
||||
}
|
||||
|
||||
@ -193,11 +231,9 @@ pub struct ExternalCommand {
|
||||
pub name: Spanned<String>,
|
||||
pub args: Vec<Spanned<String>>,
|
||||
pub arg_keep_raw: Vec<bool>,
|
||||
pub redirect_stdout: bool,
|
||||
pub redirect_stderr: bool,
|
||||
pub redirect_combine: bool,
|
||||
pub out: IoStream,
|
||||
pub err: IoStream,
|
||||
pub env_vars: HashMap<String, String>,
|
||||
pub trim_end_newline: bool,
|
||||
}
|
||||
|
||||
impl ExternalCommand {
|
||||
@ -364,6 +400,7 @@ impl ExternalCommand {
|
||||
let mut engine_state = engine_state.clone();
|
||||
if let Some(hook) = engine_state.config.hooks.command_not_found.clone()
|
||||
{
|
||||
let stack = &mut stack.start_capture();
|
||||
if let Ok(PipelineData::Value(Value::String { val, .. }, ..)) =
|
||||
eval_hook(
|
||||
&mut engine_state,
|
||||
@ -412,6 +449,7 @@ impl ExternalCommand {
|
||||
thread::Builder::new()
|
||||
.name("external stdin worker".to_string())
|
||||
.spawn(move || {
|
||||
let stack = &mut stack.start_capture();
|
||||
// Attempt to render the input as a table before piping it to the external.
|
||||
// This is important for pagers like `less`;
|
||||
// they need to get Nu data rendered for display to users.
|
||||
@ -421,7 +459,7 @@ impl ExternalCommand {
|
||||
let input = crate::Table::run(
|
||||
&crate::Table,
|
||||
&engine_state,
|
||||
&mut stack,
|
||||
stack,
|
||||
&Call::new(head),
|
||||
input,
|
||||
);
|
||||
@ -447,63 +485,66 @@ impl ExternalCommand {
|
||||
|
||||
#[cfg(unix)]
|
||||
let commandname = self.name.item.clone();
|
||||
let redirect_stdout = self.redirect_stdout;
|
||||
let redirect_stderr = self.redirect_stderr;
|
||||
let redirect_combine = self.redirect_combine;
|
||||
let span = self.name.span;
|
||||
let output_ctrlc = ctrlc.clone();
|
||||
let stderr_ctrlc = ctrlc.clone();
|
||||
let (stdout_tx, stdout_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
|
||||
let (exit_code_tx, exit_code_rx) = mpsc::channel();
|
||||
|
||||
let stdout = child.as_mut().stdout.take();
|
||||
let stderr = child.as_mut().stderr.take();
|
||||
let (stdout, stderr) = if let Some(combined) = reader {
|
||||
(
|
||||
Some(RawStream::new(
|
||||
Box::new(ByteLines::new(combined)),
|
||||
ctrlc.clone(),
|
||||
head,
|
||||
None,
|
||||
)),
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
let stdout = child.as_mut().stdout.take().map(|out| {
|
||||
RawStream::new(Box::new(ByteLines::new(out)), ctrlc.clone(), head, None)
|
||||
});
|
||||
|
||||
// If this external is not the last expression, then its output is piped to a channel
|
||||
// and we create a ListStream that can be consumed
|
||||
let stderr = child.as_mut().stderr.take().map(|err| {
|
||||
RawStream::new(Box::new(ByteLines::new(err)), ctrlc.clone(), head, None)
|
||||
});
|
||||
|
||||
// First create a thread to redirect the external's stdout and wait for an exit code.
|
||||
if matches!(self.err, IoStream::Pipe) {
|
||||
(stderr, stdout)
|
||||
} else {
|
||||
(stdout, stderr)
|
||||
}
|
||||
};
|
||||
|
||||
// Create a thread to wait for an exit code.
|
||||
thread::Builder::new()
|
||||
.name("stdout redirector + exit code waiter".to_string())
|
||||
.name("exit code waiter".into())
|
||||
.spawn(move || {
|
||||
if redirect_stdout {
|
||||
let stdout = stdout.ok_or_else(|| {
|
||||
ShellError::ExternalCommand { label: "Error taking stdout from external".to_string(), help: "Redirects need access to stdout of an external command"
|
||||
.to_string(), span }
|
||||
})?;
|
||||
match child.as_mut().wait() {
|
||||
Err(err) => Err(ShellError::ExternalCommand {
|
||||
label: "External command exited with error".into(),
|
||||
help: err.to_string(),
|
||||
span
|
||||
}),
|
||||
Ok(x) => {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use nu_ansi_term::{Color, Style};
|
||||
use std::ffi::CStr;
|
||||
use std::os::unix::process::ExitStatusExt;
|
||||
|
||||
read_and_redirect_message(stdout, stdout_tx, ctrlc)
|
||||
} else if redirect_combine {
|
||||
let stdout = reader.ok_or_else(|| {
|
||||
ShellError::ExternalCommand { label: "Error taking combined stdout and stderr from external".to_string(), help: "Combined redirects need access to reader pipe of an external command"
|
||||
.to_string(), span }
|
||||
})?;
|
||||
read_and_redirect_message(stdout, stdout_tx, ctrlc)
|
||||
}
|
||||
if x.core_dumped() {
|
||||
let cause = x.signal().and_then(|sig| unsafe {
|
||||
// SAFETY: We should be the first to call `char * strsignal(int sig)`
|
||||
let sigstr_ptr = libc::strsignal(sig);
|
||||
if sigstr_ptr.is_null() {
|
||||
return None;
|
||||
}
|
||||
|
||||
match child.as_mut().wait() {
|
||||
Err(err) => Err(ShellError::ExternalCommand { label: "External command exited with error".into(), help: err.to_string(), span }),
|
||||
Ok(x) => {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use nu_ansi_term::{Color, Style};
|
||||
use std::ffi::CStr;
|
||||
use std::os::unix::process::ExitStatusExt;
|
||||
// SAFETY: The pointer points to a valid non-null string
|
||||
let sigstr = CStr::from_ptr(sigstr_ptr);
|
||||
sigstr.to_str().map(String::from).ok()
|
||||
});
|
||||
|
||||
if x.core_dumped() {
|
||||
let cause = x.signal().and_then(|sig| unsafe {
|
||||
// SAFETY: We should be the first to call `char * strsignal(int sig)`
|
||||
let sigstr_ptr = libc::strsignal(sig);
|
||||
if sigstr_ptr.is_null() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// SAFETY: The pointer points to a valid non-null string
|
||||
let sigstr = CStr::from_ptr(sigstr_ptr);
|
||||
sigstr.to_str().map(String::from).ok()
|
||||
});
|
||||
|
||||
let cause = cause.as_deref().unwrap_or("Something went wrong");
|
||||
let cause = cause.as_deref().unwrap_or("Something went wrong");
|
||||
|
||||
let style = Style::new().bold().on(Color::Red);
|
||||
eprintln!(
|
||||
@ -531,56 +572,18 @@ impl ExternalCommand {
|
||||
}
|
||||
}).map_err(|e| e.into_spanned(head))?;
|
||||
|
||||
let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
|
||||
if redirect_stderr {
|
||||
thread::Builder::new()
|
||||
.name("stderr redirector".to_string())
|
||||
.spawn(move || {
|
||||
let stderr = stderr.ok_or_else(|| ShellError::ExternalCommand {
|
||||
label: "Error taking stderr from external".to_string(),
|
||||
help: "Redirects need access to stderr of an external command"
|
||||
.to_string(),
|
||||
span,
|
||||
})?;
|
||||
|
||||
read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc);
|
||||
Ok::<(), ShellError>(())
|
||||
})
|
||||
.map_err(|e| e.into_spanned(head))?;
|
||||
}
|
||||
|
||||
let stdout_receiver = ChannelReceiver::new(stdout_rx);
|
||||
let stderr_receiver = ChannelReceiver::new(stderr_rx);
|
||||
let exit_code_receiver = ValueReceiver::new(exit_code_rx);
|
||||
|
||||
Ok(PipelineData::ExternalStream {
|
||||
stdout: if redirect_stdout || redirect_combine {
|
||||
Some(RawStream::new(
|
||||
Box::new(stdout_receiver),
|
||||
output_ctrlc.clone(),
|
||||
head,
|
||||
None,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
},
|
||||
stderr: if redirect_stderr {
|
||||
Some(RawStream::new(
|
||||
Box::new(stderr_receiver),
|
||||
output_ctrlc.clone(),
|
||||
head,
|
||||
None,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
},
|
||||
stdout,
|
||||
stderr,
|
||||
exit_code: Some(ListStream::from_stream(
|
||||
Box::new(exit_code_receiver),
|
||||
output_ctrlc,
|
||||
ctrlc.clone(),
|
||||
)),
|
||||
span: head,
|
||||
metadata: None,
|
||||
trim_end_newline: self.trim_end_newline,
|
||||
trim_end_newline: true,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -621,20 +624,15 @@ impl ExternalCommand {
|
||||
|
||||
// If the external is not the last command, its output will get piped
|
||||
// either as a string or binary
|
||||
let reader = if self.redirect_combine {
|
||||
let reader = if matches!(self.out, IoStream::Pipe) && matches!(self.err, IoStream::Pipe) {
|
||||
let (reader, writer) = os_pipe::pipe()?;
|
||||
let writer_clone = writer.try_clone()?;
|
||||
process.stdout(writer);
|
||||
process.stderr(writer_clone);
|
||||
Some(reader)
|
||||
} else {
|
||||
if self.redirect_stdout {
|
||||
process.stdout(Stdio::piped());
|
||||
}
|
||||
|
||||
if self.redirect_stderr {
|
||||
process.stderr(Stdio::piped());
|
||||
}
|
||||
process.stdout(Stdio::try_from(&self.out)?);
|
||||
process.stderr(Stdio::try_from(&self.err)?);
|
||||
None
|
||||
};
|
||||
|
||||
@ -824,63 +822,27 @@ fn remove_quotes(input: String) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
// read message from given `reader`, and send out through `sender`.
|
||||
//
|
||||
// `ctrlc` is used to control the process, if ctrl-c is pressed, the read and redirect
|
||||
// process will be breaked.
|
||||
fn read_and_redirect_message<R>(
|
||||
reader: R,
|
||||
sender: SyncSender<Vec<u8>>,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
) where
|
||||
R: Read,
|
||||
{
|
||||
// read using the BufferReader. It will do so until there is an
|
||||
// error or there are no more bytes to read
|
||||
let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader);
|
||||
while let Ok(bytes) = buf_read.fill_buf() {
|
||||
if bytes.is_empty() {
|
||||
break;
|
||||
}
|
||||
struct ByteLines<R: Read>(BufReader<R>);
|
||||
|
||||
// The Cow generated from the function represents the conversion
|
||||
// from bytes to String. If no replacements are required, then the
|
||||
// borrowed value is a proper UTF-8 string. The Owned option represents
|
||||
// a string where the values had to be replaced, thus marking it as bytes
|
||||
let bytes = bytes.to_vec();
|
||||
let length = bytes.len();
|
||||
buf_read.consume(length);
|
||||
|
||||
if nu_utils::ctrl_c::was_pressed(&ctrlc) {
|
||||
break;
|
||||
}
|
||||
|
||||
match sender.send(bytes) {
|
||||
Ok(_) => continue,
|
||||
Err(_) => break,
|
||||
}
|
||||
impl<R: Read> ByteLines<R> {
|
||||
fn new(read: R) -> Self {
|
||||
Self(BufReader::new(read))
|
||||
}
|
||||
}
|
||||
|
||||
// Receiver used for the RawStream
|
||||
// It implements iterator so it can be used as a RawStream
|
||||
struct ChannelReceiver {
|
||||
rx: mpsc::Receiver<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl ChannelReceiver {
|
||||
pub fn new(rx: mpsc::Receiver<Vec<u8>>) -> Self {
|
||||
Self { rx }
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for ChannelReceiver {
|
||||
impl<R: Read> Iterator for ByteLines<R> {
|
||||
type Item = Result<Vec<u8>, ShellError>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self.rx.recv() {
|
||||
Ok(v) => Some(Ok(v)),
|
||||
Err(_) => None,
|
||||
let mut buf = Vec::new();
|
||||
// `read_until` will never stop reading unless `\n` or EOF is encountered,
|
||||
// so let's limit the number of bytes using `take` as the Rust docs suggest.
|
||||
let capacity = self.0.capacity() as u64;
|
||||
let mut reader = (&mut self.0).take(capacity);
|
||||
match reader.read_until(b'\n', &mut buf) {
|
||||
Ok(0) => None,
|
||||
Ok(_) => Some(Ok(buf)),
|
||||
Err(e) => Some(Err(e.into())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user