From c19944f2916e023ef2325331ebedd4d36be5a46e Mon Sep 17 00:00:00 2001 From: Ian Manske Date: Fri, 19 Jul 2024 04:16:09 +0000 Subject: [PATCH] Refactor `window` (#13401) # Description Following from #13377, this PR refactors the related `window` command. In the case where `window` should behave exactly like `chunks`, it now reuses the same code that `chunks` does. Otherwise, the other cases have been rewritten and have resulted in a performance increase: | window size | stride | old time (ms) | new time (ms) | | -----------:| ------:| -------------:| -------------:| | 20 | 1 | 757 | 722 | | 2 | 1 | 364 | 333 | | 1 | 1 | 343 | 293 | | 20 | 20 | 90 | 63 | | 2 | 2 | 215 | 175 | | 20 | 30 | 74 | 60 | | 2 | 4 | 141 | 110 | # User-Facing Changes `window` will now error if the window size or stride is 0, which is technically a breaking change. --- crates/nu-command/src/filters/chunks.rs | 48 ++-- crates/nu-command/src/filters/window.rs | 307 +++++++++++---------- crates/nu-command/tests/commands/mod.rs | 1 + crates/nu-command/tests/commands/window.rs | 103 +++++++ 4 files changed, 297 insertions(+), 162 deletions(-) create mode 100644 crates/nu-command/tests/commands/window.rs diff --git a/crates/nu-command/src/filters/chunks.rs b/crates/nu-command/src/filters/chunks.rs index 11a59c2aa9..68d682ecd1 100644 --- a/crates/nu-command/src/filters/chunks.rs +++ b/crates/nu-command/src/filters/chunks.rs @@ -1,5 +1,6 @@ use nu_engine::command_prelude::*; use nu_protocol::ListStream; +use std::num::NonZeroUsize; #[derive(Clone)] pub struct Chunks; @@ -89,26 +90,33 @@ impl Command for Chunks { span: chunk_size.span(), })?; - if size == 0 { - return Err(ShellError::IncorrectValue { - msg: "`chunk_size` cannot be zero".into(), - val_span: chunk_size.span(), - call_span: head, - }); - } + let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue { + msg: "`chunk_size` cannot be zero".into(), + val_span: chunk_size.span(), + call_span: head, + })?; - match input { - PipelineData::Value(Value::List { vals, .. }, metadata) => { - let chunks = ChunksIter::new(vals, size, head); - let stream = ListStream::new(chunks, head, engine_state.signals().clone()); - Ok(PipelineData::ListStream(stream, metadata)) - } - PipelineData::ListStream(stream, metadata) => { - let stream = stream.modify(|iter| ChunksIter::new(iter, size, head)); - Ok(PipelineData::ListStream(stream, metadata)) - } - input => Err(input.unsupported_input_error("list", head)), + chunks(engine_state, input, size, head) + } +} + +pub fn chunks( + engine_state: &EngineState, + input: PipelineData, + chunk_size: NonZeroUsize, + span: Span, +) -> Result { + match input { + PipelineData::Value(Value::List { vals, .. }, metadata) => { + let chunks = ChunksIter::new(vals, chunk_size, span); + let stream = ListStream::new(chunks, span, engine_state.signals().clone()); + Ok(PipelineData::ListStream(stream, metadata)) } + PipelineData::ListStream(stream, metadata) => { + let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span)); + Ok(PipelineData::ListStream(stream, metadata)) + } + input => Err(input.unsupported_input_error("list", span)), } } @@ -119,10 +127,10 @@ struct ChunksIter> { } impl> ChunksIter { - fn new(iter: impl IntoIterator, size: usize, span: Span) -> Self { + fn new(iter: impl IntoIterator, size: NonZeroUsize, span: Span) -> Self { Self { iter: iter.into_iter(), - size, + size: size.into(), span, } } diff --git a/crates/nu-command/src/filters/window.rs b/crates/nu-command/src/filters/window.rs index 3c470f478d..556543118b 100644 --- a/crates/nu-command/src/filters/window.rs +++ b/crates/nu-command/src/filters/window.rs @@ -1,5 +1,6 @@ use nu_engine::command_prelude::*; -use nu_protocol::ValueIterator; +use nu_protocol::ListStream; +use std::num::NonZeroUsize; #[derive(Clone)] pub struct Window; @@ -12,8 +13,8 @@ impl Command for Window { fn signature(&self) -> Signature { Signature::build("window") .input_output_types(vec![( - Type::List(Box::new(Type::Any)), - Type::List(Box::new(Type::List(Box::new(Type::Any)))), + Type::list(Type::Any), + Type::list(Type::list(Type::Any)), )]) .required("window_size", SyntaxShape::Int, "The size of each window.") .named( @@ -34,72 +35,41 @@ impl Command for Window { "Creates a sliding window of `window_size` that slide by n rows/elements across input." } + fn extra_usage(&self) -> &str { + "This command will error if `window_size` or `stride` are negative or zero." + } + fn examples(&self) -> Vec { - let stream_test_1 = vec![ - Value::list( - vec![Value::test_int(1), Value::test_int(2)], - Span::test_data(), - ), - Value::list( - vec![Value::test_int(2), Value::test_int(3)], - Span::test_data(), - ), - Value::list( - vec![Value::test_int(3), Value::test_int(4)], - Span::test_data(), - ), - ]; - - let stream_test_2 = vec![ - Value::list( - vec![Value::test_int(1), Value::test_int(2)], - Span::test_data(), - ), - Value::list( - vec![Value::test_int(4), Value::test_int(5)], - Span::test_data(), - ), - Value::list( - vec![Value::test_int(7), Value::test_int(8)], - Span::test_data(), - ), - ]; - - let stream_test_3 = vec![ - Value::list( - vec![Value::test_int(1), Value::test_int(2), Value::test_int(3)], - Span::test_data(), - ), - Value::list( - vec![Value::test_int(4), Value::test_int(5)], - Span::test_data(), - ), - ]; - vec![ Example { example: "[1 2 3 4] | window 2", description: "A sliding window of two elements", - result: Some(Value::list( - stream_test_1, - Span::test_data(), - )), + result: Some(Value::test_list(vec![ + Value::test_list(vec![Value::test_int(1), Value::test_int(2)]), + Value::test_list(vec![Value::test_int(2), Value::test_int(3)]), + Value::test_list(vec![Value::test_int(3), Value::test_int(4)]), + ])), }, Example { example: "[1, 2, 3, 4, 5, 6, 7, 8] | window 2 --stride 3", description: "A sliding window of two elements, with a stride of 3", - result: Some(Value::list( - stream_test_2, - Span::test_data(), - )), + result: Some(Value::test_list(vec![ + Value::test_list(vec![Value::test_int(1), Value::test_int(2)]), + Value::test_list(vec![Value::test_int(4), Value::test_int(5)]), + Value::test_list(vec![Value::test_int(7), Value::test_int(8)]), + ])), }, Example { example: "[1, 2, 3, 4, 5] | window 3 --stride 3 --remainder", description: "A sliding window of equal stride that includes remainder. Equivalent to chunking", - result: Some(Value::list( - stream_test_3, - Span::test_data(), - )), + result: Some(Value::test_list(vec![ + Value::test_list(vec![ + Value::test_int(1), + Value::test_int(2), + Value::test_int(3), + ]), + Value::test_list(vec![Value::test_int(4), Value::test_int(5)]), + ])), }, ] } @@ -112,116 +82,169 @@ impl Command for Window { input: PipelineData, ) -> Result { let head = call.head; - let group_size: Spanned = call.req(engine_state, stack, 0)?; - let metadata = input.metadata(); - let stride: Option = call.get_flag(engine_state, stack, "stride")?; + let window_size: Value = call.req(engine_state, stack, 0)?; + let stride: Option = call.get_flag(engine_state, stack, "stride")?; let remainder = call.has_flag(engine_state, stack, "remainder")?; - let stride = stride.unwrap_or(1); + let size = + usize::try_from(window_size.as_int()?).map_err(|_| ShellError::NeedsPositiveValue { + span: window_size.span(), + })?; - //FIXME: add in support for external redirection when engine-q supports it generally + let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue { + msg: "`window_size` cannot be zero".into(), + val_span: window_size.span(), + call_span: head, + })?; - let each_group_iterator = EachWindowIterator { - group_size: group_size.item, - input: Box::new(input.into_iter()), - span: head, - previous: None, - stride, - remainder, + let stride = if let Some(stride_val) = stride { + let stride = usize::try_from(stride_val.as_int()?).map_err(|_| { + ShellError::NeedsPositiveValue { + span: stride_val.span(), + } + })?; + + NonZeroUsize::try_from(stride).map_err(|_| ShellError::IncorrectValue { + msg: "`stride` cannot be zero".into(), + val_span: stride_val.span(), + call_span: head, + })? + } else { + NonZeroUsize::MIN }; - Ok(each_group_iterator.into_pipeline_data_with_metadata( - head, - engine_state.signals().clone(), - metadata, - )) + if remainder && size == stride { + super::chunks::chunks(engine_state, input, size, head) + } else if stride >= size { + match input { + PipelineData::Value(Value::List { vals, .. }, metadata) => { + let chunks = WindowGapIter::new(vals, size, stride, remainder, head); + let stream = ListStream::new(chunks, head, engine_state.signals().clone()); + Ok(PipelineData::ListStream(stream, metadata)) + } + PipelineData::ListStream(stream, metadata) => { + let stream = stream + .modify(|iter| WindowGapIter::new(iter, size, stride, remainder, head)); + Ok(PipelineData::ListStream(stream, metadata)) + } + input => Err(input.unsupported_input_error("list", head)), + } + } else { + match input { + PipelineData::Value(Value::List { vals, .. }, metadata) => { + let chunks = WindowOverlapIter::new(vals, size, stride, remainder, head); + let stream = ListStream::new(chunks, head, engine_state.signals().clone()); + Ok(PipelineData::ListStream(stream, metadata)) + } + PipelineData::ListStream(stream, metadata) => { + let stream = stream + .modify(|iter| WindowOverlapIter::new(iter, size, stride, remainder, head)); + Ok(PipelineData::ListStream(stream, metadata)) + } + input => Err(input.unsupported_input_error("list", head)), + } + } } } -struct EachWindowIterator { - group_size: usize, - input: ValueIterator, - span: Span, - previous: Option>, +struct WindowOverlapIter> { + iter: I, + window: Vec, stride: usize, remainder: bool, + span: Span, } -impl Iterator for EachWindowIterator { +impl> WindowOverlapIter { + fn new( + iter: impl IntoIterator, + size: NonZeroUsize, + stride: NonZeroUsize, + remainder: bool, + span: Span, + ) -> Self { + Self { + iter: iter.into_iter(), + window: Vec::with_capacity(size.into()), + stride: stride.into(), + remainder, + span, + } + } +} + +impl> Iterator for WindowOverlapIter { type Item = Value; fn next(&mut self) -> Option { - let mut group = self.previous.take().unwrap_or_else(|| { - let mut vec = Vec::new(); - - // We default to a Vec of capacity size + stride as striding pushes n extra elements to the end - vec.try_reserve(self.group_size + self.stride).ok(); - - vec - }); - let mut current_count = 0; - - if group.is_empty() { - loop { - let item = self.input.next(); - - match item { - Some(v) => { - group.push(v); - - current_count += 1; - if current_count >= self.group_size { - break; - } - } - None => { - if self.remainder { - break; - } else { - return None; - } - } - } - } + let len = if self.window.is_empty() { + self.window.capacity() } else { - // our historic buffer is already full, so stride instead + self.stride + }; - loop { - let item = self.input.next(); + self.window.extend((&mut self.iter).take(len)); - match item { - Some(v) => { - group.push(v); - - current_count += 1; - if current_count >= self.stride { - break; - } - } - None => { - if self.remainder { - break; - } else { - return None; - } - } - } - } - - // We now have elements + stride in our group, and need to - // drop the skipped elements. Drain to preserve allocation and capacity - // Dropping this iterator consumes it. - group.drain(..self.stride.min(group.len())); + if self.window.len() == self.window.capacity() + || (self.remainder && !self.window.is_empty()) + { + let mut next = Vec::with_capacity(self.window.len()); + next.extend(self.window.iter().skip(self.stride).cloned()); + let window = std::mem::replace(&mut self.window, next); + Some(Value::list(window, self.span)) + } else { + None } + } +} - if group.is_empty() { - return None; +struct WindowGapIter> { + iter: I, + size: usize, + skip: usize, + first: bool, + remainder: bool, + span: Span, +} + +impl> WindowGapIter { + fn new( + iter: impl IntoIterator, + size: NonZeroUsize, + stride: NonZeroUsize, + remainder: bool, + span: Span, + ) -> Self { + let size = size.into(); + Self { + iter: iter.into_iter(), + size, + skip: stride.get() - size, + first: true, + remainder, + span, } + } +} - let return_group = group.clone(); - self.previous = Some(group); +impl> Iterator for WindowGapIter { + type Item = Value; - Some(Value::list(return_group, self.span)) + fn next(&mut self) -> Option { + let mut window = Vec::with_capacity(self.size); + window.extend( + (&mut self.iter) + .skip(if self.first { 0 } else { self.skip }) + .take(self.size), + ); + + self.first = false; + + if window.len() == self.size || (self.remainder && !window.is_empty()) { + Some(Value::list(window, self.span)) + } else { + None + } } } diff --git a/crates/nu-command/tests/commands/mod.rs b/crates/nu-command/tests/commands/mod.rs index 4c9f8e86f8..3cb468ad30 100644 --- a/crates/nu-command/tests/commands/mod.rs +++ b/crates/nu-command/tests/commands/mod.rs @@ -115,6 +115,7 @@ mod try_; mod ucp; #[cfg(unix)] mod ulimit; +mod window; mod debug; mod umkdir; diff --git a/crates/nu-command/tests/commands/window.rs b/crates/nu-command/tests/commands/window.rs new file mode 100644 index 0000000000..6e4addc6c0 --- /dev/null +++ b/crates/nu-command/tests/commands/window.rs @@ -0,0 +1,103 @@ +use nu_test_support::nu; + +#[test] +fn window_size_negative() { + let actual = nu!("[0 1 2] | window -1"); + assert!(actual.err.contains("positive")); +} + +#[test] +fn window_size_zero() { + let actual = nu!("[0 1 2] | window 0"); + assert!(actual.err.contains("zero")); +} + +#[test] +fn window_size_not_int() { + let actual = nu!("[0 1 2] | window (if true { 1sec })"); + assert!(actual.err.contains("can't convert")); +} + +#[test] +fn stride_negative() { + let actual = nu!("[0 1 2] | window 1 -s -1"); + assert!(actual.err.contains("positive")); +} + +#[test] +fn stride_zero() { + let actual = nu!("[0 1 2] | window 1 -s 0"); + assert!(actual.err.contains("zero")); +} + +#[test] +fn stride_not_int() { + let actual = nu!("[0 1 2] | window 1 -s (if true { 1sec })"); + assert!(actual.err.contains("can't convert")); +} + +#[test] +fn empty() { + let actual = nu!("[] | window 2 | is-empty"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn list_stream() { + let actual = nu!("([0 1 2] | every 1 | window 2) == ([0 1 2] | window 2)"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn table_stream() { + let actual = nu!("([[foo bar]; [0 1] [2 3] [4 5]] | every 1 | window 2) == ([[foo bar]; [0 1] [2 3] [4 5]] | window 2)"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn no_empty_chunks() { + let actual = nu!("([0 1 2 3 4 5] | window 3 -s 3 -r | length) == 2"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn same_as_chunks() { + let actual = nu!("([0 1 2 3 4] | window 2 -s 2 -r) == ([0 1 2 3 4 ] | chunks 2)"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn stride_equal_to_window_size() { + let actual = nu!("([0 1 2 3] | window 2 -s 2 | flatten) == [0 1 2 3]"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn stride_greater_than_window_size() { + let actual = nu!("([0 1 2 3 4] | window 2 -s 3 | flatten) == [0 1 3 4]"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn stride_less_than_window_size() { + let actual = nu!("([0 1 2 3 4 5] | window 3 -s 2 | length) == 2"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn stride_equal_to_window_size_remainder() { + let actual = nu!("([0 1 2 3 4] | window 2 -s 2 -r | flatten) == [0 1 2 3 4]"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn stride_greater_than_window_size_remainder() { + let actual = nu!("([0 1 2 3 4] | window 2 -s 3 -r | flatten) == [0 1 3 4]"); + assert_eq!(actual.out, "true"); +} + +#[test] +fn stride_less_than_window_size_remainder() { + let actual = nu!("([0 1 2 3 4 5] | window 3 -s 2 -r | length) == 3"); + assert_eq!(actual.out, "true"); +}