Add command "reduce" (#2292)

* initial

* fold working

* tests and cleanup

* change command to reduce, with fold flag

* move complex example to tests

* add --numbered flag
This commit is contained in:
Bailey Layzer 2020-08-04 10:16:19 -07:00 committed by GitHub
parent 7f35bfc005
commit e1c5940b04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 289 additions and 5 deletions

View File

@ -366,6 +366,7 @@ pub fn create_default_context(
whole_stream_command(Wrap),
whole_stream_command(Pivot),
whole_stream_command(Headers),
whole_stream_command(Reduce),
// Data processing
whole_stream_command(Histogram),
whole_stream_command(Autoenv),

View File

@ -84,6 +84,7 @@ pub(crate) mod prev;
pub(crate) mod pwd;
pub(crate) mod random;
pub(crate) mod range;
pub(crate) mod reduce;
pub(crate) mod reject;
pub(crate) mod rename;
pub(crate) mod reverse;
@ -214,6 +215,7 @@ pub(crate) use pwd::Pwd;
pub(crate) use random::RandomUUID;
pub(crate) use random::{Random, RandomBool, RandomDice};
pub(crate) use range::Range;
pub(crate) use reduce::Reduce;
pub(crate) use reject::Reject;
pub(crate) use rename::Rename;
pub(crate) use reverse::Reverse;

View File

@ -105,6 +105,14 @@ pub async fn process_row(
.to_output_stream())
}
/// Wraps `item` in a two-column row: `index` holds the given position and
/// `item` holds the original value. The new row is tagged with `item`'s tag.
pub(crate) fn make_indexed_item(index: usize, item: Value) -> Value {
    // The builder is created from the tag before `item` is moved into the row.
    let mut numbered_row = TaggedDictBuilder::new(item.tag());
    numbered_row.insert_untagged("index", UntaggedValue::int(index));
    numbered_row.insert_value("item", item);
    numbered_row.into_value()
}
async fn each(
raw_args: CommandArgs,
registry: &CommandRegistry,
@ -124,13 +132,10 @@ async fn each(
let scope = scope.clone();
let head = head.clone();
let context = context.clone();
let mut dict = TaggedDictBuilder::new(input.1.tag());
dict.insert_untagged("index", UntaggedValue::int(input.0));
dict.insert_value("item", input.1);
let row = make_indexed_item(input.0, input.1);
async {
match process_row(block, scope, head, context, dict.into_value()).await {
match process_row(block, scope, head, context, row).await {
Ok(s) => s,
Err(e) => OutputStream::one(Err(e)),
}

View File

@ -0,0 +1,178 @@
use crate::commands::classified::block::run_block;
use crate::commands::each;
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use crate::{CommandArgs, CommandRegistry, Example, OutputStream};
use futures::stream::once;
use nu_errors::ShellError;
use nu_protocol::{hir::Block, Primitive, Scope, Signature, SyntaxShape, UntaggedValue, Value};
use nu_source::Tagged;
/// Marker type for the `reduce` command; its behaviour lives in the
/// `WholeStreamCommand` implementation in this file.
pub struct Reduce;
/// Arguments accepted by `reduce`, mirroring the command's signature
/// (`--fold`, the required block, and the `--numbered` switch).
#[derive(Deserialize)]
pub struct ReduceArgs {
/// The reducing block, run once per input row with `$acc` in scope.
block: Block,
/// Optional starting accumulator (`--fold` / `-f`); when absent the first
/// input value is used as the starting accumulator instead.
fold: Option<Value>,
/// `--numbered` / `-n`: when set, rows and accumulator are wrapped as
/// `{index, item}` rows.
numbered: Tagged<bool>,
}
// Describes the `reduce` command: name, signature, usage text, entry point
// and the examples that are shown to users.
#[async_trait]
impl WholeStreamCommand for Reduce {
/// The name the command is invoked by.
fn name(&self) -> &str {
"reduce"
}
/// Declares the command's parameters: an optional `--fold`/`-f` value of any
/// shape, a required block, and the `--numbered`/`-n` switch.
fn signature(&self) -> Signature {
Signature::build("reduce")
.named(
"fold",
SyntaxShape::Any,
"reduce with initial value",
Some('f'),
)
.required("block", SyntaxShape::Block, "reducing function")
.switch(
"numbered",
"returned a numbered item ($it.index and $it.item)",
Some('n'),
)
}
/// Usage text for the command. Note that the literal spans two source lines,
/// so the returned string contains an embedded line break.
fn usage(&self) -> &str {
"Aggregate a list table to a single value using an accumulator block. Block must be
(A, A) -> A unless --fold is selected, in which case it may be A, B -> A."
}
/// Entry point: delegates to the free function `reduce` in this file.
async fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
reduce(args, registry).await
}
/// Documented examples. The first two carry an expected result (10 and 9);
/// the last two have `result: None`, i.e. no expected value is recorded here.
fn examples(&self) -> Vec<Example> {
vec![
Example {
description: "Simple summation (equivalent to math sum)",
example: "echo 1 2 3 4 | reduce { = $acc + $it }",
result: Some(vec![UntaggedValue::int(10).into()]),
},
Example {
description: "Summation from starting value using fold",
example: "echo 1 2 3 4 | reduce -f $(= -1) { = $acc + $it }",
result: Some(vec![UntaggedValue::int(9).into()]),
},
Example {
description: "Folding with rows",
example: "<table> | reduce -f 1.6 { = $acc * $(echo $it.a | str to-int) + $(echo $it.b | str to-int) }",
result: None,
},
Example {
description: "Numbered reduce to find index of longest word",
example: "echo one longest three bar | reduce -n { if $(echo $it.item | str length) > $(echo $acc.item | str length) {echo $it} {echo $acc}} | get index",
result: None,
},
]
}
}
/// Runs `block` once for a single `row`.
///
/// The row is supplied twice: a clone is fed to the block as a one-element
/// input stream, and the original is passed by reference as the fourth
/// argument of `run_block` (presumably what the block sees as `$it` —
/// confirm against `run_block`). Variables and environment come from `scope`.
///
/// Returns the stream produced by the block, or the error `run_block` raised.
async fn process_row(
    block: Arc<Block>,
    scope: Arc<Scope>,
    mut context: Arc<Context>,
    row: Value,
) -> Result<InputStream, ShellError> {
    // The input stream needs to own its value, so it gets a separate copy.
    let piped_row = row.clone();
    let single_row_input = once(async move { Ok(piped_row) }).to_input_stream();

    // `run_block` wants a mutable context; `make_mut` clones it if it is shared.
    let ctx = Arc::make_mut(&mut context);

    let output = run_block(
        &block,
        ctx,
        single_row_input,
        &row,
        &scope.vars,
        &scope.env,
    )
    .await?;

    Ok(output)
}
async fn reduce(
raw_args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let base_scope = raw_args.call_info.scope.clone();
let context = Arc::new(Context::from_raw(&raw_args, &registry));
let (reduce_args, mut input): (ReduceArgs, _) = raw_args.process(&registry).await?;
let block = Arc::new(reduce_args.block);
let (ioffset, start) = match reduce_args.fold {
None => {
let first = input
.next()
.await
.expect("empty stream expected to contain Primitive::Nothing");
if let UntaggedValue::Primitive(Primitive::Nothing) = first.value {
return Err(ShellError::missing_value(None, "empty input"));
}
(1, first)
}
Some(acc) => (0, acc),
};
if reduce_args.numbered.item {
// process_row returns Result<InputStream, ShellError>, so we must fold with one
let initial = Ok(InputStream::one(each::make_indexed_item(
ioffset - 1,
start,
)));
Ok(input
.enumerate()
.fold(initial, move |acc, input| {
let block = Arc::clone(&block);
let mut scope = base_scope.clone();
let context = Arc::clone(&context);
let row = each::make_indexed_item(input.0 + ioffset, input.1);
async {
let f = acc?.into_vec().await[0].clone();
scope.vars.insert(String::from("$acc"), f);
process_row(block, Arc::new(scope), context, row).await
}
})
.await?
.to_output_stream())
} else {
let initial = Ok(InputStream::one(start));
Ok(input
.fold(initial, move |acc, row| {
let block = Arc::clone(&block);
let mut scope = base_scope.clone();
let context = Arc::clone(&context);
async {
scope
.vars
.insert(String::from("$acc"), acc?.into_vec().await[0].clone());
process_row(block, Arc::new(scope), context, row).await
}
})
.await?
.to_output_stream())
}
}
#[cfg(test)]
mod tests {
    use super::Reduce;
    use crate::examples::test as test_examples;

    /// Passes the command to the shared examples test helper, which is
    /// expected to check the examples declared in `Reduce::examples`.
    #[test]
    fn examples_work_as_expected() {
        test_examples(Reduce)
    }
}

View File

@ -34,6 +34,7 @@ mod parse;
mod prepend;
mod random;
mod range;
mod reduce;
mod rename;
mod reverse;
mod rm;

View File

@ -0,0 +1,89 @@
use nu_test_support::{nu, pipeline};
/// Folds the `total` values of a JSON table starting from 20, with the block
/// computing `$it + $acc^1.05`, and expects "180.6" after formatting to one
/// decimal. The same reduction is checked twice: once on the extracted
/// `total` column and once on whole rows via `$it.total`.
#[test]
fn reduce_table_column() {
// Variant 1: reduce over the plain `total` column.
let actual = nu!(
cwd: ".", pipeline(
r#"
echo "[{month:2,total:30}, {month:3,total:10}, {month:4,total:3}, {month:5,total:60}]"
| from json
| get total
| reduce -f 20 { = $it + $( math eval `{{$acc}}^1.05` )}
| str from -d 1
"#
)
);
assert_eq!(actual.out, "180.6");
// Variant 2: reduce over the rows themselves, reading `total` inside the block.
let actual = nu!(
cwd: ".", pipeline(
r#"
echo "[{month:2,total:30}, {month:3,total:10}, {month:4,total:3}, {month:5,total:60}]"
| from json
| reduce -f 20 { = $it.total + $( math eval `{{$acc}}^1.05` )}
| str from -d 1
"#
)
);
assert_eq!(actual.out, "180.6");
}
/// Runs the "Folding with rows" example on a concrete two-row table
/// (columns `a` and `b`) with a starting value of 1.6 and expects "14.8".
#[test]
fn reduce_rows_example() {
let actual = nu!(
cwd: ".", pipeline(
r#"
echo a,b 1,2 3,4
| split column ,
| headers
| reduce -f 1.6 { = $acc * $(echo $it.a | str to-int) + $(echo $it.b | str to-int) }
"#
)
);
assert_eq!(actual.out, "14.8");
}
/// Runs the numbered example: with `-n` the block compares `.item` lengths and
/// keeps the longer entry, and the resulting `index` is expected to be "1"
/// (the position of "longest").
#[test]
fn reduce_numbered_example() {
let actual = nu!(
cwd: ".", pipeline(
r#"
echo one longest three bar
| reduce -n { if $(echo $it.item | str length) > $(echo $acc.item | str length) {echo $it} {echo $acc}}
| get index
| echo $it
"#
)
);
assert_eq!(actual.out, "1");
}
/// Adding string rows to an integer `--fold` seed must fail; the test only
/// checks that stderr mentions "Coercion".
#[test]
fn error_reduce_fold_type_mismatch() {
let actual = nu!(
cwd: ".", pipeline(
r#"
echo a b c | reduce -f 0 { = $acc + $it }
"#
)
);
assert!(actual.err.contains("Coercion"));
}
/// Running `reduce` with no upstream input and no `--fold` must report an
/// error whose text contains "empty input".
#[test]
fn error_reduce_empty() {
let actual = nu!(
cwd: ".", pipeline(
r#"
reduce { = $acc + $it }
"#
)
);
assert!(actual.err.contains("empty input"));
}

View File

@ -349,6 +349,14 @@ impl ShellError {
.start()
}
/// Builds a `MissingValue` error.
///
/// `span` optionally locates the problem (callers may pass `None`), and
/// `reason` is the explanatory text stored in the error.
pub fn missing_value(span: impl Into<Option<Span>>, reason: impl Into<String>) -> ShellError {
    let span = span.into();
    let reason = reason.into();
    ProximateShellError::MissingValue { span, reason }.start()
}
pub fn invalid_integer_index(
subpath: Spanned<impl Into<String>>,
integer: impl Into<Span>,