Add threads to the ls command in order to increase performance in some circumstances (#13836)

# Description

This PR allows the `ls` command to use multiple threads if so
specified. You'd want to use threads if you notice `ls` taking a long
time; the one place I see that happening is from WSL.

I'm not sure how real-world this test is, but you can see that this
simple `ls` of a folder with a lot of entries takes a while: 9366 ms here.
I've run this test many times and it ranges from about 10 seconds to
about 15 seconds. But with the `--threads` parameter it takes less time,
2744 ms in this screenshot.

![image](https://github.com/user-attachments/assets/e5c4afa2-7837-4437-8e6e-5d4bc3894ae1)

The only way forward I could find was to _always_ use threading and
adjust the number of threads based on whether the user provides the flag:
one thread by default, and one per available core with `--threads`. That
seemed the easiest way to do it after applying @devyn's interleave
advice.
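
For reference, here is a condensed sketch of the idea (illustrative names, not the exact code from this diff): build a `rayon` pool sized by the flag, bridge the entry iterator onto it with `par_bridge()`, and stream rows back over an `mpsc` channel so the caller can consume them as an iterator. Because `par_bridge()` does not preserve order, the output is non-deterministic, which is why the flag's help text calls that out.

```rust
use rayon::prelude::*;
use std::sync::mpsc;

// Illustrative only: `list_entries` stands in for the per-path work `ls` does.
fn list_entries(paths: Vec<String>, use_threads: bool) -> impl Iterator<Item = String> {
    // One thread by default; one per available core when --threads is passed.
    let num_threads = if use_threads {
        std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1)
    } else {
        1
    };
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .expect("failed to build thread pool");

    let (tx, rx) = mpsc::channel();
    pool.install(move || {
        paths
            .into_iter()
            .par_bridge() // order is not preserved, hence the non-deterministic output
            .for_each_with(tx, |tx, path| {
                // Stand-in for building the record for one directory entry.
                let _ = tx.send(format!("entry: {path}"));
            });
    });

    // All senders are dropped once the pool finishes, so this iterator terminates.
    rx.into_iter()
}

fn main() {
    let paths: Vec<String> = (0..8).map(|i| format!("file{i}.txt")).collect();
    for row in list_entries(paths, true) {
        println!("{row}");
    }
}
```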

No feelings hurt if this doesn't land. It's more of an experiment but I
think it has potential.

# User-Facing Changes
<!-- List of all changes that impact the user experience here. This
helps us keep track of breaking changes. -->

`ls` gains a `--threads` (`-t`) switch. When it is used, directory contents are listed on multiple threads and the output order is non-deterministic.

# Tests + Formatting
<!--
Don't forget to add tests that cover your changes.

Make sure you've run and fixed any issues with these commands:

- `cargo fmt --all -- --check` to check standard code formatting (`cargo
fmt --all` applies these changes)
- `cargo clippy --workspace -- -D warnings -D clippy::unwrap_used` to
check that you're using the standard code style
- `cargo test --workspace` to check that all tests pass (on Windows make
sure to [enable developer
mode](https://learn.microsoft.com/en-us/windows/apps/get-started/developer-mode-features-and-debugging))
- `cargo run -- -c "use toolkit.nu; toolkit test stdlib"` to run the
tests for the standard library

> **Note**
> from `nushell` you can also use the `toolkit` as follows
> ```bash
> use toolkit.nu # or use an `env_change` hook to activate it automatically
> toolkit check pr
> ```
-->

# After Submitting
<!-- If your PR had any user-facing changes, update [the
documentation](https://github.com/nushell/nushell.github.io) after the
PR is merged, if necessary. This will help us keep the docs up to date.
-->
Commit 65bb0ff167 (parent 151767a5e3), authored by Darren Schroeder on 2024-09-24 08:40:48 -05:00 and committed by GitHub.
2 changed files with 156 additions and 88 deletions


@@ -942,7 +942,7 @@ fn flag_completions() {
// Test completions for the 'ls' flags
let suggestions = completer.complete("ls -", 4);
assert_eq!(16, suggestions.len());
assert_eq!(18, suggestions.len());
let expected: Vec<String> = vec![
"--all".into(),
@@ -953,6 +953,7 @@ fn flag_completions() {
"--long".into(),
"--mime-type".into(),
"--short-names".into(),
"--threads".into(),
"-D".into(),
"-a".into(),
"-d".into(),
@@ -961,6 +962,7 @@ fn flag_completions() {
"-l".into(),
"-m".into(),
"-s".into(),
"-t".into(),
];
// Match results


@@ -8,11 +8,14 @@ use nu_glob::MatchOptions;
use nu_path::{expand_path_with, expand_to_real_path};
use nu_protocol::{DataSource, NuGlob, PipelineMetadata, Signals};
use pathdiff::diff_paths;
use rayon::prelude::*;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::{
path::PathBuf,
sync::mpsc,
sync::{Arc, Mutex},
time::{SystemTime, UNIX_EPOCH},
};
@@ -28,6 +31,7 @@ struct Args {
du: bool,
directory: bool,
use_mime_type: bool,
use_threads: bool,
call_span: Span,
}
@@ -75,6 +79,7 @@ impl Command for Ls {
Some('D'),
)
.switch("mime-type", "Show mime-type in type column instead of 'file' (based on filenames only; files' contents are not examined)", Some('m'))
.switch("threads", "Use multiple threads to list contents. Output will be non-deterministic.", Some('t'))
.category(Category::FileSystem)
}
@@ -92,6 +97,7 @@ impl Command for Ls {
let du = call.has_flag(engine_state, stack, "du")?;
let directory = call.has_flag(engine_state, stack, "directory")?;
let use_mime_type = call.has_flag(engine_state, stack, "mime-type")?;
let use_threads = call.has_flag(engine_state, stack, "threads")?;
let call_span = call.head;
#[allow(deprecated)]
let cwd = current_dir(engine_state, stack)?;
@@ -104,6 +110,7 @@ impl Command for Ls {
du,
directory,
use_mime_type,
use_threads,
call_span,
};
@@ -114,22 +121,24 @@ impl Command for Ls {
Some(pattern_arg)
};
match input_pattern_arg {
None => Ok(ls_for_one_pattern(None, args, engine_state.signals(), cwd)?
.into_pipeline_data_with_metadata(
call_span,
engine_state.signals().clone(),
PipelineMetadata {
data_source: DataSource::Ls,
content_type: None,
},
)),
None => Ok(
ls_for_one_pattern(None, args, engine_state.signals().clone(), cwd)?
.into_pipeline_data_with_metadata(
call_span,
engine_state.signals().clone(),
PipelineMetadata {
data_source: DataSource::Ls,
content_type: None,
},
),
),
Some(pattern) => {
let mut result_iters = vec![];
for pat in pattern {
result_iters.push(ls_for_one_pattern(
Some(pat),
args,
engine_state.signals(),
engine_state.signals().clone(),
cwd.clone(),
)?)
}
@@ -213,9 +222,27 @@ impl Command for Ls {
fn ls_for_one_pattern(
pattern_arg: Option<Spanned<NuGlob>>,
args: Args,
signals: &Signals,
signals: Signals,
cwd: PathBuf,
) -> Result<Box<dyn Iterator<Item = Value> + Send>, ShellError> {
) -> Result<PipelineData, ShellError> {
fn create_pool(num_threads: usize) -> Result<rayon::ThreadPool, ShellError> {
match rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
{
Err(e) => Err(e).map_err(|e| ShellError::GenericError {
error: "Error creating thread pool".into(),
msg: e.to_string(),
span: Some(Span::unknown()),
help: None,
inner: vec![],
}),
Ok(pool) => Ok(pool),
}
}
let (tx, rx) = mpsc::channel();
let Args {
all,
long,
@@ -224,6 +251,7 @@ fn ls_for_one_pattern(
du,
directory,
use_mime_type,
use_threads,
call_span,
} = args;
let pattern_arg = {
@@ -281,7 +309,7 @@ fn ls_for_one_pattern(
});
}
if is_empty_dir(&tmp_expanded) {
return Ok(Box::new(vec![].into_iter()));
return Ok(Value::test_nothing().into_pipeline_data());
}
just_read_dir = !(pat.item.is_expand() && pat.item.as_ref().contains(GLOB_CHARS));
}
@@ -300,7 +328,7 @@ fn ls_for_one_pattern(
if directory {
(NuGlob::Expand(".".to_string()), false)
} else if is_empty_dir(&cwd) {
return Ok(Box::new(vec![].into_iter()));
return Ok(Value::test_nothing().into_pipeline_data());
} else {
(NuGlob::Expand("*".to_string()), false)
}
@@ -338,92 +366,130 @@ fn ls_for_one_pattern(
});
}
let mut hidden_dirs = vec![];
let hidden_dirs = Arc::new(Mutex::new(Vec::new()));
let signals = signals.clone();
Ok(Box::new(paths_peek.filter_map(move |x| match x {
Ok(path) => {
let metadata = match std::fs::symlink_metadata(&path) {
Ok(metadata) => Some(metadata),
Err(_) => None,
};
if path_contains_hidden_folder(&path, &hidden_dirs) {
return None;
}
let signals_clone = signals.clone();
if !all && !hidden_dir_specified && is_hidden_dir(&path) {
if path.is_dir() {
hidden_dirs.push(path);
}
return None;
}
let pool = if use_threads {
let count = std::thread::available_parallelism()?.get();
create_pool(count)?
} else {
create_pool(1)?
};
let display_name = if short_names {
path.file_name().map(|os| os.to_string_lossy().to_string())
} else if full_paths || absolute_path {
Some(path.to_string_lossy().to_string())
} else if let Some(prefix) = &prefix {
if let Ok(remainder) = path.strip_prefix(prefix) {
if directory {
// When the path is the same as the cwd, path_diff should be "."
let path_diff = if let Some(path_diff_not_dot) = diff_paths(&path, &cwd) {
let path_diff_not_dot = path_diff_not_dot.to_string_lossy();
if path_diff_not_dot.is_empty() {
".".to_string()
pool.install(|| {
paths_peek
.par_bridge()
.filter_map(move |x| match x {
Ok(path) => {
let metadata = match std::fs::symlink_metadata(&path) {
Ok(metadata) => Some(metadata),
Err(_) => None,
};
let hidden_dir_clone = Arc::clone(&hidden_dirs);
let mut hidden_dir_mutex = hidden_dir_clone
.lock()
.expect("Unable to acquire lock for hidden_dirs");
if path_contains_hidden_folder(&path, &hidden_dir_mutex) {
return None;
}
if !all && !hidden_dir_specified && is_hidden_dir(&path) {
if path.is_dir() {
hidden_dir_mutex.push(path);
drop(hidden_dir_mutex);
}
return None;
}
let display_name = if short_names {
path.file_name().map(|os| os.to_string_lossy().to_string())
} else if full_paths || absolute_path {
Some(path.to_string_lossy().to_string())
} else if let Some(prefix) = &prefix {
if let Ok(remainder) = path.strip_prefix(prefix) {
if directory {
// When the path is the same as the cwd, path_diff should be "."
let path_diff =
if let Some(path_diff_not_dot) = diff_paths(&path, &cwd) {
let path_diff_not_dot = path_diff_not_dot.to_string_lossy();
if path_diff_not_dot.is_empty() {
".".to_string()
} else {
path_diff_not_dot.to_string()
}
} else {
path.to_string_lossy().to_string()
};
Some(path_diff)
} else {
path_diff_not_dot.to_string()
let new_prefix = if let Some(pfx) = diff_paths(prefix, &cwd) {
pfx
} else {
prefix.to_path_buf()
};
Some(new_prefix.join(remainder).to_string_lossy().to_string())
}
} else {
path.to_string_lossy().to_string()
};
Some(path_diff)
Some(path.to_string_lossy().to_string())
}
} else {
let new_prefix = if let Some(pfx) = diff_paths(prefix, &cwd) {
pfx
} else {
prefix.to_path_buf()
};
Some(new_prefix.join(remainder).to_string_lossy().to_string())
Some(path.to_string_lossy().to_string())
}
} else {
Some(path.to_string_lossy().to_string())
}
} else {
Some(path.to_string_lossy().to_string())
}
.ok_or_else(|| ShellError::GenericError {
error: format!("Invalid file name: {:}", path.to_string_lossy()),
msg: "invalid file name".into(),
span: Some(call_span),
help: None,
inner: vec![],
});
.ok_or_else(|| ShellError::GenericError {
error: format!("Invalid file name: {:}", path.to_string_lossy()),
msg: "invalid file name".into(),
span: Some(call_span),
help: None,
inner: vec![],
});
match display_name {
Ok(name) => {
let entry = dir_entry_dict(
&path,
&name,
metadata.as_ref(),
call_span,
long,
du,
&signals,
use_mime_type,
args.full_paths,
);
match entry {
Ok(value) => Some(value),
match display_name {
Ok(name) => {
let entry = dir_entry_dict(
&path,
&name,
metadata.as_ref(),
call_span,
long,
du,
&signals_clone,
use_mime_type,
args.full_paths,
);
match entry {
Ok(value) => Some(value),
Err(err) => Some(Value::error(err, call_span)),
}
}
Err(err) => Some(Value::error(err, call_span)),
}
}
Err(err) => Some(Value::error(err, call_span)),
}
}
Err(err) => Some(Value::error(err, call_span)),
})))
})
.try_for_each(|stream| {
tx.send(stream).map_err(|e| ShellError::GenericError {
error: "Error streaming data".into(),
msg: e.to_string(),
span: Some(call_span),
help: None,
inner: vec![],
})
})
})
.map_err(|err| ShellError::GenericError {
error: "Unable to create a rayon pool".into(),
msg: err.to_string(),
span: Some(call_span),
help: None,
inner: vec![],
})?;
Ok(rx
.into_iter()
.into_pipeline_data(call_span, signals.clone()))
}
fn permission_denied(dir: impl AsRef<Path>) -> bool {