forked from extern/nushell
Throw out error if external command in subexpression is failed to run (#8204)
This commit is contained in:
@ -531,7 +531,7 @@ impl PipelineData {
|
||||
// Only need ExternalStream without redirecting output.
|
||||
// It indicates we have no more commands to execute currently.
|
||||
if let PipelineData::ExternalStream {
|
||||
stdout: None,
|
||||
stdout,
|
||||
stderr,
|
||||
mut exit_code,
|
||||
span,
|
||||
@ -542,22 +542,56 @@ impl PipelineData {
|
||||
let exit_code = exit_code.take();
|
||||
|
||||
// Note:
|
||||
// In run-external's implementation detail, the result sender thread
|
||||
// send out stderr message first, then stdout message, then exit_code.
|
||||
// use a thread to receive stderr message.
|
||||
// Or we may get a deadlock if child process sends out too much bytes to stdout.
|
||||
//
|
||||
// In this clause, we already make sure that `stdout` is None
|
||||
// But not the case of `stderr`, so if `stderr` is not None
|
||||
// We need to consume stderr message before reading external commands' exit code.
|
||||
//
|
||||
// Or we'll never have a chance to read exit_code if stderr producer produce too much stderr message.
|
||||
// So we consume stderr stream and rebuild it.
|
||||
let stderr = stderr.map(|stderr_stream| {
|
||||
let stderr_ctrlc = stderr_stream.ctrlc.clone();
|
||||
let stderr_span = stderr_stream.span;
|
||||
let stderr_bytes = stderr_stream
|
||||
// For example: in normal linux system, stdout pipe's limit is 65535 bytes.
|
||||
// if child process sends out 65536 bytes, the process will be hanged because no consumer
|
||||
// consumes the first 65535 bytes
|
||||
// So we need a thread to receive stderr message, then the current thread can continue to consume
|
||||
// stdout messages.
|
||||
let stderr_handler = stderr.map(|stderr| {
|
||||
let stderr_span = stderr.span;
|
||||
let stderr_ctrlc = stderr.ctrlc.clone();
|
||||
(
|
||||
thread::Builder::new()
|
||||
.name("stderr consumer".to_string())
|
||||
.spawn(move || {
|
||||
stderr
|
||||
.into_bytes()
|
||||
.map(|bytes| bytes.item)
|
||||
.unwrap_or_default()
|
||||
})
|
||||
.expect("failed to create thread"),
|
||||
stderr_span,
|
||||
stderr_ctrlc,
|
||||
)
|
||||
});
|
||||
let stdout = stdout.map(|stdout_stream| {
|
||||
let stdout_ctrlc = stdout_stream.ctrlc.clone();
|
||||
let stdout_span = stdout_stream.span;
|
||||
let stdout_bytes = stdout_stream
|
||||
.into_bytes()
|
||||
.map(|bytes| bytes.item)
|
||||
.unwrap_or_default();
|
||||
RawStream::new(
|
||||
Box::new(vec![Ok(stdout_bytes)].into_iter()),
|
||||
stdout_ctrlc,
|
||||
stdout_span,
|
||||
None,
|
||||
)
|
||||
});
|
||||
let stderr = stderr_handler.map(|(handler, stderr_span, stderr_ctrlc)| {
|
||||
let stderr_bytes = handler
|
||||
.join()
|
||||
.map_err(|err| {
|
||||
ShellError::ExternalCommand(
|
||||
"Fail to receive external commands stderr message".to_string(),
|
||||
format!("{err:?}"),
|
||||
stderr_span,
|
||||
)
|
||||
})
|
||||
.unwrap_or_default();
|
||||
RawStream::new(
|
||||
Box::new(vec![Ok(stderr_bytes)].into_iter()),
|
||||
stderr_ctrlc,
|
||||
@ -565,7 +599,6 @@ impl PipelineData {
|
||||
None,
|
||||
)
|
||||
});
|
||||
|
||||
match exit_code {
|
||||
Some(exit_code_stream) => {
|
||||
let ctrlc = exit_code_stream.ctrlc.clone();
|
||||
@ -578,7 +611,7 @@ impl PipelineData {
|
||||
}
|
||||
(
|
||||
PipelineData::ExternalStream {
|
||||
stdout: None,
|
||||
stdout,
|
||||
stderr,
|
||||
exit_code: Some(ListStream::from_stream(exit_code.into_iter(), ctrlc)),
|
||||
span,
|
||||
@ -590,7 +623,7 @@ impl PipelineData {
|
||||
}
|
||||
None => (
|
||||
PipelineData::ExternalStream {
|
||||
stdout: None,
|
||||
stdout,
|
||||
stderr,
|
||||
exit_code: None,
|
||||
span,
|
||||
|
Reference in New Issue
Block a user