From 40673e45994b73d6ee4f1019b46e4ee9876db178 Mon Sep 17 00:00:00 2001 From: Jonathan Turner Date: Sat, 13 Jun 2020 12:13:36 -0700 Subject: [PATCH] Another batch of removing async_stream (#1978) --- TODO.md | 60 -------- crates/nu-cli/src/commands/autoview.rs | 22 +-- crates/nu-cli/src/commands/command.rs | 54 +++---- crates/nu-cli/src/commands/echo.rs | 111 +++++++------- crates/nu-cli/src/commands/from_xlsx.rs | 84 +++++----- crates/nu-cli/src/commands/split/column.rs | 35 +++-- .../nu-cli/src/commands/str_/to_datetime.rs | 51 +++--- crates/nu-cli/src/commands/t_sort_by.rs | 57 ++++--- crates/nu-cli/src/commands/table.rs | 129 ++++++++-------- crates/nu-cli/src/commands/to_bson.rs | 80 +++++----- crates/nu-cli/src/commands/to_csv.rs | 63 ++++---- .../nu-cli/src/commands/to_delimited_data.rs | 49 +++--- crates/nu-cli/src/commands/to_json.rs | 145 ++++++++++-------- crates/nu-cli/src/commands/to_toml.rs | 80 +++++----- crates/nu-cli/src/commands/to_tsv.rs | 2 +- crates/nu-cli/src/commands/where_.rs | 100 ++++++------ 16 files changed, 554 insertions(+), 568 deletions(-) diff --git a/TODO.md b/TODO.md index 2ac069f45a..e69de29bb2 100644 --- a/TODO.md +++ b/TODO.md @@ -1,60 +0,0 @@ -This pattern is extremely repetitive and can be abstracted: - -```rs - let args = args.evaluate_once(registry)?; - let tag = args.name_tag(); - let input = args.input; - - let stream = async_stream! { - let values: Vec = input.values.collect().await; - - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - latest_tag = Some(value_tag.clone()); - let value_span = value.tag.span; - - match &value.value { - UntaggedValue::Primitive(Primitive::String(s)) => { - concat_string.push_str(&s); - concat_string.push_str("\n"); - } - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_span, - "value originates from here", - value_span, - )), - - } - } - -``` - -Mandatory and Optional in parse_command - -trace_remaining? - -select_fields and select_fields take unnecessary Tag - -Value#value should be Value#untagged - -Unify dictionary building, probably around a macro - -sys plugin in own crate - -textview in own crate - -Combine atomic and atomic_parse in parser - -at_end_possible_ws needs to be comment and separator sensitive - -Eliminate unnecessary `nodes` parser - -#[derive(HasSpan)] - -Figure out a solution for the duplication in stuff like NumberShape vs. NumberExpressionShape - -use `struct Expander` from signature.rs \ No newline at end of file diff --git a/crates/nu-cli/src/commands/autoview.rs b/crates/nu-cli/src/commands/autoview.rs index eee694efbe..c587d603b3 100644 --- a/crates/nu-cli/src/commands/autoview.rs +++ b/crates/nu-cli/src/commands/autoview.rs @@ -9,7 +9,6 @@ use parking_lot::Mutex; use prettytable::format::{FormatBuilder, LinePosition, LineSeparator}; use prettytable::{color, Attr, Cell, Row, Table}; use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; use textwrap::fill; pub struct Autoview; @@ -115,23 +114,12 @@ pub async fn autoview(context: RunnableContext) -> Result { let ctrl_c = context.ctrl_c.clone(); - let stream = async_stream! { - yield Ok(x); - yield Ok(y); + let xy = vec![x, y]; + let xy_stream = futures::stream::iter(xy) + .chain(input_stream) + .interruptible(ctrl_c); - loop { - match input_stream.next().await { - Some(z) => { - if ctrl_c.load(Ordering::SeqCst) { - break; - } - yield Ok(z); - } - _ => break, - } - } - }; - let stream = stream.to_input_stream(); + let stream = InputStream::from_stream(xy_stream); if let Some(table) = table { let command_args = create_default_command_args(&context).with_input(stream); diff --git a/crates/nu-cli/src/commands/command.rs b/crates/nu-cli/src/commands/command.rs index 77b51a4d2a..b5a138eda3 100644 --- a/crates/nu-cli/src/commands/command.rs +++ b/crates/nu-cli/src/commands/command.rs @@ -389,42 +389,44 @@ impl WholeStreamCommand for FnFilterCommand { ctrl_c, shell_manager, call_info, - mut input, + input, .. }: CommandArgs, registry: &CommandRegistry, ) -> Result { - let host: Arc> = host.clone(); - let registry: CommandRegistry = registry.clone(); + let registry = Arc::new(registry.clone()); let func = self.func; - let stream = async_stream! { - while let Some(it) = input.next().await { + Ok(input + .then(move |it| { + let host = host.clone(); let registry = registry.clone(); - let call_info = match call_info.clone().evaluate_with_new_it(®istry, &it).await { - Err(err) => { yield Err(err); return; }, - Ok(args) => args, - }; - - let args = EvaluatedFilterCommandArgs::new( - host.clone(), - ctrl_c.clone(), - shell_manager.clone(), - call_info, - ); - - match func(args) { - Err(err) => yield Err(err), - Ok(mut stream) => { - while let Some(value) = stream.values.next().await { - yield value; + let ctrl_c = ctrl_c.clone(); + let shell_manager = shell_manager.clone(); + let call_info = call_info.clone(); + async move { + let call_info = match call_info.evaluate_with_new_it(&*registry, &it).await { + Err(err) => { + return OutputStream::one(Err(err)); } + Ok(args) => args, + }; + + let args = EvaluatedFilterCommandArgs::new( + host.clone(), + ctrl_c.clone(), + shell_manager.clone(), + call_info, + ); + + match func(args) { + Err(err) => return OutputStream::one(Err(err)), + Ok(stream) => stream, } } - } - }; - - Ok(stream.to_output_stream()) + }) + .flatten() + .to_output_stream()) } } diff --git a/crates/nu-cli/src/commands/echo.rs b/crates/nu-cli/src/commands/echo.rs index e510794c35..4d9b22f57a 100644 --- a/crates/nu-cli/src/commands/echo.rs +++ b/crates/nu-cli/src/commands/echo.rs @@ -32,7 +32,7 @@ impl WholeStreamCommand for Echo { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - echo(args, registry) + echo(args, registry).await } fn examples(&self) -> Vec { @@ -51,67 +51,62 @@ impl WholeStreamCommand for Echo { } } -fn echo(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn echo(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (args, _): (EchoArgs, _) = args.process(®istry).await?; + let (args, _): (EchoArgs, _) = args.process(®istry).await?; - for i in args.rest { - match i.as_string() { - Ok(s) => { - yield Ok(ReturnSuccess::Value( - UntaggedValue::string(s).into_value(i.tag.clone()), - )); - } - _ => match i { - Value { - value: UntaggedValue::Table(table), - .. - } => { - for value in table { - yield Ok(ReturnSuccess::Value(value.clone())); - } - } - Value { - value: UntaggedValue::Primitive(Primitive::Range(range)), - tag - } => { - let mut current = range.from.0.item; - while current != range.to.0.item { - yield Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current.clone()).into_value(&tag))); - current = match crate::data::value::compute_values(Operator::Plus, &UntaggedValue::Primitive(current), &UntaggedValue::int(1)) { - Ok(result) => match result { - UntaggedValue::Primitive(p) => p, - _ => { - yield Err(ShellError::unimplemented("Internal error: expected a primitive result from increment")); - return; - } - }, - Err((left_type, right_type)) => { - yield Err(ShellError::coerce_error( - left_type.spanned(tag.span), - right_type.spanned(tag.span), - )); - return; - } - } - } - match range.to.1 { - RangeInclusion::Inclusive => { - yield Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current.clone()).into_value(&tag))); - } - _ => {} - } - } - _ => { - yield Ok(ReturnSuccess::Value(i.clone())); - } - }, + let stream = args.rest.into_iter().map(|i| { + match i.as_string() { + Ok(s) => { + OutputStream::one(Ok(ReturnSuccess::Value( + UntaggedValue::string(s).into_value(i.tag.clone()), + ))) } - } - }; + _ => match i { + Value { + value: UntaggedValue::Table(table), + .. + } => { + futures::stream::iter(table.into_iter().map(ReturnSuccess::value)).to_output_stream() + } + Value { + value: UntaggedValue::Primitive(Primitive::Range(range)), + tag + } => { + let mut output_vec = vec![]; - Ok(stream.to_output_stream()) + let mut current = range.from.0.item; + while current != range.to.0.item { + output_vec.push(Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current.clone()).into_value(&tag)))); + current = match crate::data::value::compute_values(Operator::Plus, &UntaggedValue::Primitive(current), &UntaggedValue::int(1)) { + Ok(result) => match result { + UntaggedValue::Primitive(p) => p, + _ => { + return OutputStream::one(Err(ShellError::unimplemented("Internal error: expected a primitive result from increment"))); + } + }, + Err((left_type, right_type)) => { + return OutputStream::one(Err(ShellError::coerce_error( + left_type.spanned(tag.span), + right_type.spanned(tag.span), + ))); + } + } + } + if let RangeInclusion::Inclusive = range.to.1 { + output_vec.push(Ok(ReturnSuccess::Value(UntaggedValue::Primitive(current).into_value(&tag)))); + } + + futures::stream::iter(output_vec.into_iter()).to_output_stream() + } + _ => { + OutputStream::one(Ok(ReturnSuccess::Value(i.clone()))) + } + }, + } + }); + + Ok(futures::stream::iter(stream).flatten().to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_xlsx.rs b/crates/nu-cli/src/commands/from_xlsx.rs index 3bc6691ed9..c518837ef1 100644 --- a/crates/nu-cli/src/commands/from_xlsx.rs +++ b/crates/nu-cli/src/commands/from_xlsx.rs @@ -36,62 +36,66 @@ impl WholeStreamCommand for FromXLSX { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_xlsx(args, registry) + from_xlsx(args, registry).await } } -fn from_xlsx(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_xlsx( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let tag = args.call_info.name_tag.clone(); let registry = registry.clone(); - let stream = async_stream! { - let (FromXLSXArgs { headerless: _headerless }, mut input) = args.process(®istry).await?; - let value = input.collect_binary(tag.clone()).await?; + let ( + FromXLSXArgs { + headerless: _headerless, + }, + input, + ) = args.process(®istry).await?; + let value = input.collect_binary(tag.clone()).await?; - let mut buf: Cursor> = Cursor::new(value.item); - let mut xls = Xlsx::<_>::new(buf).map_err(|_| { - ShellError::labeled_error("Could not load xlsx file", "could not load xlsx file", &tag) - })?; + let buf: Cursor> = Cursor::new(value.item); + let mut xls = Xlsx::<_>::new(buf).map_err(|_| { + ShellError::labeled_error("Could not load xlsx file", "could not load xlsx file", &tag) + })?; - let mut dict = TaggedDictBuilder::new(&tag); + let mut dict = TaggedDictBuilder::new(&tag); - let sheet_names = xls.sheet_names().to_owned(); + let sheet_names = xls.sheet_names().to_owned(); - for sheet_name in &sheet_names { - let mut sheet_output = TaggedListBuilder::new(&tag); + for sheet_name in &sheet_names { + let mut sheet_output = TaggedListBuilder::new(&tag); - if let Some(Ok(current_sheet)) = xls.worksheet_range(sheet_name) { - for row in current_sheet.rows() { - let mut row_output = TaggedDictBuilder::new(&tag); - for (i, cell) in row.iter().enumerate() { - let value = match cell { - DataType::Empty => UntaggedValue::nothing(), - DataType::String(s) => UntaggedValue::string(s), - DataType::Float(f) => UntaggedValue::decimal(*f), - DataType::Int(i) => UntaggedValue::int(*i), - DataType::Bool(b) => UntaggedValue::boolean(*b), - _ => UntaggedValue::nothing(), - }; + if let Some(Ok(current_sheet)) = xls.worksheet_range(sheet_name) { + for row in current_sheet.rows() { + let mut row_output = TaggedDictBuilder::new(&tag); + for (i, cell) in row.iter().enumerate() { + let value = match cell { + DataType::Empty => UntaggedValue::nothing(), + DataType::String(s) => UntaggedValue::string(s), + DataType::Float(f) => UntaggedValue::decimal(*f), + DataType::Int(i) => UntaggedValue::int(*i), + DataType::Bool(b) => UntaggedValue::boolean(*b), + _ => UntaggedValue::nothing(), + }; - row_output.insert_untagged(&format!("Column{}", i), value); - } - - sheet_output.push_untagged(row_output.into_untagged_value()); + row_output.insert_untagged(&format!("Column{}", i), value); } - dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); - } else { - yield Err(ShellError::labeled_error( - "Could not load sheet", - "could not load sheet", - &tag, - )); + sheet_output.push_untagged(row_output.into_untagged_value()); } + + dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); + } else { + return Err(ShellError::labeled_error( + "Could not load sheet", + "could not load sheet", + &tag, + )); } + } - yield ReturnSuccess::value(dict.into_value()); - }; - - Ok(stream.to_output_stream()) + Ok(OutputStream::one(ReturnSuccess::value(dict.into_value()))) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/split/column.rs b/crates/nu-cli/src/commands/split/column.rs index 5508d310ca..6c419f3d30 100644 --- a/crates/nu-cli/src/commands/split/column.rs +++ b/crates/nu-cli/src/commands/split/column.rs @@ -43,16 +43,27 @@ impl WholeStreamCommand for SubCommand { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - split_column(args, registry) + split_column(args, registry).await } } -fn split_column(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn split_column( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let name_span = args.call_info.name_tag.span; let registry = registry.clone(); - let stream = async_stream! { - let (SplitColumnArgs { separator, rest, collapse_empty }, mut input) = args.process(®istry).await?; - while let Some(v) = input.next().await { + let ( + SplitColumnArgs { + separator, + rest, + collapse_empty, + }, + input, + ) = args.process(®istry).await?; + + Ok(input + .map(move |v| { if let Ok(s) = v.as_string() { let splitter = separator.replace("\\n", "\n"); trace!("splitting with {:?}", splitter); @@ -79,7 +90,7 @@ fn split_column(args: CommandArgs, registry: &CommandRegistry) -> Result Result Result { - operate(args, registry) + operate(args, registry).await } fn examples(&self) -> Vec { @@ -62,54 +62,53 @@ impl WholeStreamCommand for SubCommand { #[derive(Clone)] struct DatetimeFormat(String); -fn operate(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn operate( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (Arguments { format, rest }, mut input) = args.process(®istry).await?; + let (Arguments { format, rest }, input) = args.process(®istry).await?; - let column_paths: Vec<_> = rest.iter().map(|x| x.clone()).collect(); + let column_paths: Vec<_> = rest; - let options = if let Some(Tagged { item: fmt, tag }) = format { - DatetimeFormat(fmt) - } else { - DatetimeFormat(String::from("%d.%m.%Y %H:%M %P %z")) - }; + let options = if let Some(Tagged { item: fmt, .. }) = format { + DatetimeFormat(fmt) + } else { + DatetimeFormat(String::from("%d.%m.%Y %H:%M %P %z")) + }; - while let Some(v) = input.next().await { + Ok(input + .map(move |v| { if column_paths.is_empty() { match action(&v, &options, v.tag()) { - Ok(out) => yield ReturnSuccess::value(out), - Err(err) => { - yield Err(err); - return; - } + Ok(out) => ReturnSuccess::value(out), + Err(err) => Err(err), } } else { - - let mut ret = v.clone(); + let mut ret = v; for path in &column_paths { let options = options.clone(); - let swapping = ret.swap_data_by_column_path(path, Box::new(move |old| action(old, &options, old.tag()))); + let swapping = ret.swap_data_by_column_path( + path, + Box::new(move |old| action(old, &options, old.tag())), + ); match swapping { Ok(new_value) => { ret = new_value; } Err(err) => { - yield Err(err); - return; + return Err(err); } } } - yield ReturnSuccess::value(ret); + ReturnSuccess::value(ret) } - } - }; - - Ok(stream.to_output_stream()) + }) + .to_output_stream()) } fn action( diff --git a/crates/nu-cli/src/commands/t_sort_by.rs b/crates/nu-cli/src/commands/t_sort_by.rs index 1c86da94db..98d76ec27a 100644 --- a/crates/nu-cli/src/commands/t_sort_by.rs +++ b/crates/nu-cli/src/commands/t_sort_by.rs @@ -57,36 +57,47 @@ impl WholeStreamCommand for TSortBy { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - t_sort_by(args, registry) + t_sort_by(args, registry).await } } -fn t_sort_by(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn t_sort_by( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let name = args.call_info.name_tag.clone(); - let (TSortByArgs { show_columns, group_by, ..}, mut input) = args.process(®istry).await?; - let values: Vec = input.collect().await; + let name = args.call_info.name_tag.clone(); + let ( + TSortByArgs { + show_columns, + group_by, + .. + }, + mut input, + ) = args.process(®istry).await?; + let values: Vec = input.collect().await; - let column_grouped_by_name = if let Some(grouped_by) = group_by { - Some(grouped_by.item().clone()) - } else { - None - }; - - if show_columns { - for label in columns_sorted(column_grouped_by_name, &values[0], &name).into_iter() { - yield ReturnSuccess::value(UntaggedValue::string(label.item).into_value(label.tag)); - } - } else { - match t_sort(column_grouped_by_name, None, &values[0], name) { - Ok(sorted) => yield ReturnSuccess::value(sorted), - Err(err) => yield Err(err) - } - } + let column_grouped_by_name = if let Some(grouped_by) = group_by { + Some(grouped_by.item().clone()) + } else { + None }; - Ok(stream.to_output_stream()) + if show_columns { + Ok(futures::stream::iter( + columns_sorted(column_grouped_by_name, &values[0], &name) + .into_iter() + .map(move |label| { + ReturnSuccess::value(UntaggedValue::string(label.item).into_value(label.tag)) + }), + ) + .to_output_stream()) + } else { + match t_sort(column_grouped_by_name, None, &values[0], name) { + Ok(sorted) => Ok(OutputStream::one(ReturnSuccess::value(sorted))), + Err(err) => Ok(OutputStream::one(Err(err))), + } + } } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/table.rs b/crates/nu-cli/src/commands/table.rs index 796aaa64bc..ea8f3da027 100644 --- a/crates/nu-cli/src/commands/table.rs +++ b/crates/nu-cli/src/commands/table.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::format::TableView; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{Primitive, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; +use nu_protocol::{Primitive, Signature, SyntaxShape, UntaggedValue, Value}; use std::time::Instant; const STREAM_PAGE_SIZE: usize = 1000; @@ -34,100 +34,97 @@ impl WholeStreamCommand for Table { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - table(args, registry) + table(args, registry).await } } -fn table(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn table(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let mut args = args.evaluate_once(®istry).await?; - let mut finished = false; + let mut args = args.evaluate_once(®istry).await?; + let mut finished = false; - let host = args.host.clone(); - let mut start_number = match args.get("start_number") { - Some(Value { value: UntaggedValue::Primitive(Primitive::Int(i)), .. }) => { - if let Some(num) = i.to_usize() { - num - } else { - yield Err(ShellError::labeled_error("Expected a row number", "expected a row number", &args.args.call_info.name_tag)); - 0 - } + let host = args.host.clone(); + let mut start_number = match args.get("start_number") { + Some(Value { + value: UntaggedValue::Primitive(Primitive::Int(i)), + .. + }) => { + if let Some(num) = i.to_usize() { + num + } else { + return Err(ShellError::labeled_error( + "Expected a row number", + "expected a row number", + &args.args.call_info.name_tag, + )); } - _ => { - 0 - } - }; + } + _ => 0, + }; - let mut delay_slot = None; + let mut delay_slot = None; - while !finished { - let mut new_input: VecDeque = VecDeque::new(); + while !finished { + let mut new_input: VecDeque = VecDeque::new(); - let start_time = Instant::now(); - for idx in 0..STREAM_PAGE_SIZE { - if let Some(val) = delay_slot { - new_input.push_back(val); - delay_slot = None; - } else { - match args.input.next().await { - Some(a) => { - if !new_input.is_empty() { - if let Some(descs) = new_input.get(0) { - let descs = descs.data_descriptors(); - let compare = a.data_descriptors(); - if descs != compare { - delay_slot = Some(a); - break; - } else { - new_input.push_back(a); - } + let start_time = Instant::now(); + for idx in 0..STREAM_PAGE_SIZE { + if let Some(val) = delay_slot { + new_input.push_back(val); + delay_slot = None; + } else { + match args.input.next().await { + Some(a) => { + if !new_input.is_empty() { + if let Some(descs) = new_input.get(0) { + let descs = descs.data_descriptors(); + let compare = a.data_descriptors(); + if descs != compare { + delay_slot = Some(a); + break; } else { new_input.push_back(a); } } else { new_input.push_back(a); } - } - _ => { - finished = true; - break; + } else { + new_input.push_back(a); } } + _ => { + finished = true; + break; + } + } - // Check if we've gone over our buffering threshold - if (idx + 1) % STREAM_TIMEOUT_CHECK_INTERVAL == 0 { - let end_time = Instant::now(); + // Check if we've gone over our buffering threshold + if (idx + 1) % STREAM_TIMEOUT_CHECK_INTERVAL == 0 { + let end_time = Instant::now(); - // If we've been buffering over a second, go ahead and send out what we have so far - if (end_time - start_time).as_secs() >= 1 { - break; - } + // If we've been buffering over a second, go ahead and send out what we have so far + if (end_time - start_time).as_secs() >= 1 { + break; } } } + } - let input: Vec = new_input.into(); + let input: Vec = new_input.into(); - if input.len() > 0 { - let mut host = host.lock(); - let view = TableView::from_list(&input, start_number); + if !input.is_empty() { + let mut host = host.lock(); + let view = TableView::from_list(&input, start_number); - if let Some(view) = view { - handle_unexpected(&mut *host, |host| crate::format::print_view(&view, host)); - } + if let Some(view) = view { + handle_unexpected(&mut *host, |host| crate::format::print_view(&view, host)); } - - start_number += input.len(); } - // Needed for async_stream to type check - if false { - yield ReturnSuccess::value(UntaggedValue::nothing().into_value(Tag::unknown())); - } - }; + start_number += input.len(); + } - Ok(OutputStream::new(stream)) + Ok(OutputStream::empty()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/to_bson.rs b/crates/nu-cli/src/commands/to_bson.rs index 4ef5ba6a1a..4f75416384 100644 --- a/crates/nu-cli/src/commands/to_bson.rs +++ b/crates/nu-cli/src/commands/to_bson.rs @@ -29,7 +29,7 @@ impl WholeStreamCommand for ToBSON { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - to_bson(args, registry) + to_bson(args, registry).await } fn is_binary(&self) -> bool { @@ -261,51 +261,53 @@ fn bson_value_to_bytes(bson: Bson, tag: Tag) -> Result, ShellError> { Ok(out) } -fn to_bson(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn to_bson( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let name_tag = args.name_tag(); - let name_span = name_tag.span; + let args = args.evaluate_once(®istry).await?; + let name_tag = args.name_tag(); + let name_span = name_tag.span; - let input: Vec = args.input.collect().await; + let input: Vec = args.input.collect().await; - let to_process_input = if input.len() > 1 { + let to_process_input = match input.len() { + x if x > 1 => { let tag = input[0].tag.clone(); - vec![Value { value: UntaggedValue::Table(input), tag } ] - } else if input.len() == 1 { - input - } else { - vec![] - }; - - for value in to_process_input { - match value_to_bson_value(&value) { - Ok(bson_value) => { - let value_span = value.tag.span; - - match bson_value_to_bytes(bson_value, name_tag.clone()) { - Ok(x) => yield ReturnSuccess::value( - UntaggedValue::binary(x).into_value(&name_tag), - ), - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected a table with BSON-compatible structure from pipeline", - "requires BSON-compatible input", - name_span, - "originates from here".to_string(), - value_span, - )), - } - } - _ => yield Err(ShellError::labeled_error( - "Expected a table with BSON-compatible structure from pipeline", - "requires BSON-compatible input", - &name_tag)) - } + vec![Value { + value: UntaggedValue::Table(input), + tag, + }] } + 1 => input, + _ => vec![], }; - Ok(stream.to_output_stream()) + Ok(futures::stream::iter(to_process_input.into_iter().map( + move |value| match value_to_bson_value(&value) { + Ok(bson_value) => { + let value_span = value.tag.span; + + match bson_value_to_bytes(bson_value, name_tag.clone()) { + Ok(x) => ReturnSuccess::value(UntaggedValue::binary(x).into_value(&name_tag)), + _ => Err(ShellError::labeled_error_with_secondary( + "Expected a table with BSON-compatible structure from pipeline", + "requires BSON-compatible input", + name_span, + "originates from here".to_string(), + value_span, + )), + } + } + _ => Err(ShellError::labeled_error( + "Expected a table with BSON-compatible structure from pipeline", + "requires BSON-compatible input", + &name_tag, + )), + }, + )) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/to_csv.rs b/crates/nu-cli/src/commands/to_csv.rs index 9f8c641a2a..b41aeb674e 100644 --- a/crates/nu-cli/src/commands/to_csv.rs +++ b/crates/nu-cli/src/commands/to_csv.rs @@ -42,47 +42,44 @@ impl WholeStreamCommand for ToCSV { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - to_csv(args, registry) + to_csv(args, registry).await } } -fn to_csv(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn to_csv(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let name = args.call_info.name_tag.clone(); - let (ToCSVArgs { separator, headerless }, mut input) = args.process(®istry).await?; - let sep = match separator { - Some(Value { - value: UntaggedValue::Primitive(Primitive::String(s)), - tag, - .. - }) => { - if s == r"\t" { - '\t' - } else { - let vec_s: Vec = s.chars().collect(); - if vec_s.len() != 1 { - yield Err(ShellError::labeled_error( - "Expected a single separator char from --separator", - "requires a single character string input", - tag, - )); - return; - }; - vec_s[0] - } + let name = args.call_info.name_tag.clone(); + let ( + ToCSVArgs { + separator, + headerless, + }, + input, + ) = args.process(®istry).await?; + let sep = match separator { + Some(Value { + value: UntaggedValue::Primitive(Primitive::String(s)), + tag, + .. + }) => { + if s == r"\t" { + '\t' + } else { + let vec_s: Vec = s.chars().collect(); + if vec_s.len() != 1 { + return Err(ShellError::labeled_error( + "Expected a single separator char from --separator", + "requires a single character string input", + tag, + )); + }; + vec_s[0] } - _ => ',', - }; - - let mut result = to_delimited_data(headerless, sep, "CSV", input, name)?; - - while let Some(item) = result.next().await { - yield item; } + _ => ',', }; - Ok(stream.to_output_stream()) + to_delimited_data(headerless, sep, "CSV", input, name).await } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/to_delimited_data.rs b/crates/nu-cli/src/commands/to_delimited_data.rs index 9958620317..010becdcb8 100644 --- a/crates/nu-cli/src/commands/to_delimited_data.rs +++ b/crates/nu-cli/src/commands/to_delimited_data.rs @@ -165,7 +165,7 @@ fn merge_descriptors(values: &[Value]) -> Vec> { ret } -pub fn to_delimited_data( +pub async fn to_delimited_data( headerless: bool, sep: char, format_name: &'static str, @@ -175,33 +175,41 @@ pub fn to_delimited_data( let name_tag = name; let name_span = name_tag.span; - let stream = async_stream! { - let input: Vec = input.collect().await; + let input: Vec = input.collect().await; - let to_process_input = if input.len() > 1 { + let to_process_input = match input.len() { + x if x > 1 => { let tag = input[0].tag.clone(); - vec![Value { value: UntaggedValue::Table(input), tag } ] - } else if input.len() == 1 { - input - } else { - vec![] - }; + vec![Value { + value: UntaggedValue::Table(input), + tag, + }] + } + 1 => input, + _ => vec![], + }; - for value in to_process_input { + Ok( + futures::stream::iter(to_process_input.into_iter().map(move |value| { match from_value_to_delimited_string(&clone_tagged_value(&value), sep) { Ok(mut x) => { if headerless { - x.find('\n').map(|second_line|{ + if let Some(second_line) = x.find('\n') { let start = second_line + 1; x.replace_range(0..start, ""); - }); + } } - yield ReturnSuccess::value(UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag)) + ReturnSuccess::value( + UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag), + ) } - Err(x) => { - let expected = format!("Expected a table with {}-compatible structure from pipeline", format_name); + Err(_) => { + let expected = format!( + "Expected a table with {}-compatible structure from pipeline", + format_name + ); let requires = format!("requires {}-compatible input", format_name); - yield Err(ShellError::labeled_error_with_secondary( + Err(ShellError::labeled_error_with_secondary( expected, requires, name_span, @@ -210,8 +218,7 @@ pub fn to_delimited_data( )) } } - } - }; - - Ok(stream.to_output_stream()) + })) + .to_output_stream(), + ) } diff --git a/crates/nu-cli/src/commands/to_json.rs b/crates/nu-cli/src/commands/to_json.rs index f46c9c0834..da2bced795 100644 --- a/crates/nu-cli/src/commands/to_json.rs +++ b/crates/nu-cli/src/commands/to_json.rs @@ -38,7 +38,7 @@ impl WholeStreamCommand for ToJSON { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - to_json(args, registry) + to_json(args, registry).await } fn examples(&self) -> Vec { @@ -163,78 +163,103 @@ fn json_list(input: &[Value]) -> Result, ShellError> { Ok(out) } -fn to_json(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn to_json( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let name_tag = args.call_info.name_tag.clone(); - let (ToJSONArgs { pretty }, mut input) = args.process(®istry).await?; - let name_span = name_tag.span; - let input: Vec = input.collect().await; + let name_tag = args.call_info.name_tag.clone(); + let (ToJSONArgs { pretty }, input) = args.process(®istry).await?; + let name_span = name_tag.span; + let input: Vec = input.collect().await; - let to_process_input = if input.len() > 1 { + let to_process_input = match input.len() { + x if x > 1 => { let tag = input[0].tag.clone(); - vec![Value { value: UntaggedValue::Table(input), tag } ] - } else if input.len() == 1 { - input - } else { - vec![] - }; + vec![Value { + value: UntaggedValue::Table(input), + tag, + }] + } + 1 => input, + _ => vec![], + }; - for value in to_process_input { - match value_to_json_value(&value) { - Ok(json_value) => { - let value_span = value.tag.span; + Ok(futures::stream::iter(to_process_input.into_iter().map( + move |value| match value_to_json_value(&value) { + Ok(json_value) => { + let value_span = value.tag.span; - match serde_json::to_string(&json_value) { - Ok(mut serde_json_string) => { - if let Some(pretty_value) = &pretty { - let mut pretty_format_failed = true; + match serde_json::to_string(&json_value) { + Ok(mut serde_json_string) => { + if let Some(pretty_value) = &pretty { + let mut pretty_format_failed = true; - if let Ok(pretty_u64) = pretty_value.as_u64() { - if let Ok(serde_json_value) = serde_json::from_str::(serde_json_string.as_str()) { - let indentation_string = std::iter::repeat(" ").take(pretty_u64 as usize).collect::(); - let serde_formatter = serde_json::ser::PrettyFormatter::with_indent(indentation_string.as_bytes()); - let serde_buffer = Vec::new(); - let mut serde_serializer = serde_json::Serializer::with_formatter(serde_buffer, serde_formatter); - let serde_json_object = json!(serde_json_value); + if let Ok(pretty_u64) = pretty_value.as_u64() { + if let Ok(serde_json_value) = + serde_json::from_str::( + serde_json_string.as_str(), + ) + { + let indentation_string = std::iter::repeat(" ") + .take(pretty_u64 as usize) + .collect::(); + let serde_formatter = + serde_json::ser::PrettyFormatter::with_indent( + indentation_string.as_bytes(), + ); + let serde_buffer = Vec::new(); + let mut serde_serializer = + serde_json::Serializer::with_formatter( + serde_buffer, + serde_formatter, + ); + let serde_json_object = json!(serde_json_value); - if let Ok(()) = serde_json_object.serialize(&mut serde_serializer) { - if let Ok(ser_json_string) = String::from_utf8(serde_serializer.into_inner()) { - pretty_format_failed = false; - serde_json_string = ser_json_string - } + if let Ok(()) = + serde_json_object.serialize(&mut serde_serializer) + { + if let Ok(ser_json_string) = + String::from_utf8(serde_serializer.into_inner()) + { + pretty_format_failed = false; + serde_json_string = ser_json_string } } } - - if pretty_format_failed { - yield Err(ShellError::labeled_error("Pretty formatting failed", "failed", pretty_value.tag())); - return; - } } - yield ReturnSuccess::value( - UntaggedValue::Primitive(Primitive::String(serde_json_string)).into_value(&name_tag), - ) - }, - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected a table with JSON-compatible structure.tag() from pipeline", - "requires JSON-compatible input", - name_span, - "originates from here".to_string(), - value_span, - )), - } - } - _ => yield Err(ShellError::labeled_error( - "Expected a table with JSON-compatible structure from pipeline", - "requires JSON-compatible input", - &name_tag)) - } - } - }; + if pretty_format_failed { + return Err(ShellError::labeled_error( + "Pretty formatting failed", + "failed", + pretty_value.tag(), + )); + } + } - Ok(stream.to_output_stream()) + ReturnSuccess::value( + UntaggedValue::Primitive(Primitive::String(serde_json_string)) + .into_value(&name_tag), + ) + } + _ => Err(ShellError::labeled_error_with_secondary( + "Expected a table with JSON-compatible structure.tag() from pipeline", + "requires JSON-compatible input", + name_span, + "originates from here".to_string(), + value_span, + )), + } + } + _ => Err(ShellError::labeled_error( + "Expected a table with JSON-compatible structure from pipeline", + "requires JSON-compatible input", + &name_tag, + )), + }, + )) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/to_toml.rs b/crates/nu-cli/src/commands/to_toml.rs index db61b51e55..f4c70b38c6 100644 --- a/crates/nu-cli/src/commands/to_toml.rs +++ b/crates/nu-cli/src/commands/to_toml.rs @@ -24,7 +24,7 @@ impl WholeStreamCommand for ToTOML { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - to_toml(args, registry) + to_toml(args, registry).await } // TODO: add an example here. What commands to run to get a Row(Dictionary)? // fn examples(&self) -> Vec { @@ -135,49 +135,53 @@ fn collect_values(input: &[Value]) -> Result, ShellError> { Ok(out) } -fn to_toml(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn to_toml( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let name_tag = args.name_tag(); - let name_span = name_tag.span; - let input: Vec = args.input.collect().await; + let args = args.evaluate_once(®istry).await?; + let name_tag = args.name_tag(); + let name_span = name_tag.span; + let input: Vec = args.input.collect().await; - let to_process_input = if input.len() > 1 { + let to_process_input = match input.len() { + x if x > 1 => { let tag = input[0].tag.clone(); - vec![Value { value: UntaggedValue::Table(input), tag } ] - } else if input.len() == 1 { - input - } else { - vec![] - }; - - for value in to_process_input { - let value_span = value.tag.span; - match value_to_toml_value(&value) { - Ok(toml_value) => { - match toml::to_string(&toml_value) { - Ok(x) => yield ReturnSuccess::value( - UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag), - ), - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected a table with TOML-compatible structure.tag() from pipeline", - "requires TOML-compatible input", - name_span, - "originates from here".to_string(), - value_span, - )), - } - } - _ => yield Err(ShellError::labeled_error( - "Expected a table with TOML-compatible structure from pipeline", - "requires TOML-compatible input", - &name_tag)) - } + vec![Value { + value: UntaggedValue::Table(input), + tag, + }] } + 1 => input, + _ => vec![], }; - Ok(stream.to_output_stream()) + Ok( + futures::stream::iter(to_process_input.into_iter().map(move |value| { + let value_span = value.tag.span; + match value_to_toml_value(&value) { + Ok(toml_value) => match toml::to_string(&toml_value) { + Ok(x) => ReturnSuccess::value( + UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag), + ), + _ => Err(ShellError::labeled_error_with_secondary( + "Expected a table with TOML-compatible structure.tag() from pipeline", + "requires TOML-compatible input", + name_span, + "originates from here".to_string(), + value_span, + )), + }, + _ => Err(ShellError::labeled_error( + "Expected a table with TOML-compatible structure from pipeline", + "requires TOML-compatible input", + &name_tag, + )), + } + })) + .to_output_stream(), + ) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/to_tsv.rs b/crates/nu-cli/src/commands/to_tsv.rs index e77c0858ff..974884986e 100644 --- a/crates/nu-cli/src/commands/to_tsv.rs +++ b/crates/nu-cli/src/commands/to_tsv.rs @@ -43,7 +43,7 @@ async fn to_tsv(args: CommandArgs, registry: &CommandRegistry) -> Result Result { - where_command(args, registry) + where_command(args, registry).await } fn examples(&self) -> Vec { @@ -63,65 +63,71 @@ impl WholeStreamCommand for Where { ] } } -fn where_command( +async fn where_command( raw_args: CommandArgs, registry: &CommandRegistry, ) -> Result { - let registry = registry.clone(); - let scope = raw_args.call_info.scope.clone(); + let registry = Arc::new(registry.clone()); + let scope = Arc::new(raw_args.call_info.scope.clone()); let tag = raw_args.call_info.name_tag.clone(); - let stream = async_stream! { - let (WhereArgs { block }, mut input) = raw_args.process(®istry).await?; - let condition = { - if block.block.len() != 1 { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - match block.block[0].list.get(0) { - Some(item) => match item { - ClassifiedCommand::Expr(expr) => expr.clone(), - _ => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - }, - None => { - yield Err(ShellError::labeled_error( + let (WhereArgs { block }, input) = raw_args.process(®istry).await?; + let condition = { + if block.block.len() != 1 { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + match block.block[0].list.get(0) { + Some(item) => match item { + ClassifiedCommand::Expr(expr) => expr.clone(), + _ => { + return Err(ShellError::labeled_error( "Expected a condition", "expected a condition", tag, )); - return; } + }, + None => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); } - }; - - let mut input = input; - while let Some(input) = input.next().await { - - //FIXME: should we use the scope that's brought in as well? - let condition = evaluate_baseline_expr(&condition, ®istry, &input, &scope.vars, &scope.env).await?; - - match condition.as_bool() { - Ok(b) => { - if b { - yield Ok(ReturnSuccess::Value(input)); - } - } - Err(e) => yield Err(e), - }; } }; - Ok(stream.to_output_stream()) + Ok(input + .filter_map(move |input| { + let condition = condition.clone(); + let registry = registry.clone(); + let scope = scope.clone(); + + async move { + //FIXME: should we use the scope that's brought in as well? + let condition = + evaluate_baseline_expr(&condition, &*registry, &input, &scope.vars, &scope.env) + .await; + + match condition { + Ok(condition) => match condition.as_bool() { + Ok(b) => { + if b { + Some(Ok(ReturnSuccess::Value(input))) + } else { + None + } + } + Err(e) => Some(Err(e)), + }, + Err(e) => Some(Err(e)), + } + } + }) + .to_output_stream()) } #[cfg(test)]