Name threads (#7879)

I noticed that [it's pretty easy to name threads in
Rust](https://doc.rust-lang.org/std/thread/#naming-threads). We might as
well do this; it's a nice quality of life improvement when you're
profiling something and the developers took the time to give threads
names.

Also added/cleaned up some comments while I was in the area.
This commit is contained in:
Reilly Wood 2023-01-28 21:40:52 +01:00 committed by GitHub
parent e616b2e247
commit f4d7d19370
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 116 additions and 82 deletions

View File

@ -130,16 +130,21 @@ impl Command for Do {
// So we need a thread to receive stdout message, then the current thread can continue to consume // So we need a thread to receive stdout message, then the current thread can continue to consume
// stderr messages. // stderr messages.
let stdout_handler = stdout.map(|stdout_stream| { let stdout_handler = stdout.map(|stdout_stream| {
thread::spawn(move || { thread::Builder::new()
let ctrlc = stdout_stream.ctrlc.clone(); .name("stderr redirector".to_string())
let span = stdout_stream.span; .spawn(move || {
RawStream::new( let ctrlc = stdout_stream.ctrlc.clone();
Box::new(vec![stdout_stream.into_bytes().map(|s| s.item)].into_iter()), let span = stdout_stream.span;
ctrlc, RawStream::new(
span, Box::new(
None, vec![stdout_stream.into_bytes().map(|s| s.item)].into_iter(),
) ),
}) ctrlc,
span,
None,
)
})
.expect("Failed to create thread")
}); });
// Intercept stderr so we can return it in the error if the exit code is non-zero. // Intercept stderr so we can return it in the error if the exit code is non-zero.

View File

@ -8,6 +8,7 @@ use nu_protocol::{
use std::fs::File; use std::fs::File;
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
use std::path::Path; use std::path::Path;
use std::thread;
use crate::progress_bar; use crate::progress_bar;
@ -85,13 +86,17 @@ impl Command for Save {
// delegate a thread to redirect stderr to result. // delegate a thread to redirect stderr to result.
let handler = stderr.map(|stderr_stream| match stderr_file { let handler = stderr.map(|stderr_stream| match stderr_file {
Some(stderr_file) => std::thread::spawn(move || { Some(stderr_file) => thread::Builder::new()
stream_to_file(stderr_stream, stderr_file, span, progress) .name("stderr redirector".to_string())
}), .spawn(move || stream_to_file(stderr_stream, stderr_file, span, progress))
None => std::thread::spawn(move || { .expect("Failed to create thread"),
let _ = stderr_stream.into_bytes(); None => thread::Builder::new()
Ok(PipelineData::empty()) .name("stderr redirector".to_string())
}), .spawn(move || {
let _ = stderr_stream.into_bytes();
Ok(PipelineData::empty())
})
.expect("Failed to create thread"),
}); });
let res = stream_to_file(stream, file, span, progress); let res = stream_to_file(stream, file, span, progress);

View File

@ -4,6 +4,8 @@ use nu_protocol::{
Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Type, Value, Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Type, Value,
}; };
use std::thread;
#[derive(Clone)] #[derive(Clone)]
pub struct Complete; pub struct Complete;
@ -50,20 +52,23 @@ impl Command for Complete {
let stderr_handler = stderr.map(|stderr| { let stderr_handler = stderr.map(|stderr| {
let stderr_span = stderr.span; let stderr_span = stderr.span;
( (
std::thread::spawn(move || { thread::Builder::new()
let stderr = stderr.into_bytes()?; .name("stderr consumer".to_string())
if let Ok(st) = String::from_utf8(stderr.item.clone()) { .spawn(move || {
Ok::<_, ShellError>(Value::String { let stderr = stderr.into_bytes()?;
val: st, if let Ok(st) = String::from_utf8(stderr.item.clone()) {
span: stderr.span, Ok::<_, ShellError>(Value::String {
}) val: st,
} else { span: stderr.span,
Ok::<_, ShellError>(Value::Binary { })
val: stderr.item, } else {
span: stderr.span, Ok::<_, ShellError>(Value::Binary {
}) val: stderr.item,
} span: stderr.span,
}), })
}
})
.expect("failed to create thread"),
stderr_span, stderr_span,
) )
}); });

View File

@ -18,6 +18,7 @@ use std::process::{Command as CommandSys, Stdio};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{self, SyncSender}; use std::sync::mpsc::{self, SyncSender};
use std::sync::Arc; use std::sync::Arc;
use std::thread;
const OUTPUT_BUFFER_SIZE: usize = 1024; const OUTPUT_BUFFER_SIZE: usize = 1024;
const OUTPUT_BUFFERS_IN_FLIGHT: usize = 3; const OUTPUT_BUFFERS_IN_FLIGHT: usize = 3;
@ -347,32 +348,41 @@ impl ExternalCommand {
// Turn off color as we pass data through // Turn off color as we pass data through
engine_state.config.use_ansi_coloring = false; engine_state.config.use_ansi_coloring = false;
// if there is a string or a stream, that is sent to the pipe std // Pipe input into the external command's stdin
if let Some(mut stdin_write) = child.as_mut().stdin.take() { if let Some(mut stdin_write) = child.as_mut().stdin.take() {
std::thread::spawn(move || { thread::Builder::new()
let input = crate::Table::run( .name("external stdin worker".to_string())
&crate::Table, .spawn(move || {
&engine_state, // Attempt to render the input as a table before piping it to the external.
&mut stack, // This is important for pagers like `less`;
&Call::new(head), // they need to get Nu data rendered for display to users.
input, //
); // TODO: should we do something different for list<string> inputs?
// Users often expect those to be piped to *nix tools as raw strings separated by newlines
let input = crate::Table::run(
&crate::Table,
&engine_state,
&mut stack,
&Call::new(head),
input,
);
if let Ok(input) = input { if let Ok(input) = input {
for value in input.into_iter() { for value in input.into_iter() {
let buf = match value { let buf = match value {
Value::String { val, .. } => val.into_bytes(), Value::String { val, .. } => val.into_bytes(),
Value::Binary { val, .. } => val, Value::Binary { val, .. } => val,
_ => return Err(()), _ => return Err(()),
}; };
if stdin_write.write(&buf).is_err() { if stdin_write.write(&buf).is_err() {
return Ok(()); return Ok(());
}
} }
} }
}
Ok(()) Ok(())
}); })
.expect("Failed to create thread");
} }
} }
@ -388,24 +398,26 @@ impl ExternalCommand {
let stdout = child.as_mut().stdout.take(); let stdout = child.as_mut().stdout.take();
let stderr = child.as_mut().stderr.take(); let stderr = child.as_mut().stderr.take();
// If this external is not the last expression, then its output is piped to a channel // If this external is not the last expression, then its output is piped to a channel
// and we create a ListStream that can be consumed // and we create a ListStream that can be consumed
//
// Create two threads: one for redirect stdout message, and wait for child process to complete.
// The other may be created when we want to redirect stderr message.
std::thread::spawn(move || {
if redirect_stdout {
let stdout = stdout.ok_or_else(|| {
ShellError::ExternalCommand(
"Error taking stdout from external".to_string(),
"Redirects need access to stdout of an external command"
.to_string(),
span,
)
})?;
read_and_redirect_message(stdout, stdout_tx, ctrlc) // First create a thread to redirect the external's stdout and wait for an exit code.
} thread::Builder::new()
.name("stdout redirector + exit code waiter".to_string())
.spawn(move || {
if redirect_stdout {
let stdout = stdout.ok_or_else(|| {
ShellError::ExternalCommand(
"Error taking stdout from external".to_string(),
"Redirects need access to stdout of an external command"
.to_string(),
span,
)
})?;
read_and_redirect_message(stdout, stdout_tx, ctrlc)
}
match child.as_mut().wait() { match child.as_mut().wait() {
Err(err) => Err(ShellError::ExternalCommand( Err(err) => Err(ShellError::ExternalCommand(
@ -462,23 +474,26 @@ impl ExternalCommand {
Ok(()) Ok(())
} }
} }
}); }).expect("Failed to create thread");
let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT); let (stderr_tx, stderr_rx) = mpsc::sync_channel(OUTPUT_BUFFERS_IN_FLIGHT);
if redirect_stderr { if redirect_stderr {
std::thread::spawn(move || { thread::Builder::new()
let stderr = stderr.ok_or_else(|| { .name("stderr redirector".to_string())
ShellError::ExternalCommand( .spawn(move || {
"Error taking stderr from external".to_string(), let stderr = stderr.ok_or_else(|| {
"Redirects need access to stderr of an external command" ShellError::ExternalCommand(
.to_string(), "Error taking stderr from external".to_string(),
span, "Redirects need access to stderr of an external command"
) .to_string(),
})?; span,
)
})?;
read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc); read_and_redirect_message(stderr, stderr_tx, stderr_ctrlc);
Ok::<(), ShellError>(()) Ok::<(), ShellError>(())
}); })
.expect("Failed to create thread");
} }
let stdout_receiver = ChannelReceiver::new(stdout_rx); let stdout_receiver = ChannelReceiver::new(stdout_rx);

View File

@ -5,6 +5,7 @@ use crate::{
}; };
use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush}; use nu_utils::{stderr_write_all_and_flush, stdout_write_all_and_flush};
use std::sync::{atomic::AtomicBool, Arc}; use std::sync::{atomic::AtomicBool, Arc};
use std::thread;
const LINE_ENDING_PATTERN: &[char] = &['\r', '\n']; const LINE_ENDING_PATTERN: &[char] = &['\r', '\n'];
@ -726,8 +727,11 @@ pub fn print_if_stream(
exit_code: Option<ListStream>, exit_code: Option<ListStream>,
) -> Result<i64, ShellError> { ) -> Result<i64, ShellError> {
// NOTE: currently we don't need anything from stderr // NOTE: currently we don't need anything from stderr
// so directly consumes `stderr_stream` to make sure that everything is done. // so we just consume and throw away `stderr_stream` to make sure the pipe doesn't fill up
std::thread::spawn(move || stderr_stream.map(|x| x.into_bytes())); thread::Builder::new()
.name("stderr consumer".to_string())
.spawn(move || stderr_stream.map(|x| x.into_bytes()))
.expect("could not create thread");
if let Some(stream) = stream { if let Some(stream) = stream {
for s in stream { for s in stream {
let s_live = s?; let s_live = s?;