diff --git a/crates/nu-command/src/core_commands/do_.rs b/crates/nu-command/src/core_commands/do_.rs index 5bd3e5f84..79dfb29c0 100644 --- a/crates/nu-command/src/core_commands/do_.rs +++ b/crates/nu-command/src/core_commands/do_.rs @@ -130,16 +130,21 @@ impl Command for Do { // So we need a thread to receive stdout message, then the current thread can continue to consume // stderr messages. let stdout_handler = stdout.map(|stdout_stream| { - thread::spawn(move || { - let ctrlc = stdout_stream.ctrlc.clone(); - let span = stdout_stream.span; - RawStream::new( - Box::new(vec![stdout_stream.into_bytes().map(|s| s.item)].into_iter()), - ctrlc, - span, - None, - ) - }) + thread::Builder::new() + .name("stderr redirector".to_string()) + .spawn(move || { + let ctrlc = stdout_stream.ctrlc.clone(); + let span = stdout_stream.span; + RawStream::new( + Box::new( + vec![stdout_stream.into_bytes().map(|s| s.item)].into_iter(), + ), + ctrlc, + span, + None, + ) + }) + .expect("Failed to create thread") }); // Intercept stderr so we can return it in the error if the exit code is non-zero. diff --git a/crates/nu-command/src/filesystem/save.rs b/crates/nu-command/src/filesystem/save.rs index de87cda5a..af57d87ab 100644 --- a/crates/nu-command/src/filesystem/save.rs +++ b/crates/nu-command/src/filesystem/save.rs @@ -8,6 +8,7 @@ use nu_protocol::{ use std::fs::File; use std::io::{BufWriter, Write}; use std::path::Path; +use std::thread; use crate::progress_bar; @@ -85,13 +86,17 @@ impl Command for Save { // delegate a thread to redirect stderr to result. let handler = stderr.map(|stderr_stream| match stderr_file { - Some(stderr_file) => std::thread::spawn(move || { - stream_to_file(stderr_stream, stderr_file, span, progress) - }), - None => std::thread::spawn(move || { - let _ = stderr_stream.into_bytes(); - Ok(PipelineData::empty()) - }), + Some(stderr_file) => thread::Builder::new() + .name("stderr redirector".to_string()) + .spawn(move || stream_to_file(stderr_stream, stderr_file, span, progress)) + .expect("Failed to create thread"), + None => thread::Builder::new() + .name("stderr redirector".to_string()) + .spawn(move || { + let _ = stderr_stream.into_bytes(); + Ok(PipelineData::empty()) + }) + .expect("Failed to create thread"), }); let res = stream_to_file(stream, file, span, progress); diff --git a/crates/nu-command/src/system/complete.rs b/crates/nu-command/src/system/complete.rs index 2afee176a..4a925f519 100644 --- a/crates/nu-command/src/system/complete.rs +++ b/crates/nu-command/src/system/complete.rs @@ -4,6 +4,8 @@ use nu_protocol::{ Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Type, Value, }; +use std::thread; + #[derive(Clone)] pub struct Complete; @@ -50,20 +52,23 @@ impl Command for Complete { let stderr_handler = stderr.map(|stderr| { let stderr_span = stderr.span; ( - std::thread::spawn(move || { - let stderr = stderr.into_bytes()?; - if let Ok(st) = String::from_utf8(stderr.item.clone()) { - Ok::<_, ShellError>(Value::String { - val: st, - span: stderr.span, - }) - } else { - Ok::<_, ShellError>(Value::Binary { - val: stderr.item, - span: stderr.span, - }) - } - }), + thread::Builder::new() + .name("stderr consumer".to_string()) + .spawn(move || { + let stderr = stderr.into_bytes()?; + if let Ok(st) = String::from_utf8(stderr.item.clone()) { + Ok::<_, ShellError>(Value::String { + val: st, + span: stderr.span, + }) + } else { + Ok::<_, ShellError>(Value::Binary { + val: stderr.item, + span: stderr.span, + }) + } + }) + .expect("failed to create thread"), stderr_span, ) }); diff --git a/crates/nu-command/src/system/run_external.rs b/crates/nu-command/src/system/run_external.rs index 51dade205..7de612b34 100644 --- a/crates/nu-command/src/system/run_external.rs +++ b/crates/nu-command/src/system/run_external.rs @@ -18,6 +18,7 @@ use std::process::{Command as CommandSys, Stdio}; use std::sync::atomic::AtomicBool; use std::sync::mpsc::{self, SyncSender}; use std::sync::Arc; +use std::thread; const OUTPUT_BUFFER_SIZE: usize = 1024; const OUTPUT_BUFFERS_IN_FLIGHT: usize = 3; @@ -347,32 +348,41 @@ impl ExternalCommand { // Turn off color as we pass data through engine_state.config.use_ansi_coloring = false; - // if there is a string or a stream, that is sent to the pipe std + // Pipe input into the external command's stdin if let Some(mut stdin_write) = child.as_mut().stdin.take() { - std::thread::spawn(move || { - let input = crate::Table::run( - &crate::Table, - &engine_state, - &mut stack, - &Call::new(head), - input, - ); + thread::Builder::new() + .name("external stdin worker".to_string()) + .spawn(move || { + // Attempt to render the input as a table before piping it to the external. + // This is important for pagers like `less`; + // they need to get Nu data rendered for display to users. + // + // TODO: should we do something different for list inputs? + // Users often expect those to be piped to *nix tools as raw strings separated by newlines + let input = crate::Table::run( + &crate::Table, + &engine_state, + &mut stack, + &Call::new(head), + input, + ); - if let Ok(input) = input { - for value in input.into_iter() { - let buf = match value { - Value::String { val, .. } => val.into_bytes(), - Value::Binary { val, .. } => val, - _ => return Err(()), - }; - if stdin_write.write(&buf).is_err() { - return Ok(()); + if let Ok(input) = input { + for value in input.into_iter() { + let buf = match value { + Value::String { val, .. } => val.into_bytes(), + Value::Binary { val, .. } => val, + _ => return Err(()), + }; + if stdin_write.write(&buf).is_err() { + return Ok(()); + } } } - } - Ok(()) - }); + Ok(()) + }) + .expect("Failed to create thread"); } } @@ -388,24 +398,26 @@ impl ExternalCommand { let stdout = child.as_mut().stdout.take(); let stderr = child.as_mut().stderr.take(); + // If this external is not the last expression, then its output is piped to a channel // and we create a ListStream that can be consumed - // - // Create two threads: one for redirect stdout message, and wait for child process to complete. - // The other may be created when we want to redirect stderr message. - std::thread::spawn(move || { - if redirect_stdout { - let stdout = stdout.ok_or_else(|| { - ShellError::ExternalCommand( - "Error taking stdout from external".to_string(), - "Redirects need access to stdout of an external command" - .to_string(), - span, - ) - })?; - read_and_redirect_message(stdout, stdout_tx, ctrlc) - } + // First create a thread to redirect the external's stdout and wait for an exit code. + thread::Builder::new() + .name("stdout redirector + exit code waiter".to_string()) + .spawn(move || { + if redirect_stdout { + let stdout = stdout.ok_or_else(|| { + ShellError::ExternalCommand( + "Error taking stdout from external".to_string(), + "Redirects need access to stdout of an external command" + .to_string(), + span, + ) + })?; + + read_and_redirect_message(stdout, stdout_tx, ctrlc) + } match child.as_mut().wait() { Err(err) => Err(ShellError::ExternalCommand( @@ -462,23 +474,26 @@ impl ExternalCommand { Ok(()) } } - }); + }).expect("Failed to create thread"); let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT); if redirect_stderr { - std::thread::spawn(move || { - let stderr = stderr.ok_or_else(|| { - ShellError::ExternalCommand( - "Error taking stderr from external".to_string(), - "Redirects need access to stderr of an external command" - .to_string(), - span, - ) - })?; + thread::Builder::new() + .name("stderr redirector".to_string()) + .spawn(move || { + let stderr = stderr.ok_or_else(|| { + ShellError::ExternalCommand( + "Error taking stderr from external".to_string(), + "Redirects need access to stderr of an external command" + .to_string(), + span, + ) + })?; - read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc); - Ok::<(), ShellError>(()) - }); + read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc); + Ok::<(), ShellError>(()) + }) + .expect("Failed to create thread"); } let stdout_receiver = ChannelReceiver::new(stdout_rx); diff --git a/crates/nu-protocol/src/pipeline_data.rs b/crates/nu-protocol/src/pipeline_data.rs index 3e57efd07..d3b4eae8a 100644 --- a/crates/nu-protocol/src/pipeline_data.rs +++ b/crates/nu-protocol/src/pipeline_data.rs @@ -5,6 +5,7 @@ use crate::{ }; use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush}; use std::sync::{atomic::AtomicBool, Arc}; +use std::thread; const LINE_ENDING_PATTERN: &[char] = &['\r', '\n']; @@ -726,8 +727,11 @@ pub fn print_if_stream( exit_code: Option, ) -> Result { // NOTE: currently we don't need anything from stderr - // so directly consumes `stderr_stream` to make sure that everything is done. - std::thread::spawn(move || stderr_stream.map(|x| x.into_bytes())); + // so we just consume and throw away `stderr_stream` to make sure the pipe doesn't fill up + thread::Builder::new() + .name("stderr consumer".to_string()) + .spawn(move || stderr_stream.map(|x| x.into_bytes())) + .expect("could not create thread"); if let Some(stream) = stream { for s in stream { let s_live = s?;