From 0a0475ebad1a9d73772288be026dd5780d8ea120 Mon Sep 17 00:00:00 2001 From: Renan Ribeiro <55855728+cosineblast@users.noreply.github.com> Date: Wed, 25 Dec 2024 11:13:05 -0300 Subject: [PATCH] add streaming to `get` and `reject` (#14622) Closes #14487. # Description `get` and `reject` now stream properly: Before: ![image](https://github.com/user-attachments/assets/57ecb705-1f98-49a4-a47e-27bba1c6c732) Now: ![image](https://github.com/user-attachments/assets/dc5c7fba-e1ef-46d2-bd78-fd777b9e9dad) # User-Facing Changes # Tests + Formatting # After Submitting --------- Co-authored-by: Wind Co-authored-by: 132ikl <132@ikl.sh> --- crates/nu-command/src/filters/get.rs | 54 ++++++++++++++++++-- crates/nu-command/src/filters/reject.rs | 37 +++++++++++--- crates/nu-command/tests/commands/split_by.rs | 9 ++-- 3 files changed, 88 insertions(+), 12 deletions(-) diff --git a/crates/nu-command/src/filters/get.rs b/crates/nu-command/src/filters/get.rs index a79c1c48f4..3f12e66814 100644 --- a/crates/nu-command/src/filters/get.rs +++ b/crates/nu-command/src/filters/get.rs @@ -1,4 +1,5 @@ use nu_engine::command_prelude::*; +use nu_protocol::{ast::PathMember, Signals}; #[derive(Clone)] pub struct Get; @@ -72,9 +73,13 @@ If multiple cell paths are given, this will produce a list of values."# } if rest.is_empty() { - input - .follow_cell_path(&cell_path.members, call.head, !sensitive) - .map(|x| x.into_pipeline_data()) + follow_cell_path_into_stream( + input, + engine_state.signals().clone(), + cell_path.members, + call.head, + !sensitive, + ) } else { let mut output = vec![]; @@ -94,6 +99,7 @@ If multiple cell paths are given, this will produce a list of values."# } .map(|x| x.set_metadata(metadata)) } + fn examples(&self) -> Vec { vec![ Example { @@ -139,6 +145,48 @@ If multiple cell paths are given, this will produce a list of values."# } } +// the PipelineData.follow_cell_path function, when given a +// stream, collects it into a vec before doing its job +// +// this is fine, 
since it returns a Result holding a single Value, +// but if we want to follow a PipelineData into a cell path and +// return another PipelineData, then we have to take care to +// make sure it streams +pub fn follow_cell_path_into_stream( + data: PipelineData, + signals: Signals, + cell_path: Vec, + head: Span, + insensitive: bool, +) -> Result { + // when given an integer/indexing, we fall back to + // the default nushell indexing behaviour + let has_int_member = cell_path + .iter() + .any(|it| matches!(it, PathMember::Int { .. })); + match data { + PipelineData::ListStream(stream, ..) if !has_int_member => { + let result = stream + .into_iter() + .map(move |value| { + let span = value.span(); + + match value.follow_cell_path(&cell_path, insensitive) { + Ok(v) => v, + Err(error) => Value::error(error, span), + } + }) + .into_pipeline_data(head, signals); + + Ok(result) + } + + _ => data + .follow_cell_path(&cell_path, head, insensitive) + .map(|x| x.into_pipeline_data()), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/nu-command/src/filters/reject.rs b/crates/nu-command/src/filters/reject.rs index 3f966e2da5..058ad1d70b 100644 --- a/crates/nu-command/src/filters/reject.rs +++ b/crates/nu-command/src/filters/reject.rs @@ -166,15 +166,13 @@ impl Command for Reject { } fn reject( - _engine_state: &EngineState, + engine_state: &EngineState, span: Span, input: PipelineData, cell_paths: Vec, ) -> Result { let mut unique_rows: HashSet = HashSet::new(); let metadata = input.metadata(); - let val = input.into_value(span)?; - let mut val = val; let mut new_columns = vec![]; let mut new_rows = vec![]; for column in cell_paths { @@ -212,10 +210,37 @@ fn reject( }); new_columns.append(&mut new_rows); - for cell_path in new_columns { - val.remove_data_at_cell_path(&cell_path.members)?; + + match input { + PipelineData::ListStream(stream, ..) 
=> { + let result = stream + .into_iter() + .map(move |mut value| { + let span = value.span(); + + for cell_path in new_columns.iter() { + if let Err(error) = value.remove_data_at_cell_path(&cell_path.members) { + return Value::error(error, span); + } + } + + value + }) + .into_pipeline_data(span, engine_state.signals().clone()); + + Ok(result) + } + + input => { + let mut val = input.into_value(span)?; + + for cell_path in new_columns { + val.remove_data_at_cell_path(&cell_path.members)?; + } + + Ok(val.into_pipeline_data_with_metadata(metadata)) + } } - Ok(val.into_pipeline_data_with_metadata(metadata)) } #[cfg(test)] diff --git a/crates/nu-command/tests/commands/split_by.rs b/crates/nu-command/tests/commands/split_by.rs index ab4631d079..1f648a1fcf 100644 --- a/crates/nu-command/tests/commands/split_by.rs +++ b/crates/nu-command/tests/commands/split_by.rs @@ -56,8 +56,11 @@ fn errors_if_non_record_input() { " )); - assert!(only_supports - .err - .contains("only Record input data is supported")); + assert!( + only_supports + .err + .contains("only Record input data is supported") + || only_supports.err.contains("expected: record") + ); }) }