diff --git a/crates/nu-cli/src/commands/from_ods.rs b/crates/nu-cli/src/commands/from_ods.rs index 81f5cfc17..94c42f010 100644 --- a/crates/nu-cli/src/commands/from_ods.rs +++ b/crates/nu-cli/src/commands/from_ods.rs @@ -36,62 +36,66 @@ impl WholeStreamCommand for FromODS { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_ods(args, registry) + from_ods(args, registry).await } } -fn from_ods(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_ods( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let tag = args.call_info.name_tag.clone(); let registry = registry.clone(); - let stream = async_stream! { - let (FromODSArgs { headerless: _headerless }, mut input) = args.process(®istry).await?; - let bytes = input.collect_binary(tag.clone()).await?; - let mut buf: Cursor> = Cursor::new(bytes.item); - let mut ods = Ods::<_>::new(buf).map_err(|_| ShellError::labeled_error( - "Could not load ods file", - "could not load ods file", - &tag))?; + let ( + FromODSArgs { + headerless: _headerless, + }, + input, + ) = args.process(®istry).await?; + let bytes = input.collect_binary(tag.clone()).await?; + let buf: Cursor> = Cursor::new(bytes.item); + let mut ods = Ods::<_>::new(buf).map_err(|_| { + ShellError::labeled_error("Could not load ods file", "could not load ods file", &tag) + })?; - let mut dict = TaggedDictBuilder::new(&tag); + let mut dict = TaggedDictBuilder::new(&tag); - let sheet_names = ods.sheet_names().to_owned(); + let sheet_names = ods.sheet_names().to_owned(); - for sheet_name in &sheet_names { - let mut sheet_output = TaggedListBuilder::new(&tag); + for sheet_name in &sheet_names { + let mut sheet_output = TaggedListBuilder::new(&tag); - if let Some(Ok(current_sheet)) = ods.worksheet_range(sheet_name) { - for row in current_sheet.rows() { - let mut row_output = TaggedDictBuilder::new(&tag); - for (i, cell) in row.iter().enumerate() { - let value = match cell { - DataType::Empty => UntaggedValue::nothing(), - DataType::String(s) => UntaggedValue::string(s), - DataType::Float(f) => UntaggedValue::decimal(*f), - DataType::Int(i) => UntaggedValue::int(*i), - DataType::Bool(b) => UntaggedValue::boolean(*b), - _ => UntaggedValue::nothing(), - }; + if let Some(Ok(current_sheet)) = ods.worksheet_range(sheet_name) { + for row in current_sheet.rows() { + let mut row_output = TaggedDictBuilder::new(&tag); + for (i, cell) in row.iter().enumerate() { + let value = match cell { + DataType::Empty => UntaggedValue::nothing(), + DataType::String(s) => UntaggedValue::string(s), + DataType::Float(f) => UntaggedValue::decimal(*f), + DataType::Int(i) => UntaggedValue::int(*i), + DataType::Bool(b) => UntaggedValue::boolean(*b), + _ => UntaggedValue::nothing(), + }; - row_output.insert_untagged(&format!("Column{}", i), value); - } - - sheet_output.push_untagged(row_output.into_untagged_value()); + row_output.insert_untagged(&format!("Column{}", i), value); } - dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); - } else { - yield Err(ShellError::labeled_error( - "Could not load sheet", - "could not load sheet", - &tag)); + sheet_output.push_untagged(row_output.into_untagged_value()); } + + dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); + } else { + return Err(ShellError::labeled_error( + "Could not load sheet", + "could not load sheet", + &tag, + )); } + } - yield ReturnSuccess::value(dict.into_value()); - }; - - Ok(stream.to_output_stream()) + Ok(OutputStream::one(ReturnSuccess::value(dict.into_value()))) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_ssv.rs b/crates/nu-cli/src/commands/from_ssv.rs index f30a8c78f..c0b3a6151 100644 --- a/crates/nu-cli/src/commands/from_ssv.rs +++ b/crates/nu-cli/src/commands/from_ssv.rs @@ -51,7 +51,7 @@ impl WholeStreamCommand for FromSSV { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_ssv(args, registry) + from_ssv(args, registry).await } } @@ -251,37 +251,53 @@ fn from_ssv_string_to_value( Some(UntaggedValue::Table(rows).into_value(&tag)) } -fn from_ssv(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn from_ssv( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let name = args.call_info.name_tag.clone(); let registry = registry.clone(); - let stream = async_stream! { - let (FromSSVArgs { headerless, aligned_columns, minimum_spaces }, mut input) = args.process(®istry).await?; - let concat_string = input.collect_string(name.clone()).await?; - let split_at = match minimum_spaces { - Some(number) => number.item, - None => DEFAULT_MINIMUM_SPACES - }; + let ( + FromSSVArgs { + headerless, + aligned_columns, + minimum_spaces, + }, + input, + ) = args.process(®istry).await?; + let concat_string = input.collect_string(name.clone()).await?; + let split_at = match minimum_spaces { + Some(number) => number.item, + None => DEFAULT_MINIMUM_SPACES, + }; - match from_ssv_string_to_value(&concat_string.item, headerless, aligned_columns, split_at, name.clone()) { + Ok( + match from_ssv_string_to_value( + &concat_string.item, + headerless, + aligned_columns, + split_at, + name.clone(), + ) { Some(x) => match x { - Value { value: UntaggedValue::Table(list), ..} => { - for l in list { yield ReturnSuccess::value(l) } - } - x => yield ReturnSuccess::value(x) + Value { + value: UntaggedValue::Table(list), + .. + } => futures::stream::iter(list.into_iter().map(ReturnSuccess::value)) + .to_output_stream(), + x => OutputStream::one(ReturnSuccess::value(x)), }, None => { - yield Err(ShellError::labeled_error_with_secondary( + return Err(ShellError::labeled_error_with_secondary( "Could not parse as SSV", "input cannot be parsed ssv", &name, "value originates from here", &concat_string.tag, - )) - }, - } - }; - - Ok(stream.to_output_stream()) + )); + } + }, + ) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_toml.rs b/crates/nu-cli/src/commands/from_toml.rs index a0c58d61c..2dffe9e81 100644 --- a/crates/nu-cli/src/commands/from_toml.rs +++ b/crates/nu-cli/src/commands/from_toml.rs @@ -24,7 +24,7 @@ impl WholeStreamCommand for FromTOML { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_toml(args, registry) + from_toml(args, registry).await } } @@ -64,28 +64,28 @@ pub fn from_toml_string_to_value(s: String, tag: impl Into) -> Result Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let concat_string = input.collect_string(tag.clone()).await?; + let concat_string = input.collect_string(tag.clone()).await?; + Ok( match from_toml_string_to_value(concat_string.item, tag.clone()) { Ok(x) => match x { - Value { value: UntaggedValue::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - x => yield ReturnSuccess::value(x), + Value { + value: UntaggedValue::Table(list), + .. + } => futures::stream::iter(list.into_iter().map(ReturnSuccess::value)) + .to_output_stream(), + x => OutputStream::one(ReturnSuccess::value(x)), }, Err(_) => { - yield Err(ShellError::labeled_error_with_secondary( + return Err(ShellError::labeled_error_with_secondary( "Could not parse as TOML", "input cannot be parsed as TOML", &tag, @@ -93,10 +93,8 @@ pub fn from_toml( concat_string.tag, )) } - } - }; - - Ok(stream.to_output_stream()) + }, + ) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/from_xml.rs b/crates/nu-cli/src/commands/from_xml.rs index 14c5cd514..6e13b7190 100644 --- a/crates/nu-cli/src/commands/from_xml.rs +++ b/crates/nu-cli/src/commands/from_xml.rs @@ -24,7 +24,7 @@ impl WholeStreamCommand for FromXML { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - from_xml(args, registry) + from_xml(args, registry).await } } @@ -99,37 +99,38 @@ pub fn from_xml_string_to_value(s: String, tag: impl Into) -> Result Result { +async fn from_xml( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let concat_string = input.collect_string(tag.clone()).await?; + let concat_string = input.collect_string(tag.clone()).await?; + Ok( match from_xml_string_to_value(concat_string.item, tag.clone()) { Ok(x) => match x { - Value { value: UntaggedValue::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - x => yield ReturnSuccess::value(x), + Value { + value: UntaggedValue::Table(list), + .. + } => futures::stream::iter(list.into_iter().map(ReturnSuccess::value)) + .to_output_stream(), + x => OutputStream::one(ReturnSuccess::value(x)), }, Err(_) => { - yield Err(ShellError::labeled_error_with_secondary( + return Err(ShellError::labeled_error_with_secondary( "Could not parse as XML", "input cannot be parsed as XML", &tag, "value originates from here", &concat_string.tag, )) - } , - } - }; - - Ok(stream.to_output_stream()) + } + }, + ) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/headers.rs b/crates/nu-cli/src/commands/headers.rs index 5e734715b..4baf72842 100644 --- a/crates/nu-cli/src/commands/headers.rs +++ b/crates/nu-cli/src/commands/headers.rs @@ -28,7 +28,7 @@ impl WholeStreamCommand for Headers { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - headers(args, registry) + headers(args, registry).await } fn examples(&self) -> Vec { @@ -40,51 +40,65 @@ impl WholeStreamCommand for Headers { } } -pub fn headers(args: CommandArgs, _registry: &CommandRegistry) -> Result { - let stream = async_stream! { - let mut input = args.input; - let rows: Vec = input.collect().await; +pub async fn headers( + args: CommandArgs, + _registry: &CommandRegistry, +) -> Result { + let input = args.input; + let rows: Vec = input.collect().await; - if rows.len() < 1 { - yield Err(ShellError::untagged_runtime_error("Couldn't find headers, was the input a properly formatted, non-empty table?")); - } + if rows.is_empty() { + return Err(ShellError::untagged_runtime_error( + "Couldn't find headers, was the input a properly formatted, non-empty table?", + )); + } - //the headers are the first row in the table - let headers: Vec = match &rows[0].value { - UntaggedValue::Row(d) => { - Ok(d.entries.iter().map(|(k, v)| { + //the headers are the first row in the table + let headers: Vec = match &rows[0].value { + UntaggedValue::Row(d) => { + Ok(d.entries + .iter() + .map(|(k, v)| { match v.as_string() { Ok(s) => s, - Err(_) => { //If a cell that should contain a header name is empty, we name the column Column[index] + Err(_) => { + //If a cell that should contain a header name is empty, we name the column Column[index] match d.entries.get_full(k) { Some((index, _, _)) => format!("Column{}", index), - None => "unknownColumn".to_string() + None => "unknownColumn".to_string(), } } } - }).collect()) - } - _ => Err(ShellError::unexpected_eof("Could not get headers, is the table empty?", rows[0].tag.span)) - }?; + }) + .collect()) + } + _ => Err(ShellError::unexpected_eof( + "Could not get headers, is the table empty?", + rows[0].tag.span, + )), + }?; - //Each row is a dictionary with the headers as keys - for r in rows.iter().skip(1) { + Ok( + futures::stream::iter(rows.into_iter().skip(1).map(move |r| { + //Each row is a dictionary with the headers as keys match &r.value { UntaggedValue::Row(d) => { - let mut i = 0; let mut entries = IndexMap::new(); - for (_, v) in d.entries.iter() { + for (i, (_, v)) in d.entries.iter().enumerate() { entries.insert(headers[i].clone(), v.clone()); - i += 1; } - yield Ok(ReturnSuccess::Value(UntaggedValue::Row(Dictionary{entries}).into_value(r.tag.clone()))) + Ok(ReturnSuccess::Value( + UntaggedValue::Row(Dictionary { entries }).into_value(r.tag.clone()), + )) } - _ => yield Err(ShellError::unexpected_eof("Couldn't iterate through rows, was the input a properly formatted table?", r.tag.span)) + _ => Err(ShellError::unexpected_eof( + "Couldn't iterate through rows, was the input a properly formatted table?", + r.tag.span, + )), } - } - }; - - Ok(stream.to_output_stream()) + })) + .to_output_stream(), + ) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/insert.rs b/crates/nu-cli/src/commands/insert.rs index 44a7f2431..51f895b4e 100644 --- a/crates/nu-cli/src/commands/insert.rs +++ b/crates/nu-cli/src/commands/insert.rs @@ -42,38 +42,32 @@ impl WholeStreamCommand for Insert { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - insert(args, registry) + insert(args, registry).await } } -fn insert(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn insert(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (InsertArgs { column, value }, mut input) = args.process(®istry).await?; - while let Some(row) = input.next().await { - match row { - Value { - value: UntaggedValue::Row(_), - .. - } => match row.insert_data_at_column_path(&column, value.clone()) { - Ok(v) => yield Ok(ReturnSuccess::Value(v)), - Err(err) => yield Err(err), - }, + let (InsertArgs { column, value }, input) = args.process(®istry).await?; - Value { tag, ..} => { - yield Err(ShellError::labeled_error( - "Unrecognized type in stream", - "original value", - tag, - )); - } + Ok(input + .map(move |row| match row { + Value { + value: UntaggedValue::Row(_), + .. + } => match row.insert_data_at_column_path(&column, value.clone()) { + Ok(v) => Ok(ReturnSuccess::Value(v)), + Err(err) => Err(err), + }, - } - }; - - }; - Ok(stream.to_output_stream()) + Value { tag, .. } => Err(ShellError::labeled_error( + "Unrecognized type in stream", + "original value", + tag, + )), + }) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/keep.rs b/crates/nu-cli/src/commands/keep.rs index bac83b5d9..5ba94cd60 100644 --- a/crates/nu-cli/src/commands/keep.rs +++ b/crates/nu-cli/src/commands/keep.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue}; +use nu_protocol::{Signature, SyntaxShape, UntaggedValue}; use nu_source::Tagged; pub struct Keep; @@ -35,7 +35,7 @@ impl WholeStreamCommand for Keep { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - keep(args, registry) + keep(args, registry).await } fn examples(&self) -> Vec { @@ -59,27 +59,16 @@ impl WholeStreamCommand for Keep { } } -fn keep(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn keep(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (KeepArgs { rows }, mut input) = args.process(®istry).await?; - let mut rows_desired = if let Some(quantity) = rows { - *quantity - } else { - 1 - }; - - while let Some(input) = input.next().await { - if rows_desired > 0 { - yield ReturnSuccess::value(input); - rows_desired -= 1; - } else { - break; - } - } + let (KeepArgs { rows }, input) = args.process(®istry).await?; + let rows_desired = if let Some(quantity) = rows { + *quantity + } else { + 1 }; - Ok(stream.to_output_stream()) + Ok(input.take(rows_desired).to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/prepend.rs b/crates/nu-cli/src/commands/prepend.rs index 8d9459805..6d7cdbe89 100644 --- a/crates/nu-cli/src/commands/prepend.rs +++ b/crates/nu-cli/src/commands/prepend.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; +use nu_protocol::{Signature, SyntaxShape, UntaggedValue, Value}; #[derive(Deserialize)] struct PrependArgs { @@ -34,7 +34,7 @@ impl WholeStreamCommand for Prepend { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - prepend(args, registry) + prepend(args, registry).await } fn examples(&self) -> Vec { @@ -51,19 +51,17 @@ impl WholeStreamCommand for Prepend { } } -fn prepend(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn prepend( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (PrependArgs { row }, mut input) = args.process(®istry).await?; + let (PrependArgs { row }, input) = args.process(®istry).await?; - yield ReturnSuccess::value(row); - while let Some(item) = input.next().await { - yield ReturnSuccess::value(item); - } - }; + let bos = futures::stream::iter(vec![row]); - Ok(stream.to_output_stream()) + Ok(bos.chain(input).to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/range.rs b/crates/nu-cli/src/commands/range.rs index 6b0057970..e180e4a02 100644 --- a/crates/nu-cli/src/commands/range.rs +++ b/crates/nu-cli/src/commands/range.rs @@ -36,28 +36,25 @@ impl WholeStreamCommand for Range { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - range(args, registry) + range(args, registry).await } } -fn range(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn range(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (RangeArgs { area }, mut input) = args.process(®istry).await?; - let range = area.item; - let (from, _) = range.from; - let (to, _) = range.to; + let (RangeArgs { area }, input) = args.process(®istry).await?; + let range = area.item; + let (from, _) = range.from; + let (to, _) = range.to; - let from = *from as usize; - let to = *to as usize; + let from = *from as usize; + let to = *to as usize; - let mut inp = input.skip(from).take(to - from + 1); - while let Some(item) = inp.next().await { - yield ReturnSuccess::value(item); - } - }; - - Ok(stream.to_output_stream()) + Ok(input + .skip(from) + .take(to - from + 1) + .map(ReturnSuccess::value) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/reverse.rs b/crates/nu-cli/src/commands/reverse.rs index 2d3b36fef..69294769c 100644 --- a/crates/nu-cli/src/commands/reverse.rs +++ b/crates/nu-cli/src/commands/reverse.rs @@ -25,7 +25,7 @@ impl WholeStreamCommand for Reverse { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - reverse(args, registry) + reverse(args, registry).await } fn examples(&self) -> Vec { @@ -43,19 +43,16 @@ impl WholeStreamCommand for Reverse { } } -fn reverse(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn reverse( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let (input, _args) = args.parts(); + let args = args.evaluate_once(®istry).await?; + let (input, _args) = args.parts(); - let input = input.collect::>().await; - for output in input.into_iter().rev() { - yield ReturnSuccess::value(output); - } - }; - - Ok(stream.to_output_stream()) + let input = input.collect::>().await; + Ok(futures::stream::iter(input.into_iter().rev().map(ReturnSuccess::value)).to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/str_/find_replace.rs b/crates/nu-cli/src/commands/str_/find_replace.rs index f22129eb1..c2ac63a0d 100644 --- a/crates/nu-cli/src/commands/str_/find_replace.rs +++ b/crates/nu-cli/src/commands/str_/find_replace.rs @@ -44,7 +44,7 @@ impl WholeStreamCommand for SubCommand { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - operate(args, registry) + operate(args, registry).await } fn examples(&self) -> Vec { @@ -59,52 +59,56 @@ impl WholeStreamCommand for SubCommand { #[derive(Clone)] struct FindReplace(String, String); -fn operate(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn operate( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (Arguments { find, replace, rest }, mut input) = args.process(®istry).await?; - let options = FindReplace(find.item, replace.item); + let ( + Arguments { + find, + replace, + rest, + }, + input, + ) = args.process(®istry).await?; + let options = FindReplace(find.item, replace.item); - let column_paths: Vec<_> = rest.iter().map(|x| x.clone()).collect(); + let column_paths: Vec<_> = rest; - while let Some(v) = input.next().await { + Ok(input + .map(move |v| { if column_paths.is_empty() { match action(&v, &options, v.tag()) { - Ok(out) => yield ReturnSuccess::value(out), - Err(err) => { - yield Err(err); - return; - } + Ok(out) => ReturnSuccess::value(out), + Err(err) => Err(err), } } else { - - let mut ret = v.clone(); + let mut ret = v; for path in &column_paths { let options = options.clone(); - let swapping = ret.swap_data_by_column_path(path, Box::new(move |old| { - action(old, &options, old.tag()) - })); + let swapping = ret.swap_data_by_column_path( + path, + Box::new(move |old| action(old, &options, old.tag())), + ); match swapping { Ok(new_value) => { ret = new_value; } Err(err) => { - yield Err(err); - return; + return Err(err); } } } - yield ReturnSuccess::value(ret); + ReturnSuccess::value(ret) } - } - }; - - Ok(stream.to_output_stream()) + }) + .to_output_stream()) } fn action(input: &Value, options: &FindReplace, tag: impl Into) -> Result { diff --git a/crates/nu-cli/src/commands/str_/upcase.rs b/crates/nu-cli/src/commands/str_/upcase.rs index 140b9534c..f13dad510 100644 --- a/crates/nu-cli/src/commands/str_/upcase.rs +++ b/crates/nu-cli/src/commands/str_/upcase.rs @@ -37,7 +37,7 @@ impl WholeStreamCommand for SubCommand { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - operate(args, registry) + operate(args, registry).await } fn examples(&self) -> Vec { @@ -49,47 +49,46 @@ impl WholeStreamCommand for SubCommand { } } -fn operate(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn operate( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (Arguments { rest }, mut input) = args.process(®istry).await?; + let (Arguments { rest }, input) = args.process(®istry).await?; - let column_paths: Vec<_> = rest.iter().map(|x| x.clone()).collect(); + let column_paths: Vec<_> = rest; - while let Some(v) = input.next().await { + Ok(input + .map(move |v| { if column_paths.is_empty() { match action(&v, v.tag()) { - Ok(out) => yield ReturnSuccess::value(out), - Err(err) => { - yield Err(err); - return; - } + Ok(out) => ReturnSuccess::value(out), + Err(err) => Err(err), } } else { - - let mut ret = v.clone(); + let mut ret = v; for path in &column_paths { - let swapping = ret.swap_data_by_column_path(path, Box::new(move |old| action(old, old.tag()))); + let swapping = ret.swap_data_by_column_path( + path, + Box::new(move |old| action(old, old.tag())), + ); match swapping { Ok(new_value) => { ret = new_value; } Err(err) => { - yield Err(err); - return; + return Err(err); } } } - yield ReturnSuccess::value(ret); + ReturnSuccess::value(ret) } - } - }; - - Ok(stream.to_output_stream()) + }) + .to_output_stream()) } fn action(input: &Value, tag: impl Into) -> Result { diff --git a/crates/nu-cli/src/commands/to_url.rs b/crates/nu-cli/src/commands/to_url.rs index abc66a97c..d939ca271 100644 --- a/crates/nu-cli/src/commands/to_url.rs +++ b/crates/nu-cli/src/commands/to_url.rs @@ -24,67 +24,58 @@ impl WholeStreamCommand for ToURL { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - to_url(args, registry) + to_url(args, registry).await } } -fn to_url(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn to_url(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let tag = args.name_tag(); - let input = args.input; + let args = args.evaluate_once(®istry).await?; + let tag = args.name_tag(); + let input = args.input; - let input: Vec = input.collect().await; - - for value in input { - match value { - Value { value: UntaggedValue::Row(row), .. } => { - let mut row_vec = vec![]; - for (k,v) in row.entries { - match v.as_string() { - Ok(s) => { - row_vec.push((k.clone(), s.to_string())); - } - _ => { - yield Err(ShellError::labeled_error_with_secondary( - "Expected table with string values", - "requires table with strings", - &tag, - "value originates from here", - v.tag, - )) - } - } - } - - match serde_urlencoded::to_string(row_vec) { + Ok(input + .map(move |value| match value { + Value { + value: UntaggedValue::Row(row), + .. + } => { + let mut row_vec = vec![]; + for (k, v) in row.entries { + match v.as_string() { Ok(s) => { - yield ReturnSuccess::value(UntaggedValue::string(s).into_value(&tag)); + row_vec.push((k.clone(), s.to_string())); } _ => { - yield Err(ShellError::labeled_error( - "Failed to convert to url-encoded", - "cannot url-encode", + return Err(ShellError::labeled_error_with_secondary( + "Expected table with string values", + "requires table with strings", &tag, - )) + "value originates from here", + v.tag, + )); } } } - Value { tag: value_tag, .. } => { - yield Err(ShellError::labeled_error_with_secondary( - "Expected a table from pipeline", - "requires table input", + + match serde_urlencoded::to_string(row_vec) { + Ok(s) => ReturnSuccess::value(UntaggedValue::string(s).into_value(&tag)), + _ => Err(ShellError::labeled_error( + "Failed to convert to url-encoded", + "cannot url-encode", &tag, - "value originates from here", - value_tag.span, - )) + )), } } - } - }; - - Ok(stream.to_output_stream()) + Value { tag: value_tag, .. } => Err(ShellError::labeled_error_with_secondary( + "Expected a table from pipeline", + "requires table input", + &tag, + "value originates from here", + value_tag.span, + )), + }) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/to_yaml.rs b/crates/nu-cli/src/commands/to_yaml.rs index a30c9af01..3c0109f19 100644 --- a/crates/nu-cli/src/commands/to_yaml.rs +++ b/crates/nu-cli/src/commands/to_yaml.rs @@ -24,7 +24,7 @@ impl WholeStreamCommand for ToYAML { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - to_yaml(args, registry) + to_yaml(args, registry).await } } @@ -125,51 +125,55 @@ pub fn value_to_yaml_value(v: &Value) -> Result { }) } -fn to_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn to_yaml( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let args = args.evaluate_once(®istry).await?; - let name_tag = args.name_tag(); - let name_span = name_tag.span; + let args = args.evaluate_once(®istry).await?; + let name_tag = args.name_tag(); + let name_span = name_tag.span; - let input: Vec = args.input.collect().await; + let input: Vec = args.input.collect().await; - let to_process_input = if input.len() > 1 { + let to_process_input = match input.len() { + x if x > 1 => { let tag = input[0].tag.clone(); - vec![Value { value: UntaggedValue::Table(input), tag } ] - } else if input.len() == 1 { - input - } else { - vec![] - }; + vec![Value { + value: UntaggedValue::Table(input), + tag, + }] + } + 1 => input, + _ => vec![], + }; - for value in to_process_input { + Ok( + futures::stream::iter(to_process_input.into_iter().map(move |value| { let value_span = value.tag.span; match value_to_yaml_value(&value) { - Ok(yaml_value) => { - match serde_yaml::to_string(&yaml_value) { - Ok(x) => yield ReturnSuccess::value( - UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag), - ), - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected a table with YAML-compatible structure from pipeline", - "requires YAML-compatible input", - name_span, - "originates from here".to_string(), - value_span, - )), - } - } - _ => yield Err(ShellError::labeled_error( + Ok(yaml_value) => match serde_yaml::to_string(&yaml_value) { + Ok(x) => ReturnSuccess::value( + UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag), + ), + _ => Err(ShellError::labeled_error_with_secondary( + "Expected a table with YAML-compatible structure from pipeline", + "requires YAML-compatible input", + name_span, + "originates from here".to_string(), + value_span, + )), + }, + _ => Err(ShellError::labeled_error( "Expected a table with YAML-compatible structure from pipeline", "requires YAML-compatible input", - &name_tag)) + &name_tag, + )), } - } - }; - - Ok(stream.to_output_stream()) + })) + .to_output_stream(), + ) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/update.rs b/crates/nu-cli/src/commands/update.rs index 239322d04..2590d42bb 100644 --- a/crates/nu-cli/src/commands/update.rs +++ b/crates/nu-cli/src/commands/update.rs @@ -3,7 +3,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ColumnPath, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; +use nu_protocol::{ColumnPath, ReturnSuccess, Scope, Signature, SyntaxShape, UntaggedValue, Value}; use nu_value_ext::ValueExt; use futures::stream::once; @@ -44,105 +44,124 @@ impl WholeStreamCommand for Update { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - update(args, registry) + update(args, registry).await } } -fn update(raw_args: CommandArgs, registry: &CommandRegistry) -> Result { - let registry = registry.clone(); - let scope = raw_args.call_info.scope.clone(); +async fn process_row( + scope: Arc, + mut context: Arc, + input: Value, + mut replacement: Arc, + field: Arc, +) -> Result { + let replacement = Arc::make_mut(&mut replacement); - let stream = async_stream! { - let mut context = Context::from_raw(&raw_args, ®istry); - let (UpdateArgs { field, replacement }, mut input) = raw_args.process(®istry).await?; - while let Some(input) = input.next().await { - let replacement = replacement.clone(); - match replacement { - Value { - value: UntaggedValue::Block(block), - tag, - } => { - let for_block = input.clone(); - let input_stream = once(async { Ok(for_block) }).to_input_stream(); + Ok(match replacement { + Value { + value: UntaggedValue::Block(block), + .. + } => { + let for_block = input.clone(); + let input_stream = once(async { Ok(for_block) }).to_input_stream(); - let result = run_block( - &block, - &mut context, - input_stream, - &input, - &scope.vars, - &scope.env - ).await; + let result = run_block( + &block, + Arc::make_mut(&mut context), + input_stream, + &input, + &scope.vars, + &scope.env, + ) + .await; - match result { - Ok(mut stream) => { - let errors = context.get_errors(); - if let Some(error) = errors.first() { - yield Err(error.clone()); - } - - match input { - obj @ Value { - value: UntaggedValue::Row(_), - .. - } => { - if let Some(result) = stream.next().await { - match obj.replace_data_at_column_path(&field, result.clone()) { - Some(v) => yield Ok(ReturnSuccess::Value(v)), - None => { - yield Err(ShellError::labeled_error( - "update could not find place to insert column", - "column name", - obj.tag, - )) - } - } - } - } - Value { tag, ..} => { - yield Err(ShellError::labeled_error( - "Unrecognized type in stream", - "original value", - tag, - )) - } - } - } - Err(e) => { - yield Err(e); - } + match result { + Ok(mut stream) => { + let errors = context.get_errors(); + if let Some(error) = errors.first() { + return Err(error.clone()); } - } - _ => { + match input { - obj @ Value { + obj + @ + Value { value: UntaggedValue::Row(_), .. - } => match obj.replace_data_at_column_path(&field, replacement.clone()) { - Some(v) => yield Ok(ReturnSuccess::Value(v)), - None => { - yield Err(ShellError::labeled_error( - "update could not find place to insert column", - "column name", - obj.tag, - )) + } => { + if let Some(result) = stream.next().await { + match obj.replace_data_at_column_path(&field, result) { + Some(v) => OutputStream::one(ReturnSuccess::value(v)), + None => OutputStream::one(Err(ShellError::labeled_error( + "update could not find place to insert column", + "column name", + obj.tag, + ))), + } + } else { + OutputStream::empty() } - }, - Value { tag, ..} => { - yield Err(ShellError::labeled_error( - "Unrecognized type in stream", - "original value", - tag, - )) } - _ => {} + Value { tag, .. } => OutputStream::one(Err(ShellError::labeled_error( + "Unrecognized type in stream", + "original value", + tag, + ))), } } + Err(e) => OutputStream::one(Err(e)), } } - }; + _ => match input { + obj + @ + Value { + value: UntaggedValue::Row(_), + .. + } => match obj.replace_data_at_column_path(&field, replacement.clone()) { + Some(v) => OutputStream::one(ReturnSuccess::value(v)), + None => OutputStream::one(Err(ShellError::labeled_error( + "update could not find place to insert column", + "column name", + obj.tag, + ))), + }, + Value { tag, .. } => OutputStream::one(Err(ShellError::labeled_error( + "Unrecognized type in stream", + "original value", + tag, + ))), + }, + }) +} - Ok(stream.to_output_stream()) +async fn update( + raw_args: CommandArgs, + registry: &CommandRegistry, +) -> Result { + let registry = registry.clone(); + let scope = Arc::new(raw_args.call_info.scope.clone()); + let context = Arc::new(Context::from_raw(&raw_args, ®istry)); + let (UpdateArgs { field, replacement }, input) = raw_args.process(®istry).await?; + let replacement = Arc::new(replacement); + let field = Arc::new(field); + + Ok(input + .then(move |input| { + let replacement = replacement.clone(); + let scope = scope.clone(); + let context = context.clone(); + let field = field.clone(); + + async { + match process_row(scope, context, input, replacement, field).await { + Ok(s) => s, + Err(e) => OutputStream::one(Err(e)), + } + } + }) + .flatten() + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/which_.rs b/crates/nu-cli/src/commands/which_.rs index e0a12fafe..2515cf885 100644 --- a/crates/nu-cli/src/commands/which_.rs +++ b/crates/nu-cli/src/commands/which_.rs @@ -28,7 +28,7 @@ impl WholeStreamCommand for Which { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - which(args, registry) + which(args, registry).await } } @@ -77,36 +77,42 @@ struct WhichArgs { all: bool, } -fn which(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn which(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); - let mut all = true; - let stream = async_stream! { - let (WhichArgs { application, all: all_items }, _) = args.process(®istry).await?; - all = all_items; - let external = application.starts_with('^'); - let item = if external { - application.item[1..].to_string() - } else { - application.item.clone() - }; - if !external { - let builtin = registry.has(&item); - if builtin { - yield ReturnSuccess::value(entry_builtin!(item, application.tag.clone())); - } - } - if let Ok(paths) = ichwh::which_all(&item).await { - for path in paths { - yield ReturnSuccess::value(entry_path!(item, path.into(), application.tag.clone())); - } - } + let mut output = vec![]; + + let (WhichArgs { application, all }, _) = args.process(®istry).await?; + let external = application.starts_with('^'); + let item = if external { + application.item[1..].to_string() + } else { + application.item.clone() }; + if !external { + let builtin = registry.has(&item); + if builtin { + output.push(ReturnSuccess::value(entry_builtin!( + item, + application.tag.clone() + ))); + } + } + + if let Ok(paths) = ichwh::which_all(&item).await { + for path in paths { + output.push(ReturnSuccess::value(entry_path!( + item, + path.into(), + application.tag.clone() + ))); + } + } if all { - Ok(stream.to_output_stream()) + Ok(futures::stream::iter(output.into_iter()).to_output_stream()) } else { - Ok(stream.take(1).to_output_stream()) + Ok(futures::stream::iter(output.into_iter().take(1)).to_output_stream()) } }