From 012c99839c2ee2730ee37c27aa0548b50712a394 Mon Sep 17 00:00:00 2001 From: "Joseph T. Lyons" Date: Thu, 4 Jun 2020 04:42:23 -0400 Subject: [PATCH] Moving some commands off of async stream (#1934) * Remove async_stream from rm * Remove async_stream from sort_by * Remove async_stream from split_by * Remove dbg!() statement * Remove async_stream from uniq * Remove async_stream from mkdir * Don't change functions from private to public * Clippy fixes * Peer-review updates --- crates/nu-cli/src/commands/mkdir.rs | 19 ++---- crates/nu-cli/src/commands/rm.rs | 34 +++++------ crates/nu-cli/src/commands/sort_by.rs | 84 ++++++++++++++------------ crates/nu-cli/src/commands/split_by.rs | 41 ++++++------- crates/nu-cli/src/commands/uniq.rs | 23 ++++--- 5 files changed, 99 insertions(+), 102 deletions(-) diff --git a/crates/nu-cli/src/commands/mkdir.rs b/crates/nu-cli/src/commands/mkdir.rs index e286a9858..54debf250 100644 --- a/crates/nu-cli/src/commands/mkdir.rs +++ b/crates/nu-cli/src/commands/mkdir.rs @@ -32,7 +32,7 @@ impl WholeStreamCommand for Mkdir { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - mkdir(args, registry) + mkdir(args, registry).await } fn examples(&self) -> Vec { @@ -44,20 +44,13 @@ impl WholeStreamCommand for Mkdir { } } -fn mkdir(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn mkdir(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let name = args.call_info.name_tag.clone(); - let shell_manager = args.shell_manager.clone(); - let (args, _) = args.process(®istry).await?; - let mut result = shell_manager.mkdir(args, name)?; + let name = args.call_info.name_tag.clone(); + let shell_manager = args.shell_manager.clone(); + let (args, _) = args.process(®istry).await?; - while let Some(item) = result.next().await { - yield item; - } - }; - - Ok(stream.to_output_stream()) + shell_manager.mkdir(args, name) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/rm.rs b/crates/nu-cli/src/commands/rm.rs index e676cf946..c5b3cf4f6 100644 --- a/crates/nu-cli/src/commands/rm.rs +++ b/crates/nu-cli/src/commands/rm.rs @@ -49,7 +49,7 @@ impl WholeStreamCommand for Remove { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - rm(args, registry) + rm(args, registry).await } fn examples(&self) -> Vec { @@ -73,27 +73,21 @@ impl WholeStreamCommand for Remove { } } -fn rm(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn rm(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let name = args.call_info.name_tag.clone(); - let shell_manager = args.shell_manager.clone(); - let (args, _): (RemoveArgs, _) = args.process(®istry).await?; - let mut result = if args.trash.item && args.permanent.item { - OutputStream::one(Err(ShellError::labeled_error( - "only one of --permanent and --trash can be used", - "conflicting flags", - name - ))) - } else { - shell_manager.rm(args, name)? - }; - while let Some(item) = result.next().await { - yield item; - } - }; + let name = args.call_info.name_tag.clone(); + let shell_manager = args.shell_manager.clone(); + let (args, _): (RemoveArgs, _) = args.process(®istry).await?; - Ok(stream.to_output_stream()) + if args.trash.item && args.permanent.item { + return Ok(OutputStream::one(Err(ShellError::labeled_error( + "only one of --permanent and --trash can be used", + "conflicting flags", + name, + )))); + } + + shell_manager.rm(args, name) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/sort_by.rs b/crates/nu-cli/src/commands/sort_by.rs index 87d3b6f96..cde0c5fc8 100644 --- a/crates/nu-cli/src/commands/sort_by.rs +++ b/crates/nu-cli/src/commands/sort_by.rs @@ -31,7 +31,7 @@ impl WholeStreamCommand for SortBy { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - sort_by(args, registry) + sort_by(args, registry).await } fn examples(&self) -> Vec { @@ -60,51 +60,59 @@ impl WholeStreamCommand for SortBy { } } -fn sort_by(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn sort_by( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (SortByArgs { rest }, mut input) = args.process(®istry).await?; - let mut vec = input.drain_vec().await; + let tag = args.call_info.name_tag.clone(); - if vec.is_empty() { - return; + let (SortByArgs { rest }, mut input) = args.process(®istry).await?; + let mut vec = input.drain_vec().await; + + if vec.is_empty() { + return Err(ShellError::labeled_error( + "Error performing sort-by command", + "sort-by error", + tag, + )); + } + + for sort_arg in rest.iter() { + let match_test = get_data_by_key(&vec[0], sort_arg.borrow_spanned()); + if match_test == None { + return Err(ShellError::labeled_error( + "Can not find column to sort by", + "invalid column", + sort_arg.borrow_spanned().span, + )); } + } - for sort_arg in rest.iter() { - let match_test = get_data_by_key(&vec[0], sort_arg.borrow_spanned()); - if match_test == None { - yield Err(ShellError::labeled_error( - "Can not find column to sort by", - "invalid column", - sort_arg.borrow_spanned().span, - )); - return; - } + match &vec[0] { + Value { + value: UntaggedValue::Primitive(_), + .. + } => { + vec.sort(); } - - match &vec[0] { - Value { - value: UntaggedValue::Primitive(_), - .. - } => { - vec.sort(); - }, - _ => { - let calc_key = |item: &Value| { - rest.iter() - .map(|f| get_data_by_key(item, f.borrow_spanned())) - .collect::>>() - }; - vec.sort_by_cached_key(calc_key); - }, - }; - - for item in vec { - yield item.into(); + _ => { + let calc_key = |item: &Value| { + rest.iter() + .map(|f| get_data_by_key(item, f.borrow_spanned())) + .collect::>>() + }; + vec.sort_by_cached_key(calc_key); } }; - Ok(stream.to_output_stream()) + let mut values_vec_deque: VecDeque = VecDeque::new(); + + for item in vec { + values_vec_deque.push_back(item); + } + + Ok(futures::stream::iter(values_vec_deque).to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/split_by.rs b/crates/nu-cli/src/commands/split_by.rs index 185323845..6a35a178c 100644 --- a/crates/nu-cli/src/commands/split_by.rs +++ b/crates/nu-cli/src/commands/split_by.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::prelude::*; use nu_errors::ShellError; use nu_protocol::{ - ReturnSuccess, Signature, SpannedTypeName, SyntaxShape, TaggedDictBuilder, UntaggedValue, Value, + Signature, SpannedTypeName, SyntaxShape, TaggedDictBuilder, UntaggedValue, Value, }; use nu_source::Tagged; @@ -36,32 +36,31 @@ impl WholeStreamCommand for SplitBy { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - split_by(args, registry) + split_by(args, registry).await } } -pub fn split_by(args: CommandArgs, registry: &CommandRegistry) -> Result { +pub async fn split_by( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let name = args.call_info.name_tag.clone(); - let (SplitByArgs { column_name }, mut input) = args.process(®istry).await?; - let values: Vec = input.collect().await; + let name = args.call_info.name_tag.clone(); + let (SplitByArgs { column_name }, input) = args.process(®istry).await?; + let values: Vec = input.collect().await; - if values.len() > 1 || values.is_empty() { - yield Err(ShellError::labeled_error( - "Expected table from pipeline", - "requires a table input", - column_name.span() - )) - } else { - match split(&column_name, &values[0], name) { - Ok(split) => yield ReturnSuccess::value(split), - Err(err) => yield Err(err), - } - } - }; + if values.len() > 1 || values.is_empty() { + return Err(ShellError::labeled_error( + "Expected table from pipeline", + "requires a table input", + column_name.span(), + )); + } - Ok(stream.to_output_stream()) + match split(&column_name, &values[0], name) { + Ok(split) => Ok(OutputStream::one(split)), + Err(err) => Err(err), + } } pub fn split( diff --git a/crates/nu-cli/src/commands/uniq.rs b/crates/nu-cli/src/commands/uniq.rs index 869972033..4f024188d 100644 --- a/crates/nu-cli/src/commands/uniq.rs +++ b/crates/nu-cli/src/commands/uniq.rs @@ -26,21 +26,24 @@ impl WholeStreamCommand for Uniq { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - uniq(args, registry) + uniq(args, registry).await } } -fn uniq(args: CommandArgs, _registry: &CommandRegistry) -> Result { - let stream = async_stream! { - let mut input = args.input; - let uniq_values: IndexSet<_> = input.collect().await; +async fn uniq(args: CommandArgs, _registry: &CommandRegistry) -> Result { + let input = args.input; + let uniq_values: IndexSet<_> = input.collect().await; - for item in uniq_values.iter().map(|row| ReturnSuccess::value(row.clone())) { - yield item; - } - }; + let mut values_vec_deque = VecDeque::new(); - Ok(stream.to_output_stream()) + for item in uniq_values + .iter() + .map(|row| ReturnSuccess::value(row.clone())) + { + values_vec_deque.push_back(item); + } + + Ok(futures::stream::iter(values_vec_deque).to_output_stream()) } #[cfg(test)]