Make every stream-able (#2120)

* Make every stream-able

* Make each over ranges stream-able
This commit is contained in:
Jonathan Turner 2020-07-06 01:23:27 -07:00 committed by GitHub
parent a1a0710ee6
commit c3ba1e476f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 109 additions and 63 deletions

View File

@ -3,7 +3,7 @@ use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::hir::Operator;
use nu_protocol::{
Primitive, RangeInclusion, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
Primitive, Range, RangeInclusion, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct Echo;
@ -55,60 +55,89 @@ async fn echo(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStr
let registry = registry.clone();
let (args, _): (EchoArgs, _) = args.process(&registry).await?;
let stream = args.rest.into_iter().map(|i| {
match i.as_string() {
Ok(s) => {
OutputStream::one(Ok(ReturnSuccess::Value(
UntaggedValue::string(s).into_value(i.tag.clone()),
)))
}
_ => match i {
Value {
value: UntaggedValue::Table(table),
..
} => {
futures::stream::iter(table.into_iter().map(ReturnSuccess::value)).to_output_stream()
}
Value {
value: UntaggedValue::Primitive(Primitive::Range(range)),
tag
} => {
let mut output_vec = vec![];
let mut current = range.from.0.item;
while current != range.to.0.item {
output_vec.push(Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current.clone()).into_value(&tag))));
current = match crate::data::value::compute_values(Operator::Plus, &UntaggedValue::Primitive(current), &UntaggedValue::int(1)) {
Ok(result) => match result {
UntaggedValue::Primitive(p) => p,
_ => {
return OutputStream::one(Err(ShellError::unimplemented("Internal error: expected a primitive result from increment")));
}
},
Err((left_type, right_type)) => {
return OutputStream::one(Err(ShellError::coerce_error(
left_type.spanned(tag.span),
right_type.spanned(tag.span),
)));
}
}
}
if let RangeInclusion::Inclusive = range.to.1 {
output_vec.push(Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current).into_value(&tag))));
}
futures::stream::iter(output_vec.into_iter()).to_output_stream()
}
_ => {
OutputStream::one(Ok(ReturnSuccess::Value(i.clone())))
}
},
}
let stream = args.rest.into_iter().map(|i| match i.as_string() {
Ok(s) => OutputStream::one(Ok(ReturnSuccess::Value(
UntaggedValue::string(s).into_value(i.tag.clone()),
))),
_ => match i {
Value {
value: UntaggedValue::Table(table),
..
} => futures::stream::iter(table.into_iter().map(ReturnSuccess::value))
.to_output_stream(),
Value {
value: UntaggedValue::Primitive(Primitive::Range(range)),
tag,
} => futures::stream::iter(RangeIterator::new(*range, tag)).to_output_stream(),
_ => OutputStream::one(Ok(ReturnSuccess::Value(i.clone()))),
},
});
Ok(futures::stream::iter(stream).flatten().to_output_stream())
}
struct RangeIterator {
curr: Primitive,
end: Primitive,
tag: Tag,
is_end_inclusive: bool,
is_done: bool,
}
impl RangeIterator {
pub fn new(range: Range, tag: Tag) -> RangeIterator {
RangeIterator {
curr: range.from.0.item,
end: range.to.0.item,
tag,
is_end_inclusive: match range.to.1 {
RangeInclusion::Inclusive => true,
RangeInclusion::Exclusive => false,
},
is_done: false,
}
}
}
impl Iterator for RangeIterator {
type Item = Result<ReturnSuccess, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
if self.curr != self.end {
let output = UntaggedValue::Primitive(self.curr.clone()).into_value(self.tag.clone());
self.curr = match crate::data::value::compute_values(
Operator::Plus,
&UntaggedValue::Primitive(self.curr.clone()),
&UntaggedValue::int(1),
) {
Ok(result) => match result {
UntaggedValue::Primitive(p) => p,
_ => {
return Some(Err(ShellError::unimplemented(
"Internal error: expected a primitive result from increment",
)));
}
},
Err((left_type, right_type)) => {
return Some(Err(ShellError::coerce_error(
left_type.spanned(self.tag.span),
right_type.spanned(self.tag.span),
)));
}
};
Some(ReturnSuccess::value(output))
} else if self.is_end_inclusive && !self.is_done {
self.is_done = true;
Some(ReturnSuccess::value(
UntaggedValue::Primitive(self.curr.clone()).into_value(self.tag.clone()),
))
} else {
// TODO: add inclusive/exclusive ranges
None
}
}
}
#[cfg(test)]
mod tests {
use super::Echo;

View File

@ -71,20 +71,23 @@ impl WholeStreamCommand for Every {
async fn every(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let (EveryArgs { stride, skip }, input) = args.process(&registry).await?;
let v: Vec<_> = input.into_vec().await;
let iter = v.into_iter().enumerate().filter_map(move |(i, value)| {
let stride_desired = if stride.item < 1 { 1 } else { stride.item } as usize;
let should_include = skip.item == (i % stride_desired != 0);
let stride = stride.item;
let skip = skip.item;
if should_include {
return Some(ReturnSuccess::value(value));
}
Ok(input
.enumerate()
.filter_map(move |(i, value)| async move {
let stride_desired = if stride < 1 { 1 } else { stride } as usize;
let should_include = skip == (i % stride_desired != 0);
None
});
Ok(futures::stream::iter(iter).to_output_stream())
if should_include {
Some(ReturnSuccess::value(value))
} else {
None
}
})
.to_output_stream())
}
#[cfg(test)]

View File

@ -55,7 +55,7 @@ impl Clone for FilesystemShell {
}
impl FilesystemShell {
pub fn basic(commands: CommandRegistry) -> Result<FilesystemShell, std::io::Error> {
pub fn basic(commands: CommandRegistry) -> Result<FilesystemShell, Error> {
let path = std::env::current_dir()?;
Ok(FilesystemShell {
@ -162,7 +162,7 @@ impl Shell for FilesystemShell {
let metadata = match std::fs::symlink_metadata(&path) {
Ok(metadata) => Some(metadata),
Err(e) => {
if e.kind() == std::io::ErrorKind::PermissionDenied {
if e.kind() == ErrorKind::PermissionDenied {
None
} else {
return Some(Err(e.into()));

View File

@ -0,0 +1,13 @@
use nu_test_support::{nu, pipeline};
#[test]
fn echo_range_is_lazy() {
let actual = nu!(
cwd: "tests/fixtures/formats", pipeline(
r#"
echo 1..10000000000 | first 3 | echo $it | to json
"#
));
assert_eq!(actual.out, "[1,2,3]");
}

View File

@ -11,6 +11,7 @@ mod cp;
mod default;
mod drop;
mod each;
mod echo;
mod enter;
mod every;
mod first;