allow select to stream more (#14492)

# Description

closes https://github.com/nushell/nushell/issues/14487

This PR tries to allow the `select` to stream better by changing the for
loops that collected the output into a `Vec<Value>` prior to returning
it into a map that returns the data as it is processed.

One curiosity, `select` transforms the input into a `PipelineIterator`.
If I remove this code, it still passes all tests. I'm not sure all this
`PipelineIterator` code is even needed. I left it for someone to tell me
if it's necessary.

# User-Facing Changes
<!-- List of all changes that impact the user experience here. This
helps us keep track of breaking changes. -->

# Tests + Formatting
<!--
Don't forget to add tests that cover your changes.

Make sure you've run and fixed any issues with these commands:

- `cargo fmt --all -- --check` to check standard code formatting (`cargo
fmt --all` applies these changes)
- `cargo clippy --workspace -- -D warnings -D clippy::unwrap_used` to
check that you're using the standard code style
- `cargo test --workspace` to check that all tests pass (on Windows make
sure to [enable developer
mode](https://learn.microsoft.com/en-us/windows/apps/get-started/developer-mode-features-and-debugging))
- `cargo run -- -c "use toolkit.nu; toolkit test stdlib"` to run the
tests for the standard library

> **Note**
> from `nushell` you can also use the `toolkit` as follows
> ```bash
> use toolkit.nu # or use an `env_change` hook to activate it
automatically
> toolkit check pr
> ```
-->

# After Submitting
<!-- If your PR had any user-facing changes, update [the
documentation](https://github.com/nushell/nushell.github.io) after the
PR is merged, if necessary. This will help us keep the docs up to date.
-->
This commit is contained in:
Darren Schroeder 2024-12-03 20:45:31 -06:00 committed by GitHub
parent 217be24963
commit b2d8bd08f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 42 additions and 52 deletions

View File

@ -206,7 +206,6 @@ fn select(
let columns = new_columns; let columns = new_columns;
let input = if !unique_rows.is_empty() { let input = if !unique_rows.is_empty() {
// let skip = call.has_flag(engine_state, stack, "skip")?;
let metadata = input.metadata(); let metadata = input.metadata();
let pipeline_iter: PipelineIterator = input.into_iter(); let pipeline_iter: PipelineIterator = input.into_iter();
@ -231,9 +230,9 @@ fn select(
Value::List { Value::List {
vals: input_vals, .. vals: input_vals, ..
} => { } => {
let mut output = vec![]; Ok(input_vals
let mut columns_with_value = Vec::new(); .into_iter()
for input_val in input_vals { .map(move |input_val| {
if !columns.is_empty() { if !columns.is_empty() {
let mut record = Record::new(); let mut record = Record::new();
for path in &columns { for path in &columns {
@ -241,23 +240,17 @@ fn select(
match input_val.clone().follow_cell_path(&path.members, false) { match input_val.clone().follow_cell_path(&path.members, false) {
Ok(fetcher) => { Ok(fetcher) => {
record.push(path.to_column_name(), fetcher); record.push(path.to_column_name(), fetcher);
if !columns_with_value.contains(&path) {
columns_with_value.push(path);
}
}
Err(e) => {
return Err(e);
} }
Err(e) => return Value::error(e, call_span),
} }
} }
output.push(Value::record(record, span)) Value::record(record, span)
} else { } else {
output.push(input_val) input_val.clone()
} }
} })
.into_pipeline_data_with_metadata(
Ok(output.into_iter().into_pipeline_data_with_metadata(
call_span, call_span,
engine_state.signals().clone(), engine_state.signals().clone(),
metadata, metadata,
@ -286,9 +279,8 @@ fn select(
} }
} }
PipelineData::ListStream(stream, metadata, ..) => { PipelineData::ListStream(stream, metadata, ..) => {
let mut values = vec![]; Ok(stream
.map(move |x| {
for x in stream {
if !columns.is_empty() { if !columns.is_empty() {
let mut record = Record::new(); let mut record = Record::new();
for path in &columns { for path in &columns {
@ -297,16 +289,15 @@ fn select(
Ok(value) => { Ok(value) => {
record.push(path.to_column_name(), value); record.push(path.to_column_name(), value);
} }
Err(e) => return Err(e), Err(e) => return Value::error(e, call_span),
} }
} }
values.push(Value::record(record, call_span)); Value::record(record, call_span)
} else { } else {
values.push(x); x
} }
} })
.into_pipeline_data_with_metadata(
Ok(values.into_pipeline_data_with_metadata(
call_span, call_span,
engine_state.signals().clone(), engine_state.signals().clone(),
metadata, metadata,

View File

@ -63,7 +63,7 @@ fn complex_nested_columns() {
fn fails_if_given_unknown_column_name() { fn fails_if_given_unknown_column_name() {
let actual = nu!(pipeline( let actual = nu!(pipeline(
r#" r#"
echo [ [
[first_name, last_name, rusty_at, type]; [first_name, last_name, rusty_at, type];
[Andrés Robalino '10/11/2013' A] [Andrés Robalino '10/11/2013' A]
@ -71,7 +71,6 @@ fn fails_if_given_unknown_column_name() {
[Yehuda Katz '10/11/2013' A] [Yehuda Katz '10/11/2013' A]
] ]
| select rrusty_at first_name | select rrusty_at first_name
| length
"# "#
)); ));