From a498234f1d81aaa2a9a171b57c1fb7ac8226e9f2 Mon Sep 17 00:00:00 2001 From: WindSoilder Date: Sun, 16 Oct 2022 03:29:29 +0800 Subject: [PATCH] fix stdout hangged on (#6715) --- crates/nu-command/src/system/complete.rs | 58 ++++--- crates/nu-command/src/system/run_external.rs | 153 +++++++++---------- crates/nu-command/tests/commands/do_.rs | 27 +++- crates/nu-protocol/src/pipeline_data.rs | 2 +- 4 files changed, 139 insertions(+), 101 deletions(-) diff --git a/crates/nu-command/src/system/complete.rs b/crates/nu-command/src/system/complete.rs index ffbfe04148..fc4629d1ee 100644 --- a/crates/nu-command/src/system/complete.rs +++ b/crates/nu-command/src/system/complete.rs @@ -37,24 +37,34 @@ impl Command for Complete { let mut cols = vec![]; let mut vals = vec![]; - // the order is important, we need to read - // stderr, then stdout, then exit_code - // because run_external generate them in this order. - if let Some(stderr) = stderr { - cols.push("stderr".to_string()); - let stderr = stderr.into_bytes()?; - if let Ok(st) = String::from_utf8(stderr.item.clone()) { - vals.push(Value::String { - val: st, - span: stderr.span, - }) - } else { - vals.push(Value::Binary { - val: stderr.item, - span: stderr.span, - }) - }; - } + // use a thread to receive stderr message. + // Or we may get a deadlock if child process sends out too much bytes to stdout. + // + // For example: in normal linux system, stdout pipe's limit is 65535 bytes. + // if child process sends out 65536 bytes, the process will be hanged because no consumer + // consumes the first 65535 bytes + // So we need a thread to receive stderr message, then the current thread can continue to consume + // stdout messages. + let stderr_handler = stderr.map(|stderr| { + let stderr_span = stderr.span; + ( + std::thread::spawn(move || { + let stderr = stderr.into_bytes()?; + if let Ok(st) = String::from_utf8(stderr.item.clone()) { + Ok::<_, ShellError>(Value::String { + val: st, + span: stderr.span, + }) + } else { + Ok::<_, ShellError>(Value::Binary { + val: stderr.item, + span: stderr.span, + }) + } + }), + stderr_span, + ) + }); if let Some(stdout) = stdout { cols.push("stdout".to_string()); @@ -72,6 +82,18 @@ impl Command for Complete { } } + if let Some((handler, stderr_span)) = stderr_handler { + cols.push("stderr".to_string()); + let res = handler.join().map_err(|err| { + ShellError::ExternalCommand( + "Fail to receive external commands stderr message".to_string(), + format!("{err:?}"), + stderr_span, + ) + })??; + vals.push(res) + }; + if let Some(exit_code) = exit_code { let mut v: Vec<_> = exit_code.collect(); diff --git a/crates/nu-command/src/system/run_external.rs b/crates/nu-command/src/system/run_external.rs index 05d0c6b84c..2f62a160fc 100644 --- a/crates/nu-command/src/system/run_external.rs +++ b/crates/nu-command/src/system/run_external.rs @@ -10,11 +10,12 @@ use nu_protocol::{Category, Example, ListStream, PipelineData, RawStream, Span, use nu_system::ForegroundProcess; use pathdiff::diff_paths; use std::collections::HashMap; -use std::io::{BufRead, BufReader, Write}; +use std::io::{BufRead, BufReader, Read, Write}; use std::path::{Path, PathBuf}; use std::process::{Command as CommandSys, Stdio}; -use std::sync::atomic::Ordering; -use std::sync::mpsc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, SyncSender}; +use std::sync::Arc; const OUTPUT_BUFFER_SIZE: usize = 1024; const OUTPUT_BUFFERS_IN_FLIGHT: usize = 3; @@ -337,62 +338,20 @@ impl ExternalCommand { let redirect_stderr = self.redirect_stderr; let span = self.name.span; let output_ctrlc = ctrlc.clone(); + let stderr_ctrlc = ctrlc.clone(); let (stdout_tx, stdout_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT); - let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT); let (exit_code_tx, exit_code_rx) = mpsc::channel(); + let stdout = child.as_mut().stdout.take(); + let stderr = child.as_mut().stderr.take(); + // If this external is not the last expression, then its output is piped to a channel + // and we create a ListStream that can be consumed + // + // Create two threads: one for redirect stdout message, and wait for child process to complete. + // The other may be created when we want to redirect stderr message. std::thread::spawn(move || { - // If this external is not the last expression, then its output is piped to a channel - // and we create a ListStream that can be consumed - - if redirect_stderr { - let stderr = child.as_mut().stderr.take().ok_or_else(|| { - ShellError::ExternalCommand( - "Error taking stderr from external".to_string(), - "Redirects need access to stderr of an external command" - .to_string(), - span, - ) - })?; - - // Stderr is read using the Buffer reader. It will do so until there is an - // error or there are no more bytes to read - let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stderr); - while let Ok(bytes) = buf_read.fill_buf() { - if bytes.is_empty() { - // drop stderr sender manually, so stderr message consumer - // can make sure that there is no more stderr messages. - // - // and message consumer can continue to wait stdout message. - // If we don't make manually drop, and external command produces many - // stdout messages, relative message consumer will hang on so we'll get a deadlock. - drop(stderr_tx); - break; - } - - // The Cow generated from the function represents the conversion - // from bytes to String. If no replacements are required, then the - // borrowed value is a proper UTF-8 string. The Owned option represents - // a string where the values had to be replaced, thus marking it as bytes - let bytes = bytes.to_vec(); - let length = bytes.len(); - buf_read.consume(length); - - if let Some(ctrlc) = &ctrlc { - if ctrlc.load(Ordering::SeqCst) { - break; - } - } - - match stderr_tx.send(bytes) { - Ok(_) => continue, - Err(_) => break, - } - } - } - if redirect_stdout { - let stdout = child.as_mut().stdout.take().ok_or_else(|| { + let stdout = stdout.ok_or_else(|| { ShellError::ExternalCommand( "Error taking stdout from external".to_string(), "Redirects need access to stdout of an external command" @@ -401,33 +360,7 @@ impl ExternalCommand { ) })?; - // Stdout is read using the Buffer reader. It will do so until there is an - // error or there are no more bytes to read - let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout); - while let Ok(bytes) = buf_read.fill_buf() { - if bytes.is_empty() { - break; - } - - // The Cow generated from the function represents the conversion - // from bytes to String. If no replacements are required, then the - // borrowed value is a proper UTF-8 string. The Owned option represents - // a string where the values had to be replaced, thus marking it as bytes - let bytes = bytes.to_vec(); - let length = bytes.len(); - buf_read.consume(length); - - if let Some(ctrlc) = &ctrlc { - if ctrlc.load(Ordering::SeqCst) { - break; - } - } - - match stdout_tx.send(bytes) { - Ok(_) => continue, - Err(_) => break, - } - } + read_and_redirect_message(stdout, stdout_tx, ctrlc) } match child.as_mut().wait() { @@ -454,6 +387,24 @@ impl ExternalCommand { } } }); + + let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT); + if redirect_stderr { + std::thread::spawn(move || { + let stderr = stderr.ok_or_else(|| { + ShellError::ExternalCommand( + "Error taking stderr from external".to_string(), + "Redirects need access to stderr of an external command" + .to_string(), + span, + ) + })?; + + read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc); + Ok::<(), ShellError>(()) + }); + } + let stdout_receiver = ChannelReceiver::new(stdout_rx); let stderr_receiver = ChannelReceiver::new(stderr_rx); let exit_code_receiver = ValueReceiver::new(exit_code_rx); @@ -732,6 +683,46 @@ fn remove_quotes(input: String) -> String { } } +// read message from given `reader`, and send out through `sender`. +// +// `ctrlc` is used to control the process, if ctrl-c is pressed, the read and redirect +// process will be breaked. +fn read_and_redirect_message( + reader: R, + sender: SyncSender>, + ctrlc: Option>, +) where + R: Read, +{ + // read using the BufferReader. It will do so until there is an + // error or there are no more bytes to read + let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader); + while let Ok(bytes) = buf_read.fill_buf() { + if bytes.is_empty() { + break; + } + + // The Cow generated from the function represents the conversion + // from bytes to String. If no replacements are required, then the + // borrowed value is a proper UTF-8 string. The Owned option represents + // a string where the values had to be replaced, thus marking it as bytes + let bytes = bytes.to_vec(); + let length = bytes.len(); + buf_read.consume(length); + + if let Some(ctrlc) = &ctrlc { + if ctrlc.load(Ordering::SeqCst) { + break; + } + } + + match sender.send(bytes) { + Ok(_) => continue, + Err(_) => break, + } + } +} + // Receiver used for the RawStream // It implements iterator so it can be used as a RawStream struct ChannelReceiver { diff --git a/crates/nu-command/tests/commands/do_.rs b/crates/nu-command/tests/commands/do_.rs index 86be3ab4a2..e4643fc4d8 100644 --- a/crates/nu-command/tests/commands/do_.rs +++ b/crates/nu-command/tests/commands/do_.rs @@ -62,7 +62,7 @@ fn do_with_semicolon_break_on_failed_external() { #[test] #[cfg(not(windows))] -fn ignore_error_not_hang_nushell() { +fn ignore_error_with_too_much_stderr_not_hang_nushell() { use nu_test_support::fs::Stub::FileWithContent; use nu_test_support::pipeline; use nu_test_support::playground::Playground; @@ -85,6 +85,31 @@ fn ignore_error_not_hang_nushell() { }) } +#[test] +#[cfg(not(windows))] +fn ignore_error_with_too_much_stdout_not_hang_nushell() { + use nu_test_support::fs::Stub::FileWithContent; + use nu_test_support::pipeline; + use nu_test_support::playground::Playground; + Playground::setup("external with many stdout message", |dirs, sandbox| { + let bytes: usize = 81920; + let mut large_file_body = String::with_capacity(bytes); + for _ in 0..bytes { + large_file_body.push_str("a"); + } + sandbox.with_files(vec![FileWithContent("a_large_file.txt", &large_file_body)]); + + let actual = nu!( + cwd: dirs.test(), pipeline( + r#" + do -i {sh -c "cat a_large_file.txt"} | complete | get stdout + "# + )); + + assert_eq!(actual.out, large_file_body); + }) +} + #[test] #[cfg(not(windows))] fn ignore_error_with_both_stdout_stderr_messages_not_hang_nushell() { diff --git a/crates/nu-protocol/src/pipeline_data.rs b/crates/nu-protocol/src/pipeline_data.rs index b9ea88433c..6b5516fdc7 100644 --- a/crates/nu-protocol/src/pipeline_data.rs +++ b/crates/nu-protocol/src/pipeline_data.rs @@ -440,7 +440,7 @@ impl PipelineData { { // NOTE: currently we don't need anything from stderr // so directly consumes `stderr_stream` to make sure that everything is done. - let _ = stderr_stream.map(|x| x.into_bytes()); + std::thread::spawn(move || stderr_stream.map(|x| x.into_bytes())); if let Some(stream) = stream { for s in stream { let s_live = s?;