Another batch of removing async_stream (#1972)

This commit is contained in:
Jonathan Turner 2020-06-12 21:03:39 -07:00 committed by GitHub
parent d82ce26b44
commit c959dc1ee3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 476 additions and 445 deletions

View File

@ -36,62 +36,66 @@ impl WholeStreamCommand for FromODS {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
from_ods(args, registry) from_ods(args, registry).await
} }
} }
fn from_ods(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn from_ods(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (
let (FromODSArgs { headerless: _headerless }, mut input) = args.process(&registry).await?; FromODSArgs {
let bytes = input.collect_binary(tag.clone()).await?; headerless: _headerless,
let mut buf: Cursor<Vec<u8>> = Cursor::new(bytes.item); },
let mut ods = Ods::<_>::new(buf).map_err(|_| ShellError::labeled_error( input,
"Could not load ods file", ) = args.process(&registry).await?;
"could not load ods file", let bytes = input.collect_binary(tag.clone()).await?;
&tag))?; let buf: Cursor<Vec<u8>> = Cursor::new(bytes.item);
let mut ods = Ods::<_>::new(buf).map_err(|_| {
ShellError::labeled_error("Could not load ods file", "could not load ods file", &tag)
})?;
let mut dict = TaggedDictBuilder::new(&tag); let mut dict = TaggedDictBuilder::new(&tag);
let sheet_names = ods.sheet_names().to_owned(); let sheet_names = ods.sheet_names().to_owned();
for sheet_name in &sheet_names { for sheet_name in &sheet_names {
let mut sheet_output = TaggedListBuilder::new(&tag); let mut sheet_output = TaggedListBuilder::new(&tag);
if let Some(Ok(current_sheet)) = ods.worksheet_range(sheet_name) { if let Some(Ok(current_sheet)) = ods.worksheet_range(sheet_name) {
for row in current_sheet.rows() { for row in current_sheet.rows() {
let mut row_output = TaggedDictBuilder::new(&tag); let mut row_output = TaggedDictBuilder::new(&tag);
for (i, cell) in row.iter().enumerate() { for (i, cell) in row.iter().enumerate() {
let value = match cell { let value = match cell {
DataType::Empty => UntaggedValue::nothing(), DataType::Empty => UntaggedValue::nothing(),
DataType::String(s) => UntaggedValue::string(s), DataType::String(s) => UntaggedValue::string(s),
DataType::Float(f) => UntaggedValue::decimal(*f), DataType::Float(f) => UntaggedValue::decimal(*f),
DataType::Int(i) => UntaggedValue::int(*i), DataType::Int(i) => UntaggedValue::int(*i),
DataType::Bool(b) => UntaggedValue::boolean(*b), DataType::Bool(b) => UntaggedValue::boolean(*b),
_ => UntaggedValue::nothing(), _ => UntaggedValue::nothing(),
}; };
row_output.insert_untagged(&format!("Column{}", i), value); row_output.insert_untagged(&format!("Column{}", i), value);
}
sheet_output.push_untagged(row_output.into_untagged_value());
} }
dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); sheet_output.push_untagged(row_output.into_untagged_value());
} else {
yield Err(ShellError::labeled_error(
"Could not load sheet",
"could not load sheet",
&tag));
} }
dict.insert_untagged(sheet_name, sheet_output.into_untagged_value());
} else {
return Err(ShellError::labeled_error(
"Could not load sheet",
"could not load sheet",
&tag,
));
} }
}
yield ReturnSuccess::value(dict.into_value()); Ok(OutputStream::one(ReturnSuccess::value(dict.into_value())))
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -51,7 +51,7 @@ impl WholeStreamCommand for FromSSV {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
from_ssv(args, registry) from_ssv(args, registry).await
} }
} }
@ -251,37 +251,53 @@ fn from_ssv_string_to_value(
Some(UntaggedValue::Table(rows).into_value(&tag)) Some(UntaggedValue::Table(rows).into_value(&tag))
} }
fn from_ssv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn from_ssv(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (
let (FromSSVArgs { headerless, aligned_columns, minimum_spaces }, mut input) = args.process(&registry).await?; FromSSVArgs {
let concat_string = input.collect_string(name.clone()).await?; headerless,
let split_at = match minimum_spaces { aligned_columns,
Some(number) => number.item, minimum_spaces,
None => DEFAULT_MINIMUM_SPACES },
}; input,
) = args.process(&registry).await?;
let concat_string = input.collect_string(name.clone()).await?;
let split_at = match minimum_spaces {
Some(number) => number.item,
None => DEFAULT_MINIMUM_SPACES,
};
match from_ssv_string_to_value(&concat_string.item, headerless, aligned_columns, split_at, name.clone()) { Ok(
match from_ssv_string_to_value(
&concat_string.item,
headerless,
aligned_columns,
split_at,
name.clone(),
) {
Some(x) => match x { Some(x) => match x {
Value { value: UntaggedValue::Table(list), ..} => { Value {
for l in list { yield ReturnSuccess::value(l) } value: UntaggedValue::Table(list),
} ..
x => yield ReturnSuccess::value(x) } => futures::stream::iter(list.into_iter().map(ReturnSuccess::value))
.to_output_stream(),
x => OutputStream::one(ReturnSuccess::value(x)),
}, },
None => { None => {
yield Err(ShellError::labeled_error_with_secondary( return Err(ShellError::labeled_error_with_secondary(
"Could not parse as SSV", "Could not parse as SSV",
"input cannot be parsed ssv", "input cannot be parsed ssv",
&name, &name,
"value originates from here", "value originates from here",
&concat_string.tag, &concat_string.tag,
)) ));
}, }
} },
}; )
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -24,7 +24,7 @@ impl WholeStreamCommand for FromTOML {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
from_toml(args, registry) from_toml(args, registry).await
} }
} }
@ -64,28 +64,28 @@ pub fn from_toml_string_to_value(s: String, tag: impl Into<Tag>) -> Result<Value
Ok(convert_toml_value_to_nu_value(&v, tag)) Ok(convert_toml_value_to_nu_value(&v, tag))
} }
pub fn from_toml( pub async fn from_toml(
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?; let tag = args.name_tag();
let tag = args.name_tag(); let input = args.input;
let input = args.input;
let concat_string = input.collect_string(tag.clone()).await?; let concat_string = input.collect_string(tag.clone()).await?;
Ok(
match from_toml_string_to_value(concat_string.item, tag.clone()) { match from_toml_string_to_value(concat_string.item, tag.clone()) {
Ok(x) => match x { Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => { Value {
for l in list { value: UntaggedValue::Table(list),
yield ReturnSuccess::value(l); ..
} } => futures::stream::iter(list.into_iter().map(ReturnSuccess::value))
} .to_output_stream(),
x => yield ReturnSuccess::value(x), x => OutputStream::one(ReturnSuccess::value(x)),
}, },
Err(_) => { Err(_) => {
yield Err(ShellError::labeled_error_with_secondary( return Err(ShellError::labeled_error_with_secondary(
"Could not parse as TOML", "Could not parse as TOML",
"input cannot be parsed as TOML", "input cannot be parsed as TOML",
&tag, &tag,
@ -93,10 +93,8 @@ pub fn from_toml(
concat_string.tag, concat_string.tag,
)) ))
} }
} },
}; )
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -24,7 +24,7 @@ impl WholeStreamCommand for FromXML {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
from_xml(args, registry) from_xml(args, registry).await
} }
} }
@ -99,37 +99,38 @@ pub fn from_xml_string_to_value(s: String, tag: impl Into<Tag>) -> Result<Value,
Ok(from_document_to_value(&parsed, tag)) Ok(from_document_to_value(&parsed, tag))
} }
fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn from_xml(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?; let tag = args.name_tag();
let tag = args.name_tag(); let input = args.input;
let input = args.input;
let concat_string = input.collect_string(tag.clone()).await?; let concat_string = input.collect_string(tag.clone()).await?;
Ok(
match from_xml_string_to_value(concat_string.item, tag.clone()) { match from_xml_string_to_value(concat_string.item, tag.clone()) {
Ok(x) => match x { Ok(x) => match x {
Value { value: UntaggedValue::Table(list), .. } => { Value {
for l in list { value: UntaggedValue::Table(list),
yield ReturnSuccess::value(l); ..
} } => futures::stream::iter(list.into_iter().map(ReturnSuccess::value))
} .to_output_stream(),
x => yield ReturnSuccess::value(x), x => OutputStream::one(ReturnSuccess::value(x)),
}, },
Err(_) => { Err(_) => {
yield Err(ShellError::labeled_error_with_secondary( return Err(ShellError::labeled_error_with_secondary(
"Could not parse as XML", "Could not parse as XML",
"input cannot be parsed as XML", "input cannot be parsed as XML",
&tag, &tag,
"value originates from here", "value originates from here",
&concat_string.tag, &concat_string.tag,
)) ))
} , }
} },
}; )
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -28,7 +28,7 @@ impl WholeStreamCommand for Headers {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
headers(args, registry) headers(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -40,51 +40,65 @@ impl WholeStreamCommand for Headers {
} }
} }
pub fn headers(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn headers(
let stream = async_stream! { args: CommandArgs,
let mut input = args.input; _registry: &CommandRegistry,
let rows: Vec<Value> = input.collect().await; ) -> Result<OutputStream, ShellError> {
let input = args.input;
let rows: Vec<Value> = input.collect().await;
if rows.len() < 1 { if rows.is_empty() {
yield Err(ShellError::untagged_runtime_error("Couldn't find headers, was the input a properly formatted, non-empty table?")); return Err(ShellError::untagged_runtime_error(
} "Couldn't find headers, was the input a properly formatted, non-empty table?",
));
}
//the headers are the first row in the table //the headers are the first row in the table
let headers: Vec<String> = match &rows[0].value { let headers: Vec<String> = match &rows[0].value {
UntaggedValue::Row(d) => { UntaggedValue::Row(d) => {
Ok(d.entries.iter().map(|(k, v)| { Ok(d.entries
.iter()
.map(|(k, v)| {
match v.as_string() { match v.as_string() {
Ok(s) => s, Ok(s) => s,
Err(_) => { //If a cell that should contain a header name is empty, we name the column Column[index] Err(_) => {
//If a cell that should contain a header name is empty, we name the column Column[index]
match d.entries.get_full(k) { match d.entries.get_full(k) {
Some((index, _, _)) => format!("Column{}", index), Some((index, _, _)) => format!("Column{}", index),
None => "unknownColumn".to_string() None => "unknownColumn".to_string(),
} }
} }
} }
}).collect()) })
} .collect())
_ => Err(ShellError::unexpected_eof("Could not get headers, is the table empty?", rows[0].tag.span)) }
}?; _ => Err(ShellError::unexpected_eof(
"Could not get headers, is the table empty?",
rows[0].tag.span,
)),
}?;
//Each row is a dictionary with the headers as keys Ok(
for r in rows.iter().skip(1) { futures::stream::iter(rows.into_iter().skip(1).map(move |r| {
//Each row is a dictionary with the headers as keys
match &r.value { match &r.value {
UntaggedValue::Row(d) => { UntaggedValue::Row(d) => {
let mut i = 0;
let mut entries = IndexMap::new(); let mut entries = IndexMap::new();
for (_, v) in d.entries.iter() { for (i, (_, v)) in d.entries.iter().enumerate() {
entries.insert(headers[i].clone(), v.clone()); entries.insert(headers[i].clone(), v.clone());
i += 1;
} }
yield Ok(ReturnSuccess::Value(UntaggedValue::Row(Dictionary{entries}).into_value(r.tag.clone()))) Ok(ReturnSuccess::Value(
UntaggedValue::Row(Dictionary { entries }).into_value(r.tag.clone()),
))
} }
_ => yield Err(ShellError::unexpected_eof("Couldn't iterate through rows, was the input a properly formatted table?", r.tag.span)) _ => Err(ShellError::unexpected_eof(
"Couldn't iterate through rows, was the input a properly formatted table?",
r.tag.span,
)),
} }
} }))
}; .to_output_stream(),
)
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -42,38 +42,32 @@ impl WholeStreamCommand for Insert {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
insert(args, registry) insert(args, registry).await
} }
} }
fn insert(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn insert(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (InsertArgs { column, value }, input) = args.process(&registry).await?;
let (InsertArgs { column, value }, mut input) = args.process(&registry).await?;
while let Some(row) = input.next().await {
match row {
Value {
value: UntaggedValue::Row(_),
..
} => match row.insert_data_at_column_path(&column, value.clone()) {
Ok(v) => yield Ok(ReturnSuccess::Value(v)),
Err(err) => yield Err(err),
},
Value { tag, ..} => { Ok(input
yield Err(ShellError::labeled_error( .map(move |row| match row {
"Unrecognized type in stream", Value {
"original value", value: UntaggedValue::Row(_),
tag, ..
)); } => match row.insert_data_at_column_path(&column, value.clone()) {
} Ok(v) => Ok(ReturnSuccess::Value(v)),
Err(err) => Err(err),
},
} Value { tag, .. } => Err(ShellError::labeled_error(
}; "Unrecognized type in stream",
"original value",
}; tag,
Ok(stream.to_output_stream()) )),
})
.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry; use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue}; use nu_protocol::{Signature, SyntaxShape, UntaggedValue};
use nu_source::Tagged; use nu_source::Tagged;
pub struct Keep; pub struct Keep;
@ -35,7 +35,7 @@ impl WholeStreamCommand for Keep {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
keep(args, registry) keep(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -59,27 +59,16 @@ impl WholeStreamCommand for Keep {
} }
} }
fn keep(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn keep(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (KeepArgs { rows }, input) = args.process(&registry).await?;
let (KeepArgs { rows }, mut input) = args.process(&registry).await?; let rows_desired = if let Some(quantity) = rows {
let mut rows_desired = if let Some(quantity) = rows { *quantity
*quantity } else {
} else { 1
1
};
while let Some(input) = input.next().await {
if rows_desired > 0 {
yield ReturnSuccess::value(input);
rows_desired -= 1;
} else {
break;
}
}
}; };
Ok(stream.to_output_stream()) Ok(input.take(rows_desired).to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry; use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; use nu_protocol::{Signature, SyntaxShape, UntaggedValue, Value};
#[derive(Deserialize)] #[derive(Deserialize)]
struct PrependArgs { struct PrependArgs {
@ -34,7 +34,7 @@ impl WholeStreamCommand for Prepend {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
prepend(args, registry) prepend(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -51,19 +51,17 @@ impl WholeStreamCommand for Prepend {
} }
} }
fn prepend(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn prepend(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (PrependArgs { row }, input) = args.process(&registry).await?;
let (PrependArgs { row }, mut input) = args.process(&registry).await?;
yield ReturnSuccess::value(row); let bos = futures::stream::iter(vec![row]);
while let Some(item) = input.next().await {
yield ReturnSuccess::value(item);
}
};
Ok(stream.to_output_stream()) Ok(bos.chain(input).to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -36,28 +36,25 @@ impl WholeStreamCommand for Range {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
range(args, registry) range(args, registry).await
} }
} }
fn range(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn range(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (RangeArgs { area }, input) = args.process(&registry).await?;
let (RangeArgs { area }, mut input) = args.process(&registry).await?; let range = area.item;
let range = area.item; let (from, _) = range.from;
let (from, _) = range.from; let (to, _) = range.to;
let (to, _) = range.to;
let from = *from as usize; let from = *from as usize;
let to = *to as usize; let to = *to as usize;
let mut inp = input.skip(from).take(to - from + 1); Ok(input
while let Some(item) = inp.next().await { .skip(from)
yield ReturnSuccess::value(item); .take(to - from + 1)
} .map(ReturnSuccess::value)
}; .to_output_stream())
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -25,7 +25,7 @@ impl WholeStreamCommand for Reverse {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
reverse(args, registry) reverse(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -43,19 +43,16 @@ impl WholeStreamCommand for Reverse {
} }
} }
fn reverse(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn reverse(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?; let (input, _args) = args.parts();
let (input, _args) = args.parts();
let input = input.collect::<Vec<_>>().await; let input = input.collect::<Vec<_>>().await;
for output in input.into_iter().rev() { Ok(futures::stream::iter(input.into_iter().rev().map(ReturnSuccess::value)).to_output_stream())
yield ReturnSuccess::value(output);
}
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -44,7 +44,7 @@ impl WholeStreamCommand for SubCommand {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
operate(args, registry) operate(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -59,52 +59,56 @@ impl WholeStreamCommand for SubCommand {
#[derive(Clone)] #[derive(Clone)]
struct FindReplace(String, String); struct FindReplace(String, String);
fn operate(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn operate(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (
let (Arguments { find, replace, rest }, mut input) = args.process(&registry).await?; Arguments {
let options = FindReplace(find.item, replace.item); find,
replace,
rest,
},
input,
) = args.process(&registry).await?;
let options = FindReplace(find.item, replace.item);
let column_paths: Vec<_> = rest.iter().map(|x| x.clone()).collect(); let column_paths: Vec<_> = rest;
while let Some(v) = input.next().await { Ok(input
.map(move |v| {
if column_paths.is_empty() { if column_paths.is_empty() {
match action(&v, &options, v.tag()) { match action(&v, &options, v.tag()) {
Ok(out) => yield ReturnSuccess::value(out), Ok(out) => ReturnSuccess::value(out),
Err(err) => { Err(err) => Err(err),
yield Err(err);
return;
}
} }
} else { } else {
let mut ret = v;
let mut ret = v.clone();
for path in &column_paths { for path in &column_paths {
let options = options.clone(); let options = options.clone();
let swapping = ret.swap_data_by_column_path(path, Box::new(move |old| { let swapping = ret.swap_data_by_column_path(
action(old, &options, old.tag()) path,
})); Box::new(move |old| action(old, &options, old.tag())),
);
match swapping { match swapping {
Ok(new_value) => { Ok(new_value) => {
ret = new_value; ret = new_value;
} }
Err(err) => { Err(err) => {
yield Err(err); return Err(err);
return;
} }
} }
} }
yield ReturnSuccess::value(ret); ReturnSuccess::value(ret)
} }
} })
}; .to_output_stream())
Ok(stream.to_output_stream())
} }
fn action(input: &Value, options: &FindReplace, tag: impl Into<Tag>) -> Result<Value, ShellError> { fn action(input: &Value, options: &FindReplace, tag: impl Into<Tag>) -> Result<Value, ShellError> {

View File

@ -37,7 +37,7 @@ impl WholeStreamCommand for SubCommand {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
operate(args, registry) operate(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -49,47 +49,46 @@ impl WholeStreamCommand for SubCommand {
} }
} }
fn operate(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn operate(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (Arguments { rest }, input) = args.process(&registry).await?;
let (Arguments { rest }, mut input) = args.process(&registry).await?;
let column_paths: Vec<_> = rest.iter().map(|x| x.clone()).collect(); let column_paths: Vec<_> = rest;
while let Some(v) = input.next().await { Ok(input
.map(move |v| {
if column_paths.is_empty() { if column_paths.is_empty() {
match action(&v, v.tag()) { match action(&v, v.tag()) {
Ok(out) => yield ReturnSuccess::value(out), Ok(out) => ReturnSuccess::value(out),
Err(err) => { Err(err) => Err(err),
yield Err(err);
return;
}
} }
} else { } else {
let mut ret = v;
let mut ret = v.clone();
for path in &column_paths { for path in &column_paths {
let swapping = ret.swap_data_by_column_path(path, Box::new(move |old| action(old, old.tag()))); let swapping = ret.swap_data_by_column_path(
path,
Box::new(move |old| action(old, old.tag())),
);
match swapping { match swapping {
Ok(new_value) => { Ok(new_value) => {
ret = new_value; ret = new_value;
} }
Err(err) => { Err(err) => {
yield Err(err); return Err(err);
return;
} }
} }
} }
yield ReturnSuccess::value(ret); ReturnSuccess::value(ret)
} }
} })
}; .to_output_stream())
Ok(stream.to_output_stream())
} }
fn action(input: &Value, tag: impl Into<Tag>) -> Result<Value, ShellError> { fn action(input: &Value, tag: impl Into<Tag>) -> Result<Value, ShellError> {

View File

@ -24,67 +24,58 @@ impl WholeStreamCommand for ToURL {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
to_url(args, registry) to_url(args, registry).await
} }
} }
fn to_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn to_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?; let tag = args.name_tag();
let tag = args.name_tag(); let input = args.input;
let input = args.input;
let input: Vec<Value> = input.collect().await; Ok(input
.map(move |value| match value {
for value in input { Value {
match value { value: UntaggedValue::Row(row),
Value { value: UntaggedValue::Row(row), .. } => { ..
let mut row_vec = vec![]; } => {
for (k,v) in row.entries { let mut row_vec = vec![];
match v.as_string() { for (k, v) in row.entries {
Ok(s) => { match v.as_string() {
row_vec.push((k.clone(), s.to_string()));
}
_ => {
yield Err(ShellError::labeled_error_with_secondary(
"Expected table with string values",
"requires table with strings",
&tag,
"value originates from here",
v.tag,
))
}
}
}
match serde_urlencoded::to_string(row_vec) {
Ok(s) => { Ok(s) => {
yield ReturnSuccess::value(UntaggedValue::string(s).into_value(&tag)); row_vec.push((k.clone(), s.to_string()));
} }
_ => { _ => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error_with_secondary(
"Failed to convert to url-encoded", "Expected table with string values",
"cannot url-encode", "requires table with strings",
&tag, &tag,
)) "value originates from here",
v.tag,
));
} }
} }
} }
Value { tag: value_tag, .. } => {
yield Err(ShellError::labeled_error_with_secondary( match serde_urlencoded::to_string(row_vec) {
"Expected a table from pipeline", Ok(s) => ReturnSuccess::value(UntaggedValue::string(s).into_value(&tag)),
"requires table input", _ => Err(ShellError::labeled_error(
"Failed to convert to url-encoded",
"cannot url-encode",
&tag, &tag,
"value originates from here", )),
value_tag.span,
))
} }
} }
} Value { tag: value_tag, .. } => Err(ShellError::labeled_error_with_secondary(
}; "Expected a table from pipeline",
"requires table input",
Ok(stream.to_output_stream()) &tag,
"value originates from here",
value_tag.span,
)),
})
.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -24,7 +24,7 @@ impl WholeStreamCommand for ToYAML {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
to_yaml(args, registry) to_yaml(args, registry).await
} }
} }
@ -125,51 +125,55 @@ pub fn value_to_yaml_value(v: &Value) -> Result<serde_yaml::Value, ShellError> {
}) })
} }
fn to_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn to_yaml(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?; let name_tag = args.name_tag();
let name_tag = args.name_tag(); let name_span = name_tag.span;
let name_span = name_tag.span;
let input: Vec<Value> = args.input.collect().await; let input: Vec<Value> = args.input.collect().await;
let to_process_input = if input.len() > 1 { let to_process_input = match input.len() {
x if x > 1 => {
let tag = input[0].tag.clone(); let tag = input[0].tag.clone();
vec![Value { value: UntaggedValue::Table(input), tag } ] vec![Value {
} else if input.len() == 1 { value: UntaggedValue::Table(input),
input tag,
} else { }]
vec![] }
}; 1 => input,
_ => vec![],
};
for value in to_process_input { Ok(
futures::stream::iter(to_process_input.into_iter().map(move |value| {
let value_span = value.tag.span; let value_span = value.tag.span;
match value_to_yaml_value(&value) { match value_to_yaml_value(&value) {
Ok(yaml_value) => { Ok(yaml_value) => match serde_yaml::to_string(&yaml_value) {
match serde_yaml::to_string(&yaml_value) { Ok(x) => ReturnSuccess::value(
Ok(x) => yield ReturnSuccess::value( UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag),
UntaggedValue::Primitive(Primitive::String(x)).into_value(&name_tag), ),
), _ => Err(ShellError::labeled_error_with_secondary(
_ => yield Err(ShellError::labeled_error_with_secondary( "Expected a table with YAML-compatible structure from pipeline",
"Expected a table with YAML-compatible structure from pipeline", "requires YAML-compatible input",
"requires YAML-compatible input", name_span,
name_span, "originates from here".to_string(),
"originates from here".to_string(), value_span,
value_span, )),
)), },
} _ => Err(ShellError::labeled_error(
}
_ => yield Err(ShellError::labeled_error(
"Expected a table with YAML-compatible structure from pipeline", "Expected a table with YAML-compatible structure from pipeline",
"requires YAML-compatible input", "requires YAML-compatible input",
&name_tag)) &name_tag,
)),
} }
} }))
}; .to_output_stream(),
)
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -3,7 +3,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry; use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ColumnPath, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; use nu_protocol::{ColumnPath, ReturnSuccess, Scope, Signature, SyntaxShape, UntaggedValue, Value};
use nu_value_ext::ValueExt; use nu_value_ext::ValueExt;
use futures::stream::once; use futures::stream::once;
@ -44,105 +44,124 @@ impl WholeStreamCommand for Update {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
update(args, registry) update(args, registry).await
} }
} }
fn update(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn process_row(
let registry = registry.clone(); scope: Arc<Scope>,
let scope = raw_args.call_info.scope.clone(); mut context: Arc<Context>,
input: Value,
mut replacement: Arc<Value>,
field: Arc<ColumnPath>,
) -> Result<OutputStream, ShellError> {
let replacement = Arc::make_mut(&mut replacement);
let stream = async_stream! { Ok(match replacement {
let mut context = Context::from_raw(&raw_args, &registry); Value {
let (UpdateArgs { field, replacement }, mut input) = raw_args.process(&registry).await?; value: UntaggedValue::Block(block),
while let Some(input) = input.next().await { ..
let replacement = replacement.clone(); } => {
match replacement { let for_block = input.clone();
Value { let input_stream = once(async { Ok(for_block) }).to_input_stream();
value: UntaggedValue::Block(block),
tag,
} => {
let for_block = input.clone();
let input_stream = once(async { Ok(for_block) }).to_input_stream();
let result = run_block( let result = run_block(
&block, &block,
&mut context, Arc::make_mut(&mut context),
input_stream, input_stream,
&input, &input,
&scope.vars, &scope.vars,
&scope.env &scope.env,
).await; )
.await;
match result { match result {
Ok(mut stream) => { Ok(mut stream) => {
let errors = context.get_errors(); let errors = context.get_errors();
if let Some(error) = errors.first() { if let Some(error) = errors.first() {
yield Err(error.clone()); return Err(error.clone());
}
match input {
obj @ Value {
value: UntaggedValue::Row(_),
..
} => {
if let Some(result) = stream.next().await {
match obj.replace_data_at_column_path(&field, result.clone()) {
Some(v) => yield Ok(ReturnSuccess::Value(v)),
None => {
yield Err(ShellError::labeled_error(
"update could not find place to insert column",
"column name",
obj.tag,
))
}
}
}
}
Value { tag, ..} => {
yield Err(ShellError::labeled_error(
"Unrecognized type in stream",
"original value",
tag,
))
}
}
}
Err(e) => {
yield Err(e);
}
} }
}
_ => {
match input { match input {
obj @ Value { obj
@
Value {
value: UntaggedValue::Row(_), value: UntaggedValue::Row(_),
.. ..
} => match obj.replace_data_at_column_path(&field, replacement.clone()) { } => {
Some(v) => yield Ok(ReturnSuccess::Value(v)), if let Some(result) = stream.next().await {
None => { match obj.replace_data_at_column_path(&field, result) {
yield Err(ShellError::labeled_error( Some(v) => OutputStream::one(ReturnSuccess::value(v)),
"update could not find place to insert column", None => OutputStream::one(Err(ShellError::labeled_error(
"column name", "update could not find place to insert column",
obj.tag, "column name",
)) obj.tag,
))),
}
} else {
OutputStream::empty()
} }
},
Value { tag, ..} => {
yield Err(ShellError::labeled_error(
"Unrecognized type in stream",
"original value",
tag,
))
} }
_ => {} Value { tag, .. } => OutputStream::one(Err(ShellError::labeled_error(
"Unrecognized type in stream",
"original value",
tag,
))),
} }
} }
Err(e) => OutputStream::one(Err(e)),
} }
} }
}; _ => match input {
obj
@
Value {
value: UntaggedValue::Row(_),
..
} => match obj.replace_data_at_column_path(&field, replacement.clone()) {
Some(v) => OutputStream::one(ReturnSuccess::value(v)),
None => OutputStream::one(Err(ShellError::labeled_error(
"update could not find place to insert column",
"column name",
obj.tag,
))),
},
Value { tag, .. } => OutputStream::one(Err(ShellError::labeled_error(
"Unrecognized type in stream",
"original value",
tag,
))),
},
})
}
Ok(stream.to_output_stream()) async fn update(
raw_args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let scope = Arc::new(raw_args.call_info.scope.clone());
let context = Arc::new(Context::from_raw(&raw_args, &registry));
let (UpdateArgs { field, replacement }, input) = raw_args.process(&registry).await?;
let replacement = Arc::new(replacement);
let field = Arc::new(field);
Ok(input
.then(move |input| {
let replacement = replacement.clone();
let scope = scope.clone();
let context = context.clone();
let field = field.clone();
async {
match process_row(scope, context, input, replacement, field).await {
Ok(s) => s,
Err(e) => OutputStream::one(Err(e)),
}
}
})
.flatten()
.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -28,7 +28,7 @@ impl WholeStreamCommand for Which {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
which(args, registry) which(args, registry).await
} }
} }
@ -77,36 +77,42 @@ struct WhichArgs {
all: bool, all: bool,
} }
fn which(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn which(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let mut all = true;
let stream = async_stream! {
let (WhichArgs { application, all: all_items }, _) = args.process(&registry).await?;
all = all_items;
let external = application.starts_with('^');
let item = if external {
application.item[1..].to_string()
} else {
application.item.clone()
};
if !external {
let builtin = registry.has(&item);
if builtin {
yield ReturnSuccess::value(entry_builtin!(item, application.tag.clone()));
}
}
if let Ok(paths) = ichwh::which_all(&item).await { let mut output = vec![];
for path in paths {
yield ReturnSuccess::value(entry_path!(item, path.into(), application.tag.clone())); let (WhichArgs { application, all }, _) = args.process(&registry).await?;
} let external = application.starts_with('^');
} let item = if external {
application.item[1..].to_string()
} else {
application.item.clone()
}; };
if !external {
let builtin = registry.has(&item);
if builtin {
output.push(ReturnSuccess::value(entry_builtin!(
item,
application.tag.clone()
)));
}
}
if let Ok(paths) = ichwh::which_all(&item).await {
for path in paths {
output.push(ReturnSuccess::value(entry_path!(
item,
path.into(),
application.tag.clone()
)));
}
}
if all { if all {
Ok(stream.to_output_stream()) Ok(futures::stream::iter(output.into_iter()).to_output_stream())
} else { } else {
Ok(stream.take(1).to_output_stream()) Ok(futures::stream::iter(output.into_iter().take(1)).to_output_stream())
} }
} }