From fe6d96e9969e27e6cc86d87dc3eaf8234b260c9b Mon Sep 17 00:00:00 2001 From: Jonathan Turner Date: Sat, 13 Jun 2020 01:43:21 -0700 Subject: [PATCH] Another batch of converting commands away from async_stream (#1974) * Another batch of removing async_stream * merge master --- crates/nu-cli/src/commands/from_json.rs | 80 +++++----- crates/nu-cli/src/commands/histogram.rs | 167 +++++++++++++-------- crates/nu-cli/src/commands/history.rs | 34 +++-- crates/nu-cli/src/commands/kill.rs | 85 +++++------ crates/nu-cli/src/commands/merge.rs | 96 ++++++------ crates/nu-cli/src/commands/nth.rs | 50 +++--- crates/nu-cli/src/commands/pivot.rs | 145 ++++++++++-------- crates/nu-cli/src/commands/reject.rs | 1 - crates/nu-cli/src/commands/rename.rs | 31 ++-- crates/nu-cli/src/commands/run_alias.rs | 76 +++------- crates/nu-cli/src/commands/run_external.rs | 104 ++++++------- crates/nu-cli/src/commands/shuffle.rs | 28 ++-- crates/nu-cli/src/commands/sum.rs | 78 +++++----- crates/nu-cli/src/commands/to.rs | 12 +- 14 files changed, 495 insertions(+), 492 deletions(-) diff --git a/crates/nu-cli/src/commands/from_json.rs b/crates/nu-cli/src/commands/from_json.rs index 7f0975c78..b94ca8fed 100644 --- a/crates/nu-cli/src/commands/from_json.rs +++ b/crates/nu-cli/src/commands/from_json.rs @@ -33,7 +33,7 @@ impl WholeStreamCommand for FromJSON { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_json(args, registry) + from_json(args, registry).await } } @@ -71,65 +71,73 @@ pub fn from_json_string_to_value(s: String, tag: impl Into) -> serde_hjson: Ok(convert_json_value_to_nu_value(&v, tag)) } -fn from_json(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_json( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let name_tag = args.call_info.name_tag.clone(); let registry = registry.clone(); - let stream = async_stream! { - let (FromJSONArgs { objects }, mut input) = args.process(®istry).await?; - let concat_string = input.collect_string(name_tag.clone()).await?; + let (FromJSONArgs { objects }, input) = args.process(®istry).await?; + let concat_string = input.collect_string(name_tag.clone()).await?; - if objects { - for json_str in concat_string.item.lines() { + let string_clone: Vec<_> = concat_string.item.lines().map(|x| x.to_string()).collect(); + + if objects { + Ok( + futures::stream::iter(string_clone.into_iter().filter_map(move |json_str| { if json_str.is_empty() { - continue; + return None; } - match from_json_string_to_value(json_str.to_string(), &name_tag) { - Ok(x) => - yield ReturnSuccess::value(x), + match from_json_string_to_value(json_str, &name_tag) { + Ok(x) => Some(ReturnSuccess::value(x)), Err(e) => { let mut message = "Could not parse as JSON (".to_string(); message.push_str(&e.to_string()); message.push_str(")"); - yield Err(ShellError::labeled_error_with_secondary( + Some(Err(ShellError::labeled_error_with_secondary( message, "input cannot be parsed as JSON", - &name_tag, + name_tag.clone(), "value originates from here", - concat_string.tag.clone())) + concat_string.tag.clone(), + ))) } } - } - } else { - match from_json_string_to_value(concat_string.item, name_tag.clone()) { - Ok(x) => - match x { - Value { value: UntaggedValue::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - x => yield ReturnSuccess::value(x), - } - Err(e) => { - let mut message = "Could not parse as JSON (".to_string(); - message.push_str(&e.to_string()); - message.push_str(")"); + })) + .to_output_stream(), + ) + } else { + match from_json_string_to_value(concat_string.item, name_tag.clone()) { + Ok(x) => match x { + Value { + value: UntaggedValue::Table(list), + .. + } => Ok( + futures::stream::iter(list.into_iter().map(ReturnSuccess::value)) + .to_output_stream(), + ), + x => Ok(OutputStream::one(ReturnSuccess::value(x))), + }, + Err(e) => { + let mut message = "Could not parse as JSON (".to_string(); + message.push_str(&e.to_string()); + message.push_str(")"); - yield Err(ShellError::labeled_error_with_secondary( + Ok(OutputStream::one(Err( + ShellError::labeled_error_with_secondary( message, "input cannot be parsed as JSON", name_tag, "value originates from here", - concat_string.tag)) - } + concat_string.tag, + ), + ))) } } - }; - - Ok(stream.to_output_stream()) + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/histogram.rs b/crates/nu-cli/src/commands/histogram.rs index 003b1faf5..8a8c3b70a 100644 --- a/crates/nu-cli/src/commands/histogram.rs +++ b/crates/nu-cli/src/commands/histogram.rs @@ -45,7 +45,7 @@ impl WholeStreamCommand for Histogram { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - histogram(args, registry) + histogram(args, registry).await } fn examples(&self) -> Vec { @@ -70,95 +70,136 @@ impl WholeStreamCommand for Histogram { } } -pub fn histogram( +pub async fn histogram( args: CommandArgs, registry: &CommandRegistry, ) -> Result { let registry = registry.clone(); let name = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (HistogramArgs { column_name, rest}, mut input) = args.process(®istry).await?; - let values: Vec = input.collect().await; + let (HistogramArgs { column_name, rest }, input) = args.process(®istry).await?; + let values: Vec = input.collect().await; - let Tagged { item: group_by, .. } = column_name.clone(); + let Tagged { item: group_by, .. } = column_name.clone(); - let groups = group(&column_name, values, &name)?; - let group_labels = columns_sorted(Some(group_by.clone()), &groups, &name); - let sorted = t_sort(Some(group_by.clone()), None, &groups, &name)?; - let evaled = evaluate(&sorted, None, &name)?; - let reduced = reduce(&evaled, None, &name)?; - let maxima = map_max(&reduced, None, &name)?; - let percents = percentages(&reduced, maxima, &name)?; + let groups = group(&column_name, values, &name)?; + let group_labels = columns_sorted(Some(group_by.clone()), &groups, &name); + let sorted = t_sort(Some(group_by), None, &groups, &name)?; + let evaled = evaluate(&sorted, None, &name)?; + let reduced = reduce(&evaled, None, &name)?; + let maxima = map_max(&reduced, None, &name)?; + let percents = percentages(&reduced, maxima, &name)?; - match percents { - Value { - value: UntaggedValue::Table(datasets), - .. - } => { + match percents { + Value { + value: UntaggedValue::Table(datasets), + .. + } => { + let mut idx = 0; - let mut idx = 0; + let column_names_supplied: Vec<_> = rest.iter().map(|f| f.item.clone()).collect(); - let column_names_supplied: Vec<_> = rest.iter().map(|f| f.item.clone()).collect(); + let frequency_column_name = if column_names_supplied.is_empty() { + "frequency".to_string() + } else { + column_names_supplied[0].clone() + }; - let frequency_column_name = if column_names_supplied.is_empty() { - "frequency".to_string() - } else { - column_names_supplied[0].clone() - }; + let column = (*column_name).clone(); - let column = (*column_name).clone(); + let count_column_name = "count".to_string(); + let count_shell_error = ShellError::labeled_error( + "Unable to load group count", + "unabled to load group count", + &name, + ); + let mut count_values: Vec = Vec::new(); - let count_column_name = "count".to_string(); - let count_shell_error = ShellError::labeled_error("Unable to load group count", "unabled to load group count", &name); - let mut count_values: Vec = Vec::new(); - - for table_entry in reduced.table_entries() { - match table_entry { - Value { - value: UntaggedValue::Table(list), - .. - } => { - for i in list { - if let Ok(count) = i.value.clone().into_value(&name).as_u64() { - count_values.push(count); - } else { - yield Err(count_shell_error); - return; - } + for table_entry in reduced.table_entries() { + match table_entry { + Value { + value: UntaggedValue::Table(list), + .. + } => { + for i in list { + if let Ok(count) = i.value.clone().into_value(&name).as_u64() { + count_values.push(count); + } else { + return Err(count_shell_error); } } - _ => { - yield Err(count_shell_error); - return; - } + } + _ => { + return Err(count_shell_error); } } + } - if let Value { value: UntaggedValue::Table(start), .. } = datasets.get(0).ok_or_else(|| ShellError::labeled_error("Unable to load dataset", "unabled to load dataset", &name))? { - for percentage in start.iter() { - + if let Value { + value: UntaggedValue::Table(start), + .. + } = datasets.get(0).ok_or_else(|| { + ShellError::labeled_error( + "Unable to load dataset", + "unabled to load dataset", + &name, + ) + })? { + let start = start.clone(); + Ok( + futures::stream::iter(start.into_iter().map(move |percentage| { let mut fact = TaggedDictBuilder::new(&name); - let value: Tagged = group_labels.get(idx).ok_or_else(|| ShellError::labeled_error("Unable to load group labels", "unabled to load group labels", &name))?.clone(); - fact.insert_value(&column, UntaggedValue::string(value.item).into_value(value.tag)); + let value: Tagged = group_labels + .get(idx) + .ok_or_else(|| { + ShellError::labeled_error( + "Unable to load group labels", + "unabled to load group labels", + &name, + ) + })? + .clone(); + fact.insert_value( + &column, + UntaggedValue::string(value.item).into_value(value.tag), + ); - fact.insert_untagged(&count_column_name, UntaggedValue::int(count_values[idx])); + fact.insert_untagged( + &count_column_name, + UntaggedValue::int(count_values[idx]), + ); - if let Value { value: UntaggedValue::Primitive(Primitive::Int(ref num)), ref tag } = percentage.clone() { - let string = std::iter::repeat("*").take(num.to_i32().ok_or_else(|| ShellError::labeled_error("Expected a number", "expected a number", tag))? as usize).collect::(); - fact.insert_untagged(&frequency_column_name, UntaggedValue::string(string)); + if let Value { + value: UntaggedValue::Primitive(Primitive::Int(ref num)), + ref tag, + } = percentage + { + let string = std::iter::repeat("*") + .take(num.to_i32().ok_or_else(|| { + ShellError::labeled_error( + "Expected a number", + "expected a number", + tag, + ) + })? as usize) + .collect::(); + fact.insert_untagged( + &frequency_column_name, + UntaggedValue::string(string), + ); } idx += 1; - yield ReturnSuccess::value(fact.into_value()); - } - } + ReturnSuccess::value(fact.into_value()) + })) + .to_output_stream(), + ) + } else { + Ok(OutputStream::empty()) } - _ => {} } - }; - - Ok(stream.to_output_stream()) + _ => Ok(OutputStream::empty()), + } } fn percentages(values: &Value, max: Value, tag: impl Into) -> Result { diff --git a/crates/nu-cli/src/commands/history.rs b/crates/nu-cli/src/commands/history.rs index d3f3ed118..213583fb3 100644 --- a/crates/nu-cli/src/commands/history.rs +++ b/crates/nu-cli/src/commands/history.rs @@ -33,21 +33,25 @@ impl WholeStreamCommand for History { fn history(args: CommandArgs, _registry: &CommandRegistry) -> Result { let tag = args.call_info.name_tag; - let stream = async_stream! { - let history_path = HistoryFile::path(); - let file = File::open(history_path); - if let Ok(file) = file { - let reader = BufReader::new(file); - for line in reader.lines() { - if let Ok(line) = line { - yield ReturnSuccess::value(UntaggedValue::string(line).into_value(tag.clone())); - } - } - } else { - yield Err(ShellError::labeled_error("Could not open history", "history file could not be opened", tag.clone())); - } - }; - Ok(stream.to_output_stream()) + let history_path = HistoryFile::path(); + let file = File::open(history_path); + if let Ok(file) = file { + let reader = BufReader::new(file); + let output = reader.lines().filter_map(move |line| match line { + Ok(line) => Some(ReturnSuccess::value( + UntaggedValue::string(line).into_value(tag.clone()), + )), + Err(_) => None, + }); + + Ok(futures::stream::iter(output).to_output_stream()) + } else { + Err(ShellError::labeled_error( + "Could not open history", + "history file could not be opened", + tag, + )) + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/kill.rs b/crates/nu-cli/src/commands/kill.rs index 4c39583df..8ea29665f 100644 --- a/crates/nu-cli/src/commands/kill.rs +++ b/crates/nu-cli/src/commands/kill.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue}; +use nu_protocol::{Signature, SyntaxShape}; use nu_source::Tagged; use std::process::{Command, Stdio}; @@ -43,7 +43,7 @@ impl WholeStreamCommand for Kill { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - kill(args, registry) + kill(args, registry).await } fn examples(&self) -> Vec { @@ -62,63 +62,60 @@ impl WholeStreamCommand for Kill { } } -fn kill(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn kill(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (KillArgs { + let ( + KillArgs { pid, rest, force, quiet, - }, mut input) = args.process(®istry).await?; - let mut cmd = if cfg!(windows) { - let mut cmd = Command::new("taskkill"); + }, + .., + ) = args.process(®istry).await?; + let mut cmd = if cfg!(windows) { + let mut cmd = Command::new("taskkill"); - if *force { - cmd.arg("/F"); - } + if *force { + cmd.arg("/F"); + } + cmd.arg("/PID"); + cmd.arg(pid.item().to_string()); + + // each pid must written as `/PID 0` otherwise + // taskkill will act as `killall` unix command + for id in &rest { cmd.arg("/PID"); - cmd.arg(pid.item().to_string()); - - // each pid must written as `/PID 0` otherwise - // taskkill will act as `killall` unix command - for id in &rest { - cmd.arg("/PID"); - cmd.arg(id.item().to_string()); - } - - cmd - } else { - let mut cmd = Command::new("kill"); - - if *force { - cmd.arg("-9"); - } - - cmd.arg(pid.item().to_string()); - - cmd.args(rest.iter().map(move |id| id.item().to_string())); - - cmd - }; - - // pipe everything to null - if *quiet { - cmd.stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()); + cmd.arg(id.item().to_string()); } - cmd.status().expect("failed to execute shell command"); + cmd + } else { + let mut cmd = Command::new("kill"); - if false { - yield ReturnSuccess::value(UntaggedValue::nothing().into_value(Tag::unknown())); + if *force { + cmd.arg("-9"); } + + cmd.arg(pid.item().to_string()); + + cmd.args(rest.iter().map(move |id| id.item().to_string())); + + cmd }; - Ok(stream.to_output_stream()) + // pipe everything to null + if *quiet { + cmd.stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()); + } + + cmd.status().expect("failed to execute shell command"); + + Ok(OutputStream::empty()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/merge.rs b/crates/nu-cli/src/commands/merge.rs index 2948782d3..6cf6178d7 100644 --- a/crates/nu-cli/src/commands/merge.rs +++ b/crates/nu-cli/src/commands/merge.rs @@ -37,7 +37,7 @@ impl WholeStreamCommand for Merge { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - merge(args, registry) + merge(args, registry).await } fn examples(&self) -> Vec { @@ -49,57 +49,61 @@ impl WholeStreamCommand for Merge { } } -fn merge(raw_args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn merge( + raw_args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); let scope = raw_args.call_info.scope.clone(); - let stream = async_stream! { - let mut context = Context::from_raw(&raw_args, ®istry); - let name_tag = raw_args.call_info.name_tag.clone(); - let (merge_args, mut input): (MergeArgs, _) = raw_args.process(®istry).await?; - let block = merge_args.block; + let mut context = Context::from_raw(&raw_args, ®istry); + let name_tag = raw_args.call_info.name_tag.clone(); + let (merge_args, input): (MergeArgs, _) = raw_args.process(®istry).await?; + let block = merge_args.block; - let table: Option> = match run_block(&block, - &mut context, - InputStream::empty(), - &scope.it, - &scope.vars, - &scope.env).await { - Ok(mut stream) => Some(stream.drain_vec().await), - Err(err) => { - yield Err(err); - return; - } - }; - - - let table = table.unwrap_or_else(|| vec![Value { - value: UntaggedValue::row(IndexMap::default()), - tag: name_tag, - }]); - - let mut idx = 0; - - while let Some(value) = input.next().await { - let other = table.get(idx); - - match other { - Some(replacement) => { - match merge_values(&value.value, &replacement.value) { - Ok(merged_value) => yield ReturnSuccess::value(merged_value.into_value(&value.tag)), - Err(err) => { - let message = format!("The row at {:?} types mismatch", idx); - yield Err(ShellError::labeled_error("Could not merge", &message, &value.tag)); - } - } - } - None => yield ReturnSuccess::value(value), - } - - idx += 1; + let table: Option> = match run_block( + &block, + &mut context, + InputStream::empty(), + &scope.it, + &scope.vars, + &scope.env, + ) + .await + { + Ok(mut stream) => Some(stream.drain_vec().await), + Err(err) => { + return Err(err); } }; - Ok(stream.to_output_stream()) + let table = table.unwrap_or_else(|| { + vec![Value { + value: UntaggedValue::row(IndexMap::default()), + tag: name_tag, + }] + }); + + Ok(input + .enumerate() + .map(move |(idx, value)| { + let other = table.get(idx); + + match other { + Some(replacement) => match merge_values(&value.value, &replacement.value) { + Ok(merged_value) => ReturnSuccess::value(merged_value.into_value(&value.tag)), + Err(_) => { + let message = format!("The row at {:?} types mismatch", idx); + Err(ShellError::labeled_error( + "Could not merge", + &message, + &value.tag, + )) + } + }, + None => ReturnSuccess::value(value), + } + }) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/nth.rs b/crates/nu-cli/src/commands/nth.rs index 61c32a949..9e10531e1 100644 --- a/crates/nu-cli/src/commands/nth.rs +++ b/crates/nu-cli/src/commands/nth.rs @@ -38,7 +38,7 @@ impl WholeStreamCommand for Nth { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - nth(args, registry) + nth(args, registry).await } fn examples(&self) -> Vec { @@ -57,30 +57,36 @@ impl WholeStreamCommand for Nth { } } -fn nth(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn nth(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (NthArgs { row_number, rest: and_rows}, input) = args.process(®istry).await?; + let ( + NthArgs { + row_number, + rest: and_rows, + }, + input, + ) = args.process(®istry).await?; - let mut inp = input.enumerate(); - while let Some((idx, item)) = inp.next().await { - let row_number = vec![row_number.clone()]; + let row_numbers = vec![vec![row_number], and_rows] + .into_iter() + .flatten() + .collect::>>(); - let row_numbers = vec![&row_number, &and_rows] - .into_iter() - .flatten() - .collect::>>(); - - if row_numbers - .iter() - .any(|requested| requested.item == idx as u64) - { - yield ReturnSuccess::value(item); - } - } - }; - - Ok(stream.to_output_stream()) + Ok(input + .enumerate() + .filter_map(move |(idx, item)| { + futures::future::ready( + if row_numbers + .iter() + .any(|requested| requested.item == idx as u64) + { + Some(ReturnSuccess::value(item)) + } else { + None + }, + ) + }) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/pivot.rs b/crates/nu-cli/src/commands/pivot.rs index 4f8748eef..30d56c5a7 100644 --- a/crates/nu-cli/src/commands/pivot.rs +++ b/crates/nu-cli/src/commands/pivot.rs @@ -51,92 +51,105 @@ impl WholeStreamCommand for Pivot { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - pivot(args, registry) + pivot(args, registry).await } } -pub fn pivot(args: CommandArgs, registry: &CommandRegistry) -> Result { +pub async fn pivot( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); let name = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (args, mut input): (PivotArgs, _) = args.process(®istry).await?; - let input = input.into_vec().await; + let (args, input): (PivotArgs, _) = args.process(®istry).await?; + let input = input.into_vec().await; - let descs = merge_descriptors(&input); + let descs = merge_descriptors(&input); - let mut headers: Vec = vec![]; + let mut headers: Vec = vec![]; - if args.rest.len() > 0 && args.header_row { - yield Err(ShellError::labeled_error("Can not provide header names and use header row", "using header row", name)); - return; - } + if !args.rest.is_empty() && args.header_row { + return Err(ShellError::labeled_error( + "Can not provide header names and use header row", + "using header row", + name, + )); + } - if args.header_row { - for i in input.clone() { - if let Some(desc) = descs.get(0) { - match get_data_by_key(&i, desc[..].spanned_unknown()) { - Some(x) => { - if let Ok(s) = x.as_string() { - headers.push(s.to_string()); - } else { - yield Err(ShellError::labeled_error("Header row needs string headers", "used non-string headers", name)); - return; - } - } - _ => { - yield Err(ShellError::labeled_error("Header row is incomplete and can't be used", "using incomplete header row", name)); - return; - } - } - } else { - yield Err(ShellError::labeled_error("Header row is incomplete and can't be used", "using incomplete header row", name)); - return; - } - } - } else { - for i in 0..=input.len() { - if let Some(name) = args.rest.get(i) { - headers.push(name.to_string()) - } else { - headers.push(format!("Column{}", i)); - } - } - } - - let descs: Vec<_> = if args.header_row { - descs.iter().skip(1).collect() - } else { - descs.iter().collect() - }; - - for desc in descs { - let mut column_num: usize = 0; - let mut dict = TaggedDictBuilder::new(&name); - - if !args.ignore_titles && !args.header_row { - dict.insert_untagged(headers[column_num].clone(), UntaggedValue::string(desc.clone())); - column_num += 1 - } - - for i in input.clone() { + if args.header_row { + for i in input.clone() { + if let Some(desc) = descs.get(0) { match get_data_by_key(&i, desc[..].spanned_unknown()) { Some(x) => { - dict.insert_value(headers[column_num].clone(), x.clone()); + if let Ok(s) = x.as_string() { + headers.push(s.to_string()); + } else { + return Err(ShellError::labeled_error( + "Header row needs string headers", + "used non-string headers", + name, + )); + } } _ => { - dict.insert_untagged(headers[column_num].clone(), UntaggedValue::nothing()); + return Err(ShellError::labeled_error( + "Header row is incomplete and can't be used", + "using incomplete header row", + name, + )); } } - column_num += 1; + } else { + return Err(ShellError::labeled_error( + "Header row is incomplete and can't be used", + "using incomplete header row", + name, + )); } - - yield ReturnSuccess::value(dict.into_value()); } + } else { + for i in 0..=input.len() { + if let Some(name) = args.rest.get(i) { + headers.push(name.to_string()) + } else { + headers.push(format!("Column{}", i)); + } + } + } - + let descs: Vec<_> = if args.header_row { + descs.into_iter().skip(1).collect() + } else { + descs }; - Ok(OutputStream::new(stream)) + Ok(futures::stream::iter(descs.into_iter().map(move |desc| { + let mut column_num: usize = 0; + let mut dict = TaggedDictBuilder::new(&name); + + if !args.ignore_titles && !args.header_row { + dict.insert_untagged( + headers[column_num].clone(), + UntaggedValue::string(desc.clone()), + ); + column_num += 1 + } + + for i in input.clone() { + match get_data_by_key(&i, desc[..].spanned_unknown()) { + Some(x) => { + dict.insert_value(headers[column_num].clone(), x.clone()); + } + _ => { + dict.insert_untagged(headers[column_num].clone(), UntaggedValue::nothing()); + } + } + column_num += 1; + } + + ReturnSuccess::value(dict.into_value()) + })) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/reject.rs b/crates/nu-cli/src/commands/reject.rs index a2858a673..a3ff4899b 100644 --- a/crates/nu-cli/src/commands/reject.rs +++ b/crates/nu-cli/src/commands/reject.rs @@ -47,7 +47,6 @@ async fn reject(args: CommandArgs, registry: &CommandRegistry) -> Result Result { - rename(args, registry) + rename(args, registry).await } fn examples(&self) -> Vec { @@ -57,17 +57,20 @@ impl WholeStreamCommand for Rename { } } -pub fn rename(args: CommandArgs, registry: &CommandRegistry) -> Result { +pub async fn rename( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); let name = args.call_info.name_tag.clone(); - let stream = async_stream! { - let (Arguments { column_name, rest }, mut input) = args.process(®istry).await?; - let mut new_column_names = vec![vec![column_name]]; - new_column_names.push(rest); + let (Arguments { column_name, rest }, input) = args.process(®istry).await?; + let mut new_column_names = vec![vec![column_name]]; + new_column_names.push(rest); - let new_column_names = new_column_names.into_iter().flatten().collect::>(); + let new_column_names = new_column_names.into_iter().flatten().collect::>(); - while let Some(item) = input.next().await { + Ok(input + .map(move |item| { if let Value { value: UntaggedValue::Row(row), tag, @@ -87,21 +90,19 @@ pub fn rename(args: CommandArgs, registry: &CommandRegistry) -> Result Result { - let tag = args.call_info.name_tag.clone(); let call_info = args.call_info.clone(); let registry = registry.clone(); let block = self.block.clone(); @@ -46,61 +45,26 @@ impl WholeStreamCommand for AliasCommand { let mut context = Context::from_args(&args, ®istry); let input = args.input; - let stream = async_stream! { - let mut scope = call_info.scope.clone(); - let evaluated = call_info.evaluate(®istry).await?; - if let Some(positional) = &evaluated.args.positional { - for (pos, arg) in positional.iter().enumerate() { - scope.vars.insert(alias_command.args[pos].to_string(), arg.clone()); - } + let mut scope = call_info.scope.clone(); + let evaluated = call_info.evaluate(®istry).await?; + if let Some(positional) = &evaluated.args.positional { + for (pos, arg) in positional.iter().enumerate() { + scope + .vars + .insert(alias_command.args[pos].to_string(), arg.clone()); } + } - let result = run_block( - &block, - &mut context, - input, - &scope.it, - &scope.vars, - &scope.env, - ).await; - - match result { - Ok(stream) if stream.is_empty() => { - yield Err(ShellError::labeled_error( - "Expected a block", - "alias needs a block", - tag, - )); - } - Ok(mut stream) => { - // We collect first to ensure errors are put into the context - while let Some(result) = stream.next().await { - yield Ok(ReturnSuccess::Value(result)); - } - - let errors = context.get_errors(); - if let Some(x) = errors.first() { - yield Err(ShellError::labeled_error_with_secondary( - "Alias failed to run", - "alias failed to run", - tag.clone(), - x.to_string(), - tag - )); - } - } - Err(e) => { - yield Err(ShellError::labeled_error_with_secondary( - "Alias failed to run", - "alias failed to run", - tag.clone(), - e.to_string(), - tag - )); - } - } - }; - - Ok(stream.to_output_stream()) + // FIXME: we need to patch up the spans to point at the top-level error + Ok(run_block( + &block, + &mut context, + input, + &scope.it, + &scope.vars, + &scope.env, + ) + .await? + .to_output_stream()) } } diff --git a/crates/nu-cli/src/commands/run_external.rs b/crates/nu-cli/src/commands/run_external.rs index f6982cb20..e5326228b 100644 --- a/crates/nu-cli/src/commands/run_external.rs +++ b/crates/nu-cli/src/commands/run_external.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; use nu_errors::ShellError; use nu_protocol::hir::{Expression, ExternalArgs, ExternalCommand, Literal, SpannedExpression}; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape}; +use nu_protocol::{Signature, SyntaxShape}; use nu_source::Tagged; #[derive(Deserialize)] @@ -99,69 +99,49 @@ impl WholeStreamCommand for RunExternalCommand { let is_interactive = self.interactive; - let stream = async_stream! { - let command = ExternalCommand { - name, - name_tag: args.call_info.name_tag.clone(), - args: ExternalArgs { - list: positionals.collect(), - span: args.call_info.args.span, - }, - }; - - // If we're in interactive mode, we will "auto cd". That is, instead of interpreting - // this as an external command, we will see it as a path and `cd` into it. - if is_interactive { - if let Some(path) = maybe_autocd_dir(&command, &mut external_context).await { - let cd_args = CdArgs { - path: Some(Tagged { - item: PathBuf::from(path), - tag: args.call_info.name_tag.clone(), - }) - }; - - let result = external_context.shell_manager.cd(cd_args, args.call_info.name_tag.clone()); - match result { - Ok(mut stream) => { - while let Some(value) = stream.next().await { - yield value; - } - }, - Err(e) => { - yield Err(e); - }, - _ => {} - } - - return; - } - } - - let scope = args.call_info.scope.clone(); - let is_last = args.call_info.args.is_last; - let input = args.input; - let result = external::run_external_command( - command, - &mut external_context, - input, - &scope, - is_last, - ).await; - - match result { - Ok(mut stream) => { - while let Some(value) = stream.next().await { - yield Ok(ReturnSuccess::Value(value)); - } - }, - Err(e) => { - yield Err(e); - }, - _ => {} - } + let command = ExternalCommand { + name, + name_tag: args.call_info.name_tag.clone(), + args: ExternalArgs { + list: positionals.collect(), + span: args.call_info.args.span, + }, }; - Ok(stream.to_output_stream()) + // If we're in interactive mode, we will "auto cd". That is, instead of interpreting + // this as an external command, we will see it as a path and `cd` into it. + if is_interactive { + if let Some(path) = maybe_autocd_dir(&command, &mut external_context).await { + let cd_args = CdArgs { + path: Some(Tagged { + item: PathBuf::from(path), + tag: args.call_info.name_tag.clone(), + }), + }; + + let result = external_context + .shell_manager + .cd(cd_args, args.call_info.name_tag.clone()); + match result { + Ok(stream) => return Ok(stream.to_output_stream()), + Err(e) => { + return Err(e); + } + } + } + } + + let scope = args.call_info.scope.clone(); + let is_last = args.call_info.args.is_last; + let input = args.input; + let result = + external::run_external_command(command, &mut external_context, input, &scope, is_last) + .await; + + match result { + Ok(stream) => Ok(stream.to_output_stream()), + Err(e) => Err(e), + } } } diff --git a/crates/nu-cli/src/commands/shuffle.rs b/crates/nu-cli/src/commands/shuffle.rs index 7b3297802..d1a00809f 100644 --- a/crates/nu-cli/src/commands/shuffle.rs +++ b/crates/nu-cli/src/commands/shuffle.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, ReturnValue, Value}; +use nu_protocol::{ReturnSuccess, Value}; use rand::seq::SliceRandom; use rand::thread_rng; @@ -24,28 +24,20 @@ impl WholeStreamCommand for Shuffle { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - shuffle(args, registry) + shuffle(args, registry).await } } -fn shuffle(args: CommandArgs, _registry: &CommandRegistry) -> Result { - let stream = async_stream! { - let mut input = args.input; - let mut values: Vec = input.collect().await; +async fn shuffle( + args: CommandArgs, + _registry: &CommandRegistry, +) -> Result { + let input = args.input; + let mut values: Vec = input.collect().await; - let out = { - values.shuffle(&mut thread_rng()); - values.clone() - }; + values.shuffle(&mut thread_rng()); - for val in out.into_iter() { - yield ReturnSuccess::value(val); - } - }; - - let stream: BoxStream<'static, ReturnValue> = stream.boxed(); - - Ok(stream.to_output_stream()) + Ok(futures::stream::iter(values.into_iter().map(ReturnSuccess::value)).to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/sum.rs b/crates/nu-cli/src/commands/sum.rs index 55b9aaeb8..b326b72e0 100644 --- a/crates/nu-cli/src/commands/sum.rs +++ b/crates/nu-cli/src/commands/sum.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::prelude::*; use crate::utils::data_processing::{reducer_for, Reduce}; use nu_errors::ShellError; -use nu_protocol::{Dictionary, ReturnSuccess, ReturnValue, Signature, UntaggedValue, Value}; +use nu_protocol::{Dictionary, ReturnSuccess, Signature, UntaggedValue, Value}; use num_traits::identities::Zero; use indexmap::map::IndexMap; @@ -38,6 +38,7 @@ impl WholeStreamCommand for Sum { name: args.call_info.name_tag, raw_input: args.raw_input, }) + .await } fn examples(&self) -> Vec { @@ -56,48 +57,45 @@ impl WholeStreamCommand for Sum { } } -fn sum(RunnableContext { mut input, .. }: RunnableContext) -> Result { - let stream = async_stream! { - let mut values: Vec = input.drain_vec().await; - let action = reducer_for(Reduce::Sum); +async fn sum( + RunnableContext { mut input, .. }: RunnableContext, +) -> Result { + let values: Vec = input.drain_vec().await; + let action = reducer_for(Reduce::Sum); - if values.iter().all(|v| if let UntaggedValue::Primitive(_) = v.value {true} else {false}) { - let total = action(Value::zero(), values)?; - yield ReturnSuccess::value(total) - } else { - let mut column_values = IndexMap::new(); - for value in values { - match value.value { - UntaggedValue::Row(row_dict) => { - for (key, value) in row_dict.entries.iter() { - column_values - .entry(key.clone()) - .and_modify(|v: &mut Vec| v.push(value.clone())) - .or_insert(vec![value.clone()]); - } - }, - table => {}, - }; - } - - let mut column_totals = IndexMap::new(); - for (col_name, col_vals) in column_values { - let sum = action(Value::zero(), col_vals); - match sum { - Ok(value) => { - column_totals.insert(col_name, value); - }, - Err(err) => yield Err(err), - }; - } - yield ReturnSuccess::value( - UntaggedValue::Row(Dictionary {entries: column_totals}).into_untagged_value()) + if values.iter().all(|v| v.is_primitive()) { + let total = action(Value::zero(), values)?; + Ok(OutputStream::one(ReturnSuccess::value(total))) + } else { + let mut column_values = IndexMap::new(); + for value in values { + if let UntaggedValue::Row(row_dict) = value.value { + for (key, value) in row_dict.entries.iter() { + column_values + .entry(key.clone()) + .and_modify(|v: &mut Vec| v.push(value.clone())) + .or_insert(vec![value.clone()]); + } + }; } - }; - let stream: BoxStream<'static, ReturnValue> = stream.boxed(); - - Ok(stream.to_output_stream()) + let mut column_totals = IndexMap::new(); + for (col_name, col_vals) in column_values { + let sum = action(Value::zero(), col_vals); + match sum { + Ok(value) => { + column_totals.insert(col_name, value); + } + Err(err) => return Err(err), + }; + } + Ok(OutputStream::one(ReturnSuccess::value( + UntaggedValue::Row(Dictionary { + entries: column_totals, + }) + .into_untagged_value(), + ))) + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/to.rs b/crates/nu-cli/src/commands/to.rs index 5db99546b..5c35d945e 100644 --- a/crates/nu-cli/src/commands/to.rs +++ b/crates/nu-cli/src/commands/to.rs @@ -26,14 +26,10 @@ impl WholeStreamCommand for To { registry: &CommandRegistry, ) -> Result { let registry = registry.clone(); - let stream = async_stream! { - yield Ok(ReturnSuccess::Value( - UntaggedValue::string(crate::commands::help::get_help(&To, ®istry)) - .into_value(Tag::unknown()), - )); - }; - - Ok(stream.to_output_stream()) + Ok(OutputStream::one(ReturnSuccess::value( + UntaggedValue::string(crate::commands::help::get_help(&To, ®istry)) + .into_value(Tag::unknown()), + ))) } }