window --remainder (#6738)

* Implement window remainder, and save allocation

* Fallible memory reservation
This commit is contained in:
Doru 2022-10-15 10:06:54 -03:00 committed by GitHub
parent 4ffa4ac42a
commit e22f2e9f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -23,6 +23,11 @@ impl Command for Window {
"the number of rows to slide over between windows",
Some('s'),
)
.switch(
"remainder",
"yield last chunks even if they have fewer elements than size",
Some('r'),
)
.category(Category::Filters)
}
@ -115,6 +120,39 @@ impl Command for Window {
},
];
let stream_test_3 = vec![
Value::List {
vals: vec![
Value::Int {
val: 1,
span: Span::test_data(),
},
Value::Int {
val: 2,
span: Span::test_data(),
},
Value::Int {
val: 3,
span: Span::test_data(),
},
],
span: Span::test_data(),
},
Value::List {
vals: vec![
Value::Int {
val: 4,
span: Span::test_data(),
},
Value::Int {
val: 5,
span: Span::test_data(),
},
],
span: Span::test_data(),
},
];
vec![
Example {
example: "echo [1 2 3 4] | window 2",
@ -132,6 +170,14 @@ impl Command for Window {
span: Span::test_data(),
}),
},
Example {
example: "[1, 2, 3, 4, 5] | window 3 --stride 3 --remainder",
description: "A sliding window of equal stride that includes remainder. Equivalent to chunking",
result: Some(Value::List {
vals: stream_test_3,
span: Span::test_data(),
}),
},
]
}
@ -146,6 +192,7 @@ impl Command for Window {
let ctrlc = engine_state.ctrlc.clone();
let metadata = input.metadata();
let stride: Option<usize> = call.get_flag(engine_state, stack, "stride")?;
let remainder = call.has_flag("remainder");
let stride = stride.unwrap_or(1);
@ -157,6 +204,7 @@ impl Command for Window {
span: call.head,
previous: None,
stride,
remainder,
};
Ok(each_group_iterator
@ -171,13 +219,21 @@ struct EachWindowIterator {
span: Span,
previous: Option<Vec<Value>>,
stride: usize,
remainder: bool,
}
impl Iterator for EachWindowIterator {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
let mut group = self.previous.take().unwrap_or_default();
let mut group = self.previous.take().unwrap_or_else(|| {
let mut vec = Vec::new();
// We default to a Vec of capacity size + stride as striding pushes n extra elements to the end
vec.try_reserve(self.group_size + self.stride).ok();
vec
});
let mut current_count = 0;
if group.is_empty() {
@ -193,7 +249,13 @@ impl Iterator for EachWindowIterator {
break;
}
}
None => return None,
None => {
if self.remainder {
break;
} else {
return None;
}
}
}
}
} else {
@ -211,14 +273,23 @@ impl Iterator for EachWindowIterator {
break;
}
}
None => return None,
None => {
if self.remainder {
break;
} else {
return None;
}
}
}
}
group = group[current_count..].to_vec();
// We now have elements + stride in our group, and need to
// drop the skipped elements. Drain to preserve allocation and capacity
// Dropping this iterator consumes it.
group.drain(..self.stride.min(group.len()));
}
if group.is_empty() || current_count == 0 {
if group.is_empty() {
return None;
}