Redirect: support redirect stderr with piping stdout to next commands. (#10851)

# Description
Fixes: #10271

Given the following script:
```shell
# test.sh
echo aaaaa
echo bbbbb 1>&2
echo cc
```

This pr makes the following command possible:
```nushell
bash test.sh err> /dev/null | lines | each {|line| $line | str length}
```


## General idea behind the change:
When nushell redirect stderr message to external file
1. it take stdout of external stream, and pass this stream to next
command, so it won't block next pipeline command from running.
2. relative stderr stream are handled by `save` command

These two streams are handled separately, so we need to delegate a
thread to `save` command, or else we'll have a chance to hang nushell,
we have meet a similar before: #5625.

### One case to consider
What if we're failed to save to an external stream? (Like we don't have
a permission to save to a file)?
In this case nushell will just print a waning message, and don't stop
the following scripts from running.

# User-Facing Changes
## Before
```nushell
❯ bash test2.sh err> /dev/null | lines | each {|line| $line | str length}
aaaaa
cc
```

## After
```nushell
❯ bash test2.sh err> /dev/null | lines | each {|line| $line | str length}
╭───┬───╮
│ 0 │ 5 │
│ 1 │ 2 │
╰───┴───╯
```

BTY, after this pr, the following commands are impossible either, it's
important to make sure that the implementation doesn't introduce too
much costs:
```nushell
❯ echo a e> a.txt e> a.txt
Error:   × Can't make stderr redirection twice
   ╭─[entry #1:1:1]
 1 │ echo a e> a.txt e> a.txt
   ·                 ─┬
   ·                  ╰── try to remove one
   ╰────

❯ echo a o> a.txt o> a.txt
Error:   × Can't make stdout redirection twice
   ╭─[entry #2:1:1]
 1 │ echo a o> a.txt o> a.txt
   ·                 ─┬
   ·                  ╰── try to remove one
   ╰────
```
This commit is contained in:
WindSoilder 2023-11-23 10:11:00 +08:00 committed by GitHub
parent 6cfe35eb7e
commit 57808ca7cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 269 additions and 83 deletions

View File

@ -288,3 +288,53 @@ fn redirection_should_have_a_target() {
); );
} }
} }
#[test]
#[cfg(not(windows))]
fn redirection_with_pipe() {
use nu_test_support::fs::Stub::FileWithContent;
use nu_test_support::playground::Playground;
Playground::setup(
"external with many stdout and stderr messages",
|dirs, sandbox| {
let script_body = r#"
x=$(printf '=%.0s' {1..40})
echo -n $x
echo -n $x 1>&2
"#;
let mut expect_body = String::new();
for _ in 0..40 {
expect_body.push('=');
}
sandbox.with_files(vec![FileWithContent("test.sh", script_body)]);
// check for stdout
let actual = nu!(
cwd: dirs.test(),
"bash test.sh err> tmp_file | str length",
);
assert_eq!(actual.out, "40");
// check for stderr redirection file.
let expected_out_file = dirs.test().join("tmp_file");
let actual_len = file_contents(expected_out_file).len();
assert_eq!(actual_len, 40);
// check it inside a function
let actual = nu!(
cwd: dirs.test(),
"bash test.sh err> tmp_file; print aa"
);
assert!(actual.out.contains(&format!("{}aa", expect_body)));
},
)
}
#[test]
fn no_duplicate_redirection() {
let actual = nu!("echo 3 o> a.txt o> a.txt");
assert!(actual.err.contains("Redirection can be set only once"));
let actual = nu!("echo 3 e> a.txt e> a.txt");
assert!(actual.err.contains("Redirection can be set only once"));
}

View File

@ -10,6 +10,7 @@ use nu_protocol::{
ShellError, Span, Spanned, Unit, Value, VarId, ENV_VARIABLE_ID, ShellError, Span, Spanned, Unit, Value, VarId, ENV_VARIABLE_ID,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::thread::{self, JoinHandle};
pub fn eval_call( pub fn eval_call(
engine_state: &EngineState, engine_state: &EngineState,
@ -742,6 +743,7 @@ fn eval_element_with_input(
mut input: PipelineData, mut input: PipelineData,
redirect_stdout: bool, redirect_stdout: bool,
redirect_stderr: bool, redirect_stderr: bool,
stderr_writer_jobs: &mut Vec<DataSaveJob>,
) -> Result<(PipelineData, bool), ShellError> { ) -> Result<(PipelineData, bool), ShellError> {
match element { match element {
PipelineElement::Expression(_, expr) => eval_expression_with_input( PipelineElement::Expression(_, expr) => eval_expression_with_input(
@ -752,7 +754,8 @@ fn eval_element_with_input(
redirect_stdout, redirect_stdout,
redirect_stderr, redirect_stderr,
), ),
PipelineElement::Redirection(span, redirection, expr) => match &expr.expr { PipelineElement::Redirection(span, redirection, expr) => {
match &expr.expr {
Expr::String(_) Expr::String(_)
| Expr::FullCellPath(_) | Expr::FullCellPath(_)
| Expr::StringInterpolation(_) | Expr::StringInterpolation(_)
@ -761,18 +764,22 @@ fn eval_element_with_input(
PipelineData::ExternalStream { exit_code, .. } => exit_code.take(), PipelineData::ExternalStream { exit_code, .. } => exit_code.take(),
_ => None, _ => None,
}; };
let input = match (redirection, input) {
// when nushell get Stderr Redirection, we want to take `stdout` part of `input`
// so this stdout stream can be handled by next command.
let (input, out_stream) = match (redirection, input) {
( (
Redirection::Stderr, Redirection::Stderr,
PipelineData::ExternalStream { PipelineData::ExternalStream {
stdout,
stderr, stderr,
exit_code, exit_code,
span, span,
metadata, metadata,
trim_end_newline, trim_end_newline,
..
}, },
) => PipelineData::ExternalStream { ) => (
PipelineData::ExternalStream {
stdout: stderr, stdout: stderr,
stderr: None, stderr: None,
exit_code, exit_code,
@ -780,11 +787,15 @@ fn eval_element_with_input(
metadata, metadata,
trim_end_newline, trim_end_newline,
}, },
(_, input) => input, Some(stdout),
),
(_, input) => (input, None),
}; };
if let Some(save_command) = engine_state.find_decl(b"save", &[]) { if let Some(save_command) = engine_state.find_decl(b"save", &[]) {
let save_call = gen_save_call(save_command, (*span, expr.clone()), None); let save_call = gen_save_call(save_command, (*span, expr.clone()), None);
match out_stream {
None => {
eval_call(engine_state, stack, &save_call, input).map(|_| { eval_call(engine_state, stack, &save_call, input).map(|_| {
// save is internal command, normally it exists with non-ExternalStream // save is internal command, normally it exists with non-ExternalStream
// but here in redirection context, we make it returns ExternalStream // but here in redirection context, we make it returns ExternalStream
@ -801,12 +812,40 @@ fn eval_element_with_input(
false, false,
) )
}) })
}
Some(out_stream) => {
// delegate to a different thread
// so nushell won't hang if external command generates both too much
// stderr and stdout message
let stderr_stack = stack.clone();
let engine_state_clone = engine_state.clone();
stderr_writer_jobs.push(DataSaveJob::spawn(
engine_state_clone,
stderr_stack,
save_call,
input,
));
Ok((
PipelineData::ExternalStream {
stdout: out_stream,
stderr: None,
exit_code,
span: *span,
metadata: None,
trim_end_newline: false,
},
false,
))
}
}
} else { } else {
Err(ShellError::CommandNotFound { span: *span }) Err(ShellError::CommandNotFound { span: *span })
} }
} }
_ => Err(ShellError::CommandNotFound { span: *span }), _ => Err(ShellError::CommandNotFound { span: *span }),
}, }
}
PipelineElement::SeparateRedirection { PipelineElement::SeparateRedirection {
out: (out_span, out_expr), out: (out_span, out_expr),
err: (err_span, err_expr), err: (err_span, err_expr),
@ -888,6 +927,7 @@ fn eval_element_with_input(
input, input,
true, true,
redirect_stderr, redirect_stderr,
stderr_writer_jobs,
) )
.map(|x| x.0)? .map(|x| x.0)?
} }
@ -903,6 +943,7 @@ fn eval_element_with_input(
input, input,
redirect_stdout, redirect_stdout,
redirect_stderr, redirect_stderr,
stderr_writer_jobs,
) )
} }
PipelineElement::And(_, expr) => eval_expression_with_input( PipelineElement::And(_, expr) => eval_expression_with_input(
@ -950,8 +991,8 @@ pub fn eval_block(
stack: &mut Stack, stack: &mut Stack,
block: &Block, block: &Block,
mut input: PipelineData, mut input: PipelineData,
redirect_stdout: bool, mut redirect_stdout: bool,
redirect_stderr: bool, mut redirect_stderr: bool,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
// if Block contains recursion, make sure we don't recurse too deeply (to avoid stack overflow) // if Block contains recursion, make sure we don't recurse too deeply (to avoid stack overflow)
if let Some(recursive) = block.recursive { if let Some(recursive) = block.recursive {
@ -972,28 +1013,49 @@ pub fn eval_block(
let num_pipelines = block.len(); let num_pipelines = block.len();
for (pipeline_idx, pipeline) in block.pipelines.iter().enumerate() { for (pipeline_idx, pipeline) in block.pipelines.iter().enumerate() {
let mut elements_iter = pipeline.elements.iter().peekable(); let mut stderr_writer_jobs = vec![];
while let Some(element) = elements_iter.next() { let elements = &pipeline.elements;
let redirect_stderr = redirect_stderr let elements_length = elements.len();
|| (elements_iter.peek().map_or(false, |next_element| { for (idx, element) in elements.iter().enumerate() {
matches!( if !redirect_stderr && idx < elements_length - 1 {
let next_element = &elements[idx + 1];
if matches!(
next_element, next_element,
PipelineElement::Redirection(_, Redirection::Stderr, _) PipelineElement::Redirection(_, Redirection::Stderr, _)
| PipelineElement::Redirection(_, Redirection::StdoutAndStderr, _) | PipelineElement::Redirection(_, Redirection::StdoutAndStderr, _)
| PipelineElement::SeparateRedirection { .. } | PipelineElement::SeparateRedirection { .. }
) ) {
})); redirect_stderr = true;
}
}
let redirect_stdout = redirect_stdout if !redirect_stdout && idx < elements_length - 1 {
|| (elements_iter.peek().map_or(false, |next_element| { let next_element = &elements[idx + 1];
matches!( match next_element {
next_element, // is next element a stdout relative redirection?
PipelineElement::Redirection(_, Redirection::Stdout, _) PipelineElement::Redirection(_, Redirection::Stdout, _)
| PipelineElement::Redirection(_, Redirection::StdoutAndStderr, _) | PipelineElement::Redirection(_, Redirection::StdoutAndStderr, _)
| PipelineElement::Expression(..)
| PipelineElement::SeparateRedirection { .. } | PipelineElement::SeparateRedirection { .. }
) | PipelineElement::Expression(..) => redirect_stdout = true,
}));
PipelineElement::Redirection(_, Redirection::Stderr, _) => {
// a stderr redirection, but we still need to check for the next 2nd
// element, to handle for the following case:
// cat a.txt err> /dev/null | lines
//
// we only need to check the next 2nd element because we already make sure
// that we don't have duplicate err> like this:
// cat a.txt err> /dev/null err> /tmp/a
if idx < elements_length - 2 {
let next_2nd_element = &elements[idx + 2];
if matches!(next_2nd_element, PipelineElement::Expression(..)) {
redirect_stdout = true
}
}
}
_ => {}
}
}
// if eval internal command failed, it can just make early return with `Err(ShellError)`. // if eval internal command failed, it can just make early return with `Err(ShellError)`.
let eval_result = eval_element_with_input( let eval_result = eval_element_with_input(
@ -1003,15 +1065,12 @@ pub fn eval_block(
input, input,
redirect_stdout, redirect_stdout,
redirect_stderr, redirect_stderr,
&mut stderr_writer_jobs,
); );
match (eval_result, redirect_stderr) { match (eval_result, redirect_stderr) {
(Ok((pipeline_data, _)), true) => { (Ok((pipeline_data, _)), true) => {
input = pipeline_data; input = pipeline_data;
// external command may runs to failed
// make early return so remaining commands will not be executed.
// don't return `Err(ShellError)`, so nushell wouldn't show extra error message.
} }
(Err(error), true) => { (Err(error), true) => {
input = PipelineData::Value( input = PipelineData::Value(
@ -1035,6 +1094,12 @@ pub fn eval_block(
} }
} }
// `eval_element_with_input` may creates some threads
// to write stderr message to a file, here we need to wait and make sure that it's
// finished.
for h in stderr_writer_jobs {
let _ = h.join();
}
if pipeline_idx < (num_pipelines) - 1 { if pipeline_idx < (num_pipelines) - 1 {
match input { match input {
PipelineData::Value(Value::Nothing { .. }, ..) => {} PipelineData::Value(Value::Nothing { .. }, ..) => {}
@ -1070,8 +1135,24 @@ pub fn eval_subexpression(
mut input: PipelineData, mut input: PipelineData,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
for pipeline in block.pipelines.iter() { for pipeline in block.pipelines.iter() {
let mut stderr_writer_jobs = vec![];
for expr in pipeline.elements.iter() { for expr in pipeline.elements.iter() {
input = eval_element_with_input(engine_state, stack, expr, input, true, false)?.0 input = eval_element_with_input(
engine_state,
stack,
expr,
input,
true,
false,
&mut stderr_writer_jobs,
)?
.0
}
// `eval_element_with_input` may creates some threads
// to write stderr message to a file, here we need to wait and make sure that it's
// finished.
for h in stderr_writer_jobs {
let _ = h.join();
} }
} }
@ -1161,3 +1242,33 @@ fn gen_save_call(
parser_info: HashMap::new(), parser_info: HashMap::new(),
} }
} }
/// A job which saves `PipelineData` to a file in a child thread.
struct DataSaveJob {
inner: JoinHandle<()>,
}
impl DataSaveJob {
pub fn spawn(
engine_state: EngineState,
mut stack: Stack,
save_call: Call,
input: PipelineData,
) -> Self {
Self {
inner: thread::Builder::new()
.name("stderr saver".to_string())
.spawn(move || {
let result = eval_call(&engine_state, &mut stack, &save_call, input);
if let Err(err) = result {
eprintln!("WARNING: error occurred when redirect to stderr: {:?}", err);
}
})
.expect("Failed to create thread"),
}
}
pub fn join(self) -> thread::Result<()> {
self.inner.join()
}
}

View File

@ -71,6 +71,17 @@ impl LitePipeline {
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.commands.is_empty() self.commands.is_empty()
} }
pub fn exists(&self, new_target: Redirection) -> bool {
for cmd in &self.commands {
if let LiteElement::Redirection(_, exists_target, _) = cmd {
if exists_target == &new_target {
return true;
}
}
}
false
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -367,7 +378,7 @@ pub fn lite_parse(tokens: &[Token]) -> (LiteBlock, Option<ParseError>) {
/// push a `command` to `pipeline` /// push a `command` to `pipeline`
/// ///
/// It will return Some(err) if `command` is empty and we want to push a /// It will return Some(err) if `command` is empty and we want to push a
/// redirection command. /// redirection command, or we have meet the same redirection in `pipeline`.
fn push_command_to( fn push_command_to(
pipeline: &mut LitePipeline, pipeline: &mut LitePipeline,
command: LiteCommand, command: LiteCommand,
@ -377,14 +388,28 @@ fn push_command_to(
if !command.is_empty() { if !command.is_empty() {
match last_connector { match last_connector {
TokenContents::OutGreaterThan => { TokenContents::OutGreaterThan => {
pipeline.push(LiteElement::Redirection( let span = last_connector_span
last_connector_span .expect("internal error: redirection missing span information");
.expect("internal error: redirection missing span information"), if pipeline.exists(Redirection::Stdout) {
Redirection::Stdout, return Some(ParseError::LabeledError(
command, "Redirection can be set only once".into(),
"try to remove one".into(),
span,
)); ));
} }
pipeline.push(LiteElement::Redirection(span, Redirection::Stdout, command));
}
TokenContents::ErrGreaterThan => { TokenContents::ErrGreaterThan => {
let span = last_connector_span
.expect("internal error: redirection missing span information");
if pipeline.exists(Redirection::Stderr) {
return Some(ParseError::LabeledError(
"Redirection can be set only once".into(),
"try to remove one".into(),
span,
));
}
pipeline.push(LiteElement::Redirection( pipeline.push(LiteElement::Redirection(
last_connector_span last_connector_span
.expect("internal error: redirection missing span information"), .expect("internal error: redirection missing span information"),

View File

@ -2,7 +2,7 @@ use crate::{ast::Expression, engine::StateWorkingSet, Span, VarId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::ops::{Index, IndexMut}; use std::ops::{Index, IndexMut};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum Redirection { pub enum Redirection {
Stdout, Stdout,
Stderr, Stderr,