avoid freeze when capturing external stderr (#6700)

* avoid freeze when capturing external stderr

* try replace from sh to bash

* change description

* fmt code
This commit is contained in:
WindSoilder 2022-10-12 21:41:20 +08:00 committed by GitHub
parent 0bbb3a20df
commit 5815f122ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 130 additions and 22 deletions

View File

@ -37,22 +37,9 @@ impl Command for Complete {
let mut cols = vec![];
let mut vals = vec![];
if let Some(stdout) = stdout {
cols.push("stdout".to_string());
let stdout = stdout.into_bytes()?;
if let Ok(st) = String::from_utf8(stdout.item.clone()) {
vals.push(Value::String {
val: st,
span: stdout.span,
})
} else {
vals.push(Value::Binary {
val: stdout.item,
span: stdout.span,
})
}
}
// the order is important, we need to read
// stderr, then stdout, then exit_code
// because run_external generate them in this order.
if let Some(stderr) = stderr {
cols.push("stderr".to_string());
let stderr = stderr.into_bytes()?;
@ -69,6 +56,22 @@ impl Command for Complete {
};
}
if let Some(stdout) = stdout {
cols.push("stdout".to_string());
let stdout = stdout.into_bytes()?;
if let Ok(st) = String::from_utf8(stdout.item.clone()) {
vals.push(Value::String {
val: st,
span: stdout.span,
})
} else {
vals.push(Value::Binary {
val: stdout.item,
span: stdout.span,
})
}
}
if let Some(exit_code) = exit_code {
let mut v: Vec<_> = exit_code.collect();

View File

@ -337,6 +337,13 @@ impl ExternalCommand {
let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stderr);
while let Ok(bytes) = buf_read.fill_buf() {
if bytes.is_empty() {
// drop stderr sender manually, so stderr message consumer
// can make sure that there is no more stderr messages.
//
// and message consumer can continue to wait stdout message.
// If we don't make manually drop, and external command produces many
// stdout messages, relative message consumer will hang on so we'll get a deadlock.
drop(stderr_tx);
break;
}
@ -438,11 +445,15 @@ impl ExternalCommand {
} else {
None
},
stderr: Some(RawStream::new(
Box::new(stderr_receiver),
output_ctrlc.clone(),
head,
)),
stderr: if redirect_stderr {
Some(RawStream::new(
Box::new(stderr_receiver),
output_ctrlc.clone(),
head,
))
} else {
None
},
exit_code: Some(ListStream::from_stream(
Box::new(exit_code_receiver),
output_ctrlc,

View File

@ -59,3 +59,68 @@ fn do_with_semicolon_break_on_failed_external() {
assert_eq!(actual.out, "");
}
#[test]
#[cfg(not(windows))]
fn ignore_error_not_hang_nushell() {
use nu_test_support::fs::Stub::FileWithContent;
use nu_test_support::pipeline;
use nu_test_support::playground::Playground;
Playground::setup("external with many stderr message", |dirs, sandbox| {
let bytes: usize = 81920;
let mut large_file_body = String::with_capacity(bytes);
for _ in 0..bytes {
large_file_body.push_str("a");
}
sandbox.with_files(vec![FileWithContent("a_large_file.txt", &large_file_body)]);
let actual = nu!(
cwd: dirs.test(), pipeline(
r#"
do -i {sh -c "cat a_large_file.txt 1>&2"} | complete | get stderr
"#
));
assert_eq!(actual.out, large_file_body);
})
}
#[test]
#[cfg(not(windows))]
fn ignore_error_with_both_stdout_stderr_messages_not_hang_nushell() {
use nu_test_support::fs::Stub::FileWithContent;
use nu_test_support::playground::Playground;
Playground::setup(
"external with many stdout and stderr messages",
|dirs, sandbox| {
let script_body = r#"
x=$(printf '=%.0s' {1..40960})
echo $x
echo $x 1>&2
"#;
let mut expect_body = String::new();
for _ in 0..40960 {
expect_body.push_str("=");
}
sandbox.with_files(vec![FileWithContent("test.sh", &script_body)]);
// check for stdout
let actual = nu!(
cwd: dirs.test(), pipeline(
r#"
do -i {bash test.sh} | complete | get stdout | str trim
"#
));
assert_eq!(actual.out, expect_body);
// check for stderr
let actual = nu!(
cwd: dirs.test(), pipeline(
r#"
do -i {bash test.sh} | complete | get stderr | str trim
"#
));
assert_eq!(actual.out, expect_body);
},
)
}

View File

@ -4,7 +4,7 @@ use nu_protocol::{
ast::{Block, Call, Expr, Expression, Operator},
engine::{EngineState, Stack, Visibility},
Config, HistoryFileFormat, IntoInterruptiblePipelineData, IntoPipelineData, ListStream,
PipelineData, Range, ShellError, Span, Spanned, SyntaxShape, Unit, Value, VarId,
PipelineData, Range, RawStream, ShellError, Span, Spanned, SyntaxShape, Unit, Value, VarId,
ENV_VARIABLE_ID,
};
use nu_utils::stdout_write_all_and_flush;
@ -699,6 +699,31 @@ fn might_consume_external_result(input: PipelineData) -> (PipelineData, bool) {
} = input
{
let exit_code = exit_code.take();
// Note:
// In run-external's implementation detail, the result sender thread
// send out stderr message first, then stdout message, then exit_code.
//
// In this clause, we already make sure that `stdout` is None
// But not the case of `stderr`, so if `stderr` is not None
// We need to consume stderr message before reading external commands' exit code.
//
// Or we'll never have a chance to read exit_code if stderr producer produce too much stderr message.
// So we consume stderr stream and rebuild it.
let stderr = stderr.map(|stderr_stream| {
let stderr_ctrlc = stderr_stream.ctrlc.clone();
let stderr_span = stderr_stream.span;
let stderr_bytes = match stderr_stream.into_bytes() {
Err(_) => vec![],
Ok(bytes) => bytes.item,
};
RawStream::new(
Box::new(vec![Ok(stderr_bytes)].into_iter()),
stderr_ctrlc,
stderr_span,
)
});
match exit_code {
Some(exit_code_stream) => {
let ctrlc = exit_code_stream.ctrlc.clone();

View File

@ -433,10 +433,14 @@ impl PipelineData {
if let PipelineData::ExternalStream {
stdout: stream,
stderr: stderr_stream,
exit_code,
..
} = self
{
// NOTE: currently we don't need anything from stderr
// so directly consumes `stderr_stream` to make sure that everything is done.
let _ = stderr_stream.map(|x| x.into_bytes());
if let Some(stream) = stream {
for s in stream {
let s_live = s?;