Refactor window (#13401)

# Description
Following from #13377, this PR refactors the related `window` command.
When `window` should behave exactly like `chunks` (that is, `--remainder`
is set and the stride equals the window size), it now reuses the same code
that `chunks` does. The remaining cases have been rewritten, resulting in
a performance increase:

| window size | stride | old time (ms) | new time (ms) |
| -----------:| ------:| -------------:| -------------:|
| 20          | 1      | 757           | 722           |
| 2           | 1      | 364           | 333           |
| 1           | 1      | 343           | 293           |
| 20          | 20     | 90            | 63            |
| 2           | 2      | 215           | 175           |
| 20          | 30     | 74            | 60            |
| 2           | 4      | 141           | 110           |


# User-Facing Changes
`window` will now error if the window size or stride is 0, which is
technically a breaking change.
This commit is contained in:
Ian Manske 2024-07-19 04:16:09 +00:00 committed by GitHub
parent 22379c9846
commit c19944f291
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 297 additions and 162 deletions

View File

@ -1,5 +1,6 @@
use nu_engine::command_prelude::*; use nu_engine::command_prelude::*;
use nu_protocol::ListStream; use nu_protocol::ListStream;
use std::num::NonZeroUsize;
#[derive(Clone)] #[derive(Clone)]
pub struct Chunks; pub struct Chunks;
@ -89,26 +90,33 @@ impl Command for Chunks {
span: chunk_size.span(), span: chunk_size.span(),
})?; })?;
if size == 0 { let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue {
return Err(ShellError::IncorrectValue { msg: "`chunk_size` cannot be zero".into(),
msg: "`chunk_size` cannot be zero".into(), val_span: chunk_size.span(),
val_span: chunk_size.span(), call_span: head,
call_span: head, })?;
});
}
match input { chunks(engine_state, input, size, head)
PipelineData::Value(Value::List { vals, .. }, metadata) => { }
let chunks = ChunksIter::new(vals, size, head); }
let stream = ListStream::new(chunks, head, engine_state.signals().clone());
Ok(PipelineData::ListStream(stream, metadata)) pub fn chunks(
} engine_state: &EngineState,
PipelineData::ListStream(stream, metadata) => { input: PipelineData,
let stream = stream.modify(|iter| ChunksIter::new(iter, size, head)); chunk_size: NonZeroUsize,
Ok(PipelineData::ListStream(stream, metadata)) span: Span,
} ) -> Result<PipelineData, ShellError> {
input => Err(input.unsupported_input_error("list", head)), match input {
PipelineData::Value(Value::List { vals, .. }, metadata) => {
let chunks = ChunksIter::new(vals, chunk_size, span);
let stream = ListStream::new(chunks, span, engine_state.signals().clone());
Ok(PipelineData::ListStream(stream, metadata))
} }
PipelineData::ListStream(stream, metadata) => {
let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
Ok(PipelineData::ListStream(stream, metadata))
}
input => Err(input.unsupported_input_error("list", span)),
} }
} }
@ -119,10 +127,10 @@ struct ChunksIter<I: Iterator<Item = Value>> {
} }
impl<I: Iterator<Item = Value>> ChunksIter<I> { impl<I: Iterator<Item = Value>> ChunksIter<I> {
fn new(iter: impl IntoIterator<IntoIter = I>, size: usize, span: Span) -> Self { fn new(iter: impl IntoIterator<IntoIter = I>, size: NonZeroUsize, span: Span) -> Self {
Self { Self {
iter: iter.into_iter(), iter: iter.into_iter(),
size, size: size.into(),
span, span,
} }
} }

View File

@ -1,5 +1,6 @@
use nu_engine::command_prelude::*; use nu_engine::command_prelude::*;
use nu_protocol::ValueIterator; use nu_protocol::ListStream;
use std::num::NonZeroUsize;
#[derive(Clone)] #[derive(Clone)]
pub struct Window; pub struct Window;
@ -12,8 +13,8 @@ impl Command for Window {
fn signature(&self) -> Signature { fn signature(&self) -> Signature {
Signature::build("window") Signature::build("window")
.input_output_types(vec![( .input_output_types(vec![(
Type::List(Box::new(Type::Any)), Type::list(Type::Any),
Type::List(Box::new(Type::List(Box::new(Type::Any)))), Type::list(Type::list(Type::Any)),
)]) )])
.required("window_size", SyntaxShape::Int, "The size of each window.") .required("window_size", SyntaxShape::Int, "The size of each window.")
.named( .named(
@ -34,72 +35,41 @@ impl Command for Window {
"Creates a sliding window of `window_size` that slide by n rows/elements across input." "Creates a sliding window of `window_size` that slide by n rows/elements across input."
} }
fn extra_usage(&self) -> &str {
"This command will error if `window_size` or `stride` are negative or zero."
}
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
let stream_test_1 = vec![
Value::list(
vec![Value::test_int(1), Value::test_int(2)],
Span::test_data(),
),
Value::list(
vec![Value::test_int(2), Value::test_int(3)],
Span::test_data(),
),
Value::list(
vec![Value::test_int(3), Value::test_int(4)],
Span::test_data(),
),
];
let stream_test_2 = vec![
Value::list(
vec![Value::test_int(1), Value::test_int(2)],
Span::test_data(),
),
Value::list(
vec![Value::test_int(4), Value::test_int(5)],
Span::test_data(),
),
Value::list(
vec![Value::test_int(7), Value::test_int(8)],
Span::test_data(),
),
];
let stream_test_3 = vec![
Value::list(
vec![Value::test_int(1), Value::test_int(2), Value::test_int(3)],
Span::test_data(),
),
Value::list(
vec![Value::test_int(4), Value::test_int(5)],
Span::test_data(),
),
];
vec![ vec![
Example { Example {
example: "[1 2 3 4] | window 2", example: "[1 2 3 4] | window 2",
description: "A sliding window of two elements", description: "A sliding window of two elements",
result: Some(Value::list( result: Some(Value::test_list(vec![
stream_test_1, Value::test_list(vec![Value::test_int(1), Value::test_int(2)]),
Span::test_data(), Value::test_list(vec![Value::test_int(2), Value::test_int(3)]),
)), Value::test_list(vec![Value::test_int(3), Value::test_int(4)]),
])),
}, },
Example { Example {
example: "[1, 2, 3, 4, 5, 6, 7, 8] | window 2 --stride 3", example: "[1, 2, 3, 4, 5, 6, 7, 8] | window 2 --stride 3",
description: "A sliding window of two elements, with a stride of 3", description: "A sliding window of two elements, with a stride of 3",
result: Some(Value::list( result: Some(Value::test_list(vec![
stream_test_2, Value::test_list(vec![Value::test_int(1), Value::test_int(2)]),
Span::test_data(), Value::test_list(vec![Value::test_int(4), Value::test_int(5)]),
)), Value::test_list(vec![Value::test_int(7), Value::test_int(8)]),
])),
}, },
Example { Example {
example: "[1, 2, 3, 4, 5] | window 3 --stride 3 --remainder", example: "[1, 2, 3, 4, 5] | window 3 --stride 3 --remainder",
description: "A sliding window of equal stride that includes remainder. Equivalent to chunking", description: "A sliding window of equal stride that includes remainder. Equivalent to chunking",
result: Some(Value::list( result: Some(Value::test_list(vec![
stream_test_3, Value::test_list(vec![
Span::test_data(), Value::test_int(1),
)), Value::test_int(2),
Value::test_int(3),
]),
Value::test_list(vec![Value::test_int(4), Value::test_int(5)]),
])),
}, },
] ]
} }
@ -112,116 +82,169 @@ impl Command for Window {
input: PipelineData, input: PipelineData,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let head = call.head; let head = call.head;
let group_size: Spanned<usize> = call.req(engine_state, stack, 0)?; let window_size: Value = call.req(engine_state, stack, 0)?;
let metadata = input.metadata(); let stride: Option<Value> = call.get_flag(engine_state, stack, "stride")?;
let stride: Option<usize> = call.get_flag(engine_state, stack, "stride")?;
let remainder = call.has_flag(engine_state, stack, "remainder")?; let remainder = call.has_flag(engine_state, stack, "remainder")?;
let stride = stride.unwrap_or(1); let size =
usize::try_from(window_size.as_int()?).map_err(|_| ShellError::NeedsPositiveValue {
span: window_size.span(),
})?;
//FIXME: add in support for external redirection when engine-q supports it generally let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue {
msg: "`window_size` cannot be zero".into(),
val_span: window_size.span(),
call_span: head,
})?;
let each_group_iterator = EachWindowIterator { let stride = if let Some(stride_val) = stride {
group_size: group_size.item, let stride = usize::try_from(stride_val.as_int()?).map_err(|_| {
input: Box::new(input.into_iter()), ShellError::NeedsPositiveValue {
span: head, span: stride_val.span(),
previous: None, }
stride, })?;
remainder,
NonZeroUsize::try_from(stride).map_err(|_| ShellError::IncorrectValue {
msg: "`stride` cannot be zero".into(),
val_span: stride_val.span(),
call_span: head,
})?
} else {
NonZeroUsize::MIN
}; };
Ok(each_group_iterator.into_pipeline_data_with_metadata( if remainder && size == stride {
head, super::chunks::chunks(engine_state, input, size, head)
engine_state.signals().clone(), } else if stride >= size {
metadata, match input {
)) PipelineData::Value(Value::List { vals, .. }, metadata) => {
let chunks = WindowGapIter::new(vals, size, stride, remainder, head);
let stream = ListStream::new(chunks, head, engine_state.signals().clone());
Ok(PipelineData::ListStream(stream, metadata))
}
PipelineData::ListStream(stream, metadata) => {
let stream = stream
.modify(|iter| WindowGapIter::new(iter, size, stride, remainder, head));
Ok(PipelineData::ListStream(stream, metadata))
}
input => Err(input.unsupported_input_error("list", head)),
}
} else {
match input {
PipelineData::Value(Value::List { vals, .. }, metadata) => {
let chunks = WindowOverlapIter::new(vals, size, stride, remainder, head);
let stream = ListStream::new(chunks, head, engine_state.signals().clone());
Ok(PipelineData::ListStream(stream, metadata))
}
PipelineData::ListStream(stream, metadata) => {
let stream = stream
.modify(|iter| WindowOverlapIter::new(iter, size, stride, remainder, head));
Ok(PipelineData::ListStream(stream, metadata))
}
input => Err(input.unsupported_input_error("list", head)),
}
}
} }
} }
struct EachWindowIterator { struct WindowOverlapIter<I: Iterator<Item = Value>> {
group_size: usize, iter: I,
input: ValueIterator, window: Vec<Value>,
span: Span,
previous: Option<Vec<Value>>,
stride: usize, stride: usize,
remainder: bool, remainder: bool,
span: Span,
} }
impl Iterator for EachWindowIterator { impl<I: Iterator<Item = Value>> WindowOverlapIter<I> {
fn new(
iter: impl IntoIterator<IntoIter = I>,
size: NonZeroUsize,
stride: NonZeroUsize,
remainder: bool,
span: Span,
) -> Self {
Self {
iter: iter.into_iter(),
window: Vec::with_capacity(size.into()),
stride: stride.into(),
remainder,
span,
}
}
}
impl<I: Iterator<Item = Value>> Iterator for WindowOverlapIter<I> {
type Item = Value; type Item = Value;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
let mut group = self.previous.take().unwrap_or_else(|| { let len = if self.window.is_empty() {
let mut vec = Vec::new(); self.window.capacity()
// We default to a Vec of capacity size + stride as striding pushes n extra elements to the end
vec.try_reserve(self.group_size + self.stride).ok();
vec
});
let mut current_count = 0;
if group.is_empty() {
loop {
let item = self.input.next();
match item {
Some(v) => {
group.push(v);
current_count += 1;
if current_count >= self.group_size {
break;
}
}
None => {
if self.remainder {
break;
} else {
return None;
}
}
}
}
} else { } else {
// our historic buffer is already full, so stride instead self.stride
};
loop { self.window.extend((&mut self.iter).take(len));
let item = self.input.next();
match item { if self.window.len() == self.window.capacity()
Some(v) => { || (self.remainder && !self.window.is_empty())
group.push(v); {
let mut next = Vec::with_capacity(self.window.len());
current_count += 1; next.extend(self.window.iter().skip(self.stride).cloned());
if current_count >= self.stride { let window = std::mem::replace(&mut self.window, next);
break; Some(Value::list(window, self.span))
} } else {
} None
None => {
if self.remainder {
break;
} else {
return None;
}
}
}
}
// We now have elements + stride in our group, and need to
// drop the skipped elements. Drain to preserve allocation and capacity
// Dropping this iterator consumes it.
group.drain(..self.stride.min(group.len()));
} }
}
}
if group.is_empty() { struct WindowGapIter<I: Iterator<Item = Value>> {
return None; iter: I,
size: usize,
skip: usize,
first: bool,
remainder: bool,
span: Span,
}
impl<I: Iterator<Item = Value>> WindowGapIter<I> {
fn new(
iter: impl IntoIterator<IntoIter = I>,
size: NonZeroUsize,
stride: NonZeroUsize,
remainder: bool,
span: Span,
) -> Self {
let size = size.into();
Self {
iter: iter.into_iter(),
size,
skip: stride.get() - size,
first: true,
remainder,
span,
} }
}
}
let return_group = group.clone(); impl<I: Iterator<Item = Value>> Iterator for WindowGapIter<I> {
self.previous = Some(group); type Item = Value;
Some(Value::list(return_group, self.span)) fn next(&mut self) -> Option<Self::Item> {
let mut window = Vec::with_capacity(self.size);
window.extend(
(&mut self.iter)
.skip(if self.first { 0 } else { self.skip })
.take(self.size),
);
self.first = false;
if window.len() == self.size || (self.remainder && !window.is_empty()) {
Some(Value::list(window, self.span))
} else {
None
}
} }
} }

View File

@ -115,6 +115,7 @@ mod try_;
mod ucp; mod ucp;
#[cfg(unix)] #[cfg(unix)]
mod ulimit; mod ulimit;
mod window;
mod debug; mod debug;
mod umkdir; mod umkdir;

View File

@ -0,0 +1,103 @@
use nu_test_support::nu;
/// Runs `window` with a window size of -1 and checks that the reported
/// error text mentions "positive".
#[test]
fn window_size_negative() {
    let outcome = nu!("[0 1 2] | window -1");
    assert!(outcome.err.contains("positive"));
}
/// Runs `window` with a window size of 0 and checks that the reported
/// error text mentions "zero".
#[test]
fn window_size_zero() {
    let outcome = nu!("[0 1 2] | window 0");
    assert!(outcome.err.contains("zero"));
}
/// Passes a duration (`1sec`, produced by a subexpression) as the window
/// size and checks that the error text contains "can't convert".
#[test]
fn window_size_not_int() {
    let outcome = nu!("[0 1 2] | window (if true { 1sec })");
    assert!(outcome.err.contains("can't convert"));
}
/// Runs `window` with `-s -1` and checks that the reported error text
/// mentions "positive".
#[test]
fn stride_negative() {
    let outcome = nu!("[0 1 2] | window 1 -s -1");
    assert!(outcome.err.contains("positive"));
}
/// Runs `window` with `-s 0` and checks that the reported error text
/// mentions "zero".
#[test]
fn stride_zero() {
    let outcome = nu!("[0 1 2] | window 1 -s 0");
    assert!(outcome.err.contains("zero"));
}
/// Passes a duration (`1sec`, produced by a subexpression) as the stride
/// and checks that the error text contains "can't convert".
#[test]
fn stride_not_int() {
    let outcome = nu!("[0 1 2] | window 1 -s (if true { 1sec })");
    assert!(outcome.err.contains("can't convert"));
}
/// Windowing an empty list is expected to produce an empty result:
/// the pipeline's `is-empty` check must print "true".
#[test]
fn empty() {
    let outcome = nu!("[] | window 2 | is-empty");
    assert_eq!(outcome.out, "true");
}
/// Compares `window 2` applied after `every 1` with `window 2` applied
/// directly to the same list literal; the two results must be equal.
// NOTE(review): `every 1` is presumably used to turn the list into a
// stream before windowing — confirm against the `every` command.
#[test]
fn list_stream() {
    let outcome = nu!("([0 1 2] | every 1 | window 2) == ([0 1 2] | window 2)");
    assert_eq!(outcome.out, "true");
}
/// Same comparison as the list case, but with a three-row table as input:
/// `window 2` after `every 1` must equal `window 2` on the table literal.
#[test]
fn table_stream() {
    let outcome = nu!(
        "([[foo bar]; [0 1] [2 3] [4 5]] | every 1 | window 2) == ([[foo bar]; [0 1] [2 3] [4 5]] | window 2)"
    );
    assert_eq!(outcome.out, "true");
}
/// Six elements with window size 3, stride 3 and `-r` must yield exactly
/// two windows, i.e. no extra trailing window after the input is used up.
#[test]
fn no_empty_chunks() {
    let outcome = nu!("([0 1 2 3 4 5] | window 3 -s 3 -r | length) == 2");
    assert_eq!(outcome.out, "true");
}
/// With `-r` and a stride equal to the window size, `window 2 -s 2` must
/// produce the same value as `chunks 2` on the same input.
#[test]
fn same_as_chunks() {
    let outcome = nu!("([0 1 2 3 4] | window 2 -s 2 -r) == ([0 1 2 3 4 ] | chunks 2)");
    assert_eq!(outcome.out, "true");
}
/// Stride equal to the window size, without `-r`: flattening the windows
/// of an evenly divisible list must give back the original elements.
#[test]
fn stride_equal_to_window_size() {
    let outcome = nu!("([0 1 2 3] | window 2 -s 2 | flatten) == [0 1 2 3]");
    assert_eq!(outcome.out, "true");
}
/// Stride larger than the window size, without `-r`: the element that
/// falls in the gap between windows (here `2`) must be absent from the
/// flattened output.
#[test]
fn stride_greater_than_window_size() {
    let outcome = nu!("([0 1 2 3 4] | window 2 -s 3 | flatten) == [0 1 3 4]");
    assert_eq!(outcome.out, "true");
}
/// Stride smaller than the window size, without `-r`: six elements with
/// window size 3 and stride 2 must yield exactly two windows.
#[test]
fn stride_less_than_window_size() {
    let outcome = nu!("([0 1 2 3 4 5] | window 3 -s 2 | length) == 2");
    assert_eq!(outcome.out, "true");
}
/// Stride equal to the window size, with `-r`: flattening the windows of
/// a five-element list must give back all five elements, including the
/// one left over in the final short window.
#[test]
fn stride_equal_to_window_size_remainder() {
    let outcome = nu!("([0 1 2 3 4] | window 2 -s 2 -r | flatten) == [0 1 2 3 4]");
    assert_eq!(outcome.out, "true");
}
/// Stride larger than the window size, with `-r`: the flattened output
/// must still omit the skipped element (`2`) and contain `0 1 3 4`.
#[test]
fn stride_greater_than_window_size_remainder() {
    let outcome = nu!("([0 1 2 3 4] | window 2 -s 3 -r | flatten) == [0 1 3 4]");
    assert_eq!(outcome.out, "true");
}
/// Stride smaller than the window size, with `-r`: six elements with
/// window size 3 and stride 2 must yield three windows, one more than
/// the same pipeline is expected to yield without `-r`.
#[test]
fn stride_less_than_window_size_remainder() {
    let outcome = nu!("([0 1 2 3 4 5] | window 3 -s 2 -r | length) == 3");
    assert_eq!(outcome.out, "true");
}