mirror of
https://github.com/nushell/nushell.git
synced 2024-11-23 00:43:33 +01:00
fix stdout hangged on (#6715)
This commit is contained in:
parent
de77cb0cc4
commit
a498234f1d
@ -37,24 +37,34 @@ impl Command for Complete {
|
|||||||
let mut cols = vec![];
|
let mut cols = vec![];
|
||||||
let mut vals = vec![];
|
let mut vals = vec![];
|
||||||
|
|
||||||
// the order is important, we need to read
|
// use a thread to receive stderr message.
|
||||||
// stderr, then stdout, then exit_code
|
// Or we may get a deadlock if child process sends out too much bytes to stdout.
|
||||||
// because run_external generate them in this order.
|
//
|
||||||
if let Some(stderr) = stderr {
|
// For example: in normal linux system, stdout pipe's limit is 65535 bytes.
|
||||||
cols.push("stderr".to_string());
|
// if child process sends out 65536 bytes, the process will be hanged because no consumer
|
||||||
let stderr = stderr.into_bytes()?;
|
// consumes the first 65535 bytes
|
||||||
if let Ok(st) = String::from_utf8(stderr.item.clone()) {
|
// So we need a thread to receive stderr message, then the current thread can continue to consume
|
||||||
vals.push(Value::String {
|
// stdout messages.
|
||||||
val: st,
|
let stderr_handler = stderr.map(|stderr| {
|
||||||
span: stderr.span,
|
let stderr_span = stderr.span;
|
||||||
})
|
(
|
||||||
} else {
|
std::thread::spawn(move || {
|
||||||
vals.push(Value::Binary {
|
let stderr = stderr.into_bytes()?;
|
||||||
val: stderr.item,
|
if let Ok(st) = String::from_utf8(stderr.item.clone()) {
|
||||||
span: stderr.span,
|
Ok::<_, ShellError>(Value::String {
|
||||||
})
|
val: st,
|
||||||
};
|
span: stderr.span,
|
||||||
}
|
})
|
||||||
|
} else {
|
||||||
|
Ok::<_, ShellError>(Value::Binary {
|
||||||
|
val: stderr.item,
|
||||||
|
span: stderr.span,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
stderr_span,
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
if let Some(stdout) = stdout {
|
if let Some(stdout) = stdout {
|
||||||
cols.push("stdout".to_string());
|
cols.push("stdout".to_string());
|
||||||
@ -72,6 +82,18 @@ impl Command for Complete {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some((handler, stderr_span)) = stderr_handler {
|
||||||
|
cols.push("stderr".to_string());
|
||||||
|
let res = handler.join().map_err(|err| {
|
||||||
|
ShellError::ExternalCommand(
|
||||||
|
"Fail to receive external commands stderr message".to_string(),
|
||||||
|
format!("{err:?}"),
|
||||||
|
stderr_span,
|
||||||
|
)
|
||||||
|
})??;
|
||||||
|
vals.push(res)
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(exit_code) = exit_code {
|
if let Some(exit_code) = exit_code {
|
||||||
let mut v: Vec<_> = exit_code.collect();
|
let mut v: Vec<_> = exit_code.collect();
|
||||||
|
|
||||||
|
@ -10,11 +10,12 @@ use nu_protocol::{Category, Example, ListStream, PipelineData, RawStream, Span,
|
|||||||
use nu_system::ForegroundProcess;
|
use nu_system::ForegroundProcess;
|
||||||
use pathdiff::diff_paths;
|
use pathdiff::diff_paths;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::{BufRead, BufReader, Write};
|
use std::io::{BufRead, BufReader, Read, Write};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::process::{Command as CommandSys, Stdio};
|
use std::process::{Command as CommandSys, Stdio};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc::{self, SyncSender};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
const OUTPUT_BUFFER_SIZE: usize = 1024;
|
const OUTPUT_BUFFER_SIZE: usize = 1024;
|
||||||
const OUTPUT_BUFFERS_IN_FLIGHT: usize = 3;
|
const OUTPUT_BUFFERS_IN_FLIGHT: usize = 3;
|
||||||
@ -337,62 +338,20 @@ impl ExternalCommand {
|
|||||||
let redirect_stderr = self.redirect_stderr;
|
let redirect_stderr = self.redirect_stderr;
|
||||||
let span = self.name.span;
|
let span = self.name.span;
|
||||||
let output_ctrlc = ctrlc.clone();
|
let output_ctrlc = ctrlc.clone();
|
||||||
|
let stderr_ctrlc = ctrlc.clone();
|
||||||
let (stdout_tx, stdout_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
|
let (stdout_tx, stdout_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
|
||||||
let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
|
|
||||||
let (exit_code_tx, exit_code_rx) = mpsc::channel();
|
let (exit_code_tx, exit_code_rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let stdout = child.as_mut().stdout.take();
|
||||||
|
let stderr = child.as_mut().stderr.take();
|
||||||
|
// If this external is not the last expression, then its output is piped to a channel
|
||||||
|
// and we create a ListStream that can be consumed
|
||||||
|
//
|
||||||
|
// Create two threads: one for redirect stdout message, and wait for child process to complete.
|
||||||
|
// The other may be created when we want to redirect stderr message.
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
// If this external is not the last expression, then its output is piped to a channel
|
|
||||||
// and we create a ListStream that can be consumed
|
|
||||||
|
|
||||||
if redirect_stderr {
|
|
||||||
let stderr = child.as_mut().stderr.take().ok_or_else(|| {
|
|
||||||
ShellError::ExternalCommand(
|
|
||||||
"Error taking stderr from external".to_string(),
|
|
||||||
"Redirects need access to stderr of an external command"
|
|
||||||
.to_string(),
|
|
||||||
span,
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// Stderr is read using the Buffer reader. It will do so until there is an
|
|
||||||
// error or there are no more bytes to read
|
|
||||||
let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stderr);
|
|
||||||
while let Ok(bytes) = buf_read.fill_buf() {
|
|
||||||
if bytes.is_empty() {
|
|
||||||
// drop stderr sender manually, so stderr message consumer
|
|
||||||
// can make sure that there is no more stderr messages.
|
|
||||||
//
|
|
||||||
// and message consumer can continue to wait stdout message.
|
|
||||||
// If we don't make manually drop, and external command produces many
|
|
||||||
// stdout messages, relative message consumer will hang on so we'll get a deadlock.
|
|
||||||
drop(stderr_tx);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// The Cow generated from the function represents the conversion
|
|
||||||
// from bytes to String. If no replacements are required, then the
|
|
||||||
// borrowed value is a proper UTF-8 string. The Owned option represents
|
|
||||||
// a string where the values had to be replaced, thus marking it as bytes
|
|
||||||
let bytes = bytes.to_vec();
|
|
||||||
let length = bytes.len();
|
|
||||||
buf_read.consume(length);
|
|
||||||
|
|
||||||
if let Some(ctrlc) = &ctrlc {
|
|
||||||
if ctrlc.load(Ordering::SeqCst) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match stderr_tx.send(bytes) {
|
|
||||||
Ok(_) => continue,
|
|
||||||
Err(_) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if redirect_stdout {
|
if redirect_stdout {
|
||||||
let stdout = child.as_mut().stdout.take().ok_or_else(|| {
|
let stdout = stdout.ok_or_else(|| {
|
||||||
ShellError::ExternalCommand(
|
ShellError::ExternalCommand(
|
||||||
"Error taking stdout from external".to_string(),
|
"Error taking stdout from external".to_string(),
|
||||||
"Redirects need access to stdout of an external command"
|
"Redirects need access to stdout of an external command"
|
||||||
@ -401,33 +360,7 @@ impl ExternalCommand {
|
|||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Stdout is read using the Buffer reader. It will do so until there is an
|
read_and_redirect_message(stdout, stdout_tx, ctrlc)
|
||||||
// error or there are no more bytes to read
|
|
||||||
let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout);
|
|
||||||
while let Ok(bytes) = buf_read.fill_buf() {
|
|
||||||
if bytes.is_empty() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// The Cow generated from the function represents the conversion
|
|
||||||
// from bytes to String. If no replacements are required, then the
|
|
||||||
// borrowed value is a proper UTF-8 string. The Owned option represents
|
|
||||||
// a string where the values had to be replaced, thus marking it as bytes
|
|
||||||
let bytes = bytes.to_vec();
|
|
||||||
let length = bytes.len();
|
|
||||||
buf_read.consume(length);
|
|
||||||
|
|
||||||
if let Some(ctrlc) = &ctrlc {
|
|
||||||
if ctrlc.load(Ordering::SeqCst) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match stdout_tx.send(bytes) {
|
|
||||||
Ok(_) => continue,
|
|
||||||
Err(_) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
match child.as_mut().wait() {
|
match child.as_mut().wait() {
|
||||||
@ -454,6 +387,24 @@ impl ExternalCommand {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
|
||||||
|
if redirect_stderr {
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
let stderr = stderr.ok_or_else(|| {
|
||||||
|
ShellError::ExternalCommand(
|
||||||
|
"Error taking stderr from external".to_string(),
|
||||||
|
"Redirects need access to stderr of an external command"
|
||||||
|
.to_string(),
|
||||||
|
span,
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc);
|
||||||
|
Ok::<(), ShellError>(())
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
let stdout_receiver = ChannelReceiver::new(stdout_rx);
|
let stdout_receiver = ChannelReceiver::new(stdout_rx);
|
||||||
let stderr_receiver = ChannelReceiver::new(stderr_rx);
|
let stderr_receiver = ChannelReceiver::new(stderr_rx);
|
||||||
let exit_code_receiver = ValueReceiver::new(exit_code_rx);
|
let exit_code_receiver = ValueReceiver::new(exit_code_rx);
|
||||||
@ -732,6 +683,46 @@ fn remove_quotes(input: String) -> String {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// read message from given `reader`, and send out through `sender`.
|
||||||
|
//
|
||||||
|
// `ctrlc` is used to control the process, if ctrl-c is pressed, the read and redirect
|
||||||
|
// process will be breaked.
|
||||||
|
fn read_and_redirect_message<R>(
|
||||||
|
reader: R,
|
||||||
|
sender: SyncSender<Vec<u8>>,
|
||||||
|
ctrlc: Option<Arc<AtomicBool>>,
|
||||||
|
) where
|
||||||
|
R: Read,
|
||||||
|
{
|
||||||
|
// read using the BufferReader. It will do so until there is an
|
||||||
|
// error or there are no more bytes to read
|
||||||
|
let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader);
|
||||||
|
while let Ok(bytes) = buf_read.fill_buf() {
|
||||||
|
if bytes.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The Cow generated from the function represents the conversion
|
||||||
|
// from bytes to String. If no replacements are required, then the
|
||||||
|
// borrowed value is a proper UTF-8 string. The Owned option represents
|
||||||
|
// a string where the values had to be replaced, thus marking it as bytes
|
||||||
|
let bytes = bytes.to_vec();
|
||||||
|
let length = bytes.len();
|
||||||
|
buf_read.consume(length);
|
||||||
|
|
||||||
|
if let Some(ctrlc) = &ctrlc {
|
||||||
|
if ctrlc.load(Ordering::SeqCst) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match sender.send(bytes) {
|
||||||
|
Ok(_) => continue,
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Receiver used for the RawStream
|
// Receiver used for the RawStream
|
||||||
// It implements iterator so it can be used as a RawStream
|
// It implements iterator so it can be used as a RawStream
|
||||||
struct ChannelReceiver {
|
struct ChannelReceiver {
|
||||||
|
@ -62,7 +62,7 @@ fn do_with_semicolon_break_on_failed_external() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[cfg(not(windows))]
|
#[cfg(not(windows))]
|
||||||
fn ignore_error_not_hang_nushell() {
|
fn ignore_error_with_too_much_stderr_not_hang_nushell() {
|
||||||
use nu_test_support::fs::Stub::FileWithContent;
|
use nu_test_support::fs::Stub::FileWithContent;
|
||||||
use nu_test_support::pipeline;
|
use nu_test_support::pipeline;
|
||||||
use nu_test_support::playground::Playground;
|
use nu_test_support::playground::Playground;
|
||||||
@ -85,6 +85,31 @@ fn ignore_error_not_hang_nushell() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg(not(windows))]
|
||||||
|
fn ignore_error_with_too_much_stdout_not_hang_nushell() {
|
||||||
|
use nu_test_support::fs::Stub::FileWithContent;
|
||||||
|
use nu_test_support::pipeline;
|
||||||
|
use nu_test_support::playground::Playground;
|
||||||
|
Playground::setup("external with many stdout message", |dirs, sandbox| {
|
||||||
|
let bytes: usize = 81920;
|
||||||
|
let mut large_file_body = String::with_capacity(bytes);
|
||||||
|
for _ in 0..bytes {
|
||||||
|
large_file_body.push_str("a");
|
||||||
|
}
|
||||||
|
sandbox.with_files(vec![FileWithContent("a_large_file.txt", &large_file_body)]);
|
||||||
|
|
||||||
|
let actual = nu!(
|
||||||
|
cwd: dirs.test(), pipeline(
|
||||||
|
r#"
|
||||||
|
do -i {sh -c "cat a_large_file.txt"} | complete | get stdout
|
||||||
|
"#
|
||||||
|
));
|
||||||
|
|
||||||
|
assert_eq!(actual.out, large_file_body);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[cfg(not(windows))]
|
#[cfg(not(windows))]
|
||||||
fn ignore_error_with_both_stdout_stderr_messages_not_hang_nushell() {
|
fn ignore_error_with_both_stdout_stderr_messages_not_hang_nushell() {
|
||||||
|
@ -440,7 +440,7 @@ impl PipelineData {
|
|||||||
{
|
{
|
||||||
// NOTE: currently we don't need anything from stderr
|
// NOTE: currently we don't need anything from stderr
|
||||||
// so directly consumes `stderr_stream` to make sure that everything is done.
|
// so directly consumes `stderr_stream` to make sure that everything is done.
|
||||||
let _ = stderr_stream.map(|x| x.into_bytes());
|
std::thread::spawn(move || stderr_stream.map(|x| x.into_bytes()));
|
||||||
if let Some(stream) = stream {
|
if let Some(stream) = stream {
|
||||||
for s in stream {
|
for s in stream {
|
||||||
let s_live = s?;
|
let s_live = s?;
|
||||||
|
Loading…
Reference in New Issue
Block a user