Another batch of converting commands away from async_stream (#1974)

* Another batch of removing async_stream

* merge master
This commit is contained in:
Jonathan Turner 2020-06-13 01:43:21 -07:00 committed by GitHub
parent e24e0242d1
commit fe6d96e996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 495 additions and 492 deletions

View File

@ -33,7 +33,7 @@ impl WholeStreamCommand for FromJSON {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
from_json(args, registry) from_json(args, registry).await
} }
} }
@ -71,65 +71,73 @@ pub fn from_json_string_to_value(s: String, tag: impl Into<Tag>) -> serde_hjson:
Ok(convert_json_value_to_nu_value(&v, tag)) Ok(convert_json_value_to_nu_value(&v, tag))
} }
fn from_json(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn from_json(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let name_tag = args.call_info.name_tag.clone(); let name_tag = args.call_info.name_tag.clone();
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (FromJSONArgs { objects }, input) = args.process(&registry).await?;
let (FromJSONArgs { objects }, mut input) = args.process(&registry).await?;
let concat_string = input.collect_string(name_tag.clone()).await?; let concat_string = input.collect_string(name_tag.clone()).await?;
let string_clone: Vec<_> = concat_string.item.lines().map(|x| x.to_string()).collect();
if objects { if objects {
for json_str in concat_string.item.lines() { Ok(
futures::stream::iter(string_clone.into_iter().filter_map(move |json_str| {
if json_str.is_empty() { if json_str.is_empty() {
continue; return None;
} }
match from_json_string_to_value(json_str.to_string(), &name_tag) { match from_json_string_to_value(json_str, &name_tag) {
Ok(x) => Ok(x) => Some(ReturnSuccess::value(x)),
yield ReturnSuccess::value(x),
Err(e) => { Err(e) => {
let mut message = "Could not parse as JSON (".to_string(); let mut message = "Could not parse as JSON (".to_string();
message.push_str(&e.to_string()); message.push_str(&e.to_string());
message.push_str(")"); message.push_str(")");
yield Err(ShellError::labeled_error_with_secondary( Some(Err(ShellError::labeled_error_with_secondary(
message, message,
"input cannot be parsed as JSON", "input cannot be parsed as JSON",
&name_tag, name_tag.clone(),
"value originates from here", "value originates from here",
concat_string.tag.clone())) concat_string.tag.clone(),
} )))
} }
} }
}))
.to_output_stream(),
)
} else { } else {
match from_json_string_to_value(concat_string.item, name_tag.clone()) { match from_json_string_to_value(concat_string.item, name_tag.clone()) {
Ok(x) => Ok(x) => match x {
match x { Value {
Value { value: UntaggedValue::Table(list), .. } => { value: UntaggedValue::Table(list),
for l in list { ..
yield ReturnSuccess::value(l); } => Ok(
} futures::stream::iter(list.into_iter().map(ReturnSuccess::value))
} .to_output_stream(),
x => yield ReturnSuccess::value(x), ),
} x => Ok(OutputStream::one(ReturnSuccess::value(x))),
},
Err(e) => { Err(e) => {
let mut message = "Could not parse as JSON (".to_string(); let mut message = "Could not parse as JSON (".to_string();
message.push_str(&e.to_string()); message.push_str(&e.to_string());
message.push_str(")"); message.push_str(")");
yield Err(ShellError::labeled_error_with_secondary( Ok(OutputStream::one(Err(
ShellError::labeled_error_with_secondary(
message, message,
"input cannot be parsed as JSON", "input cannot be parsed as JSON",
name_tag, name_tag,
"value originates from here", "value originates from here",
concat_string.tag)) concat_string.tag,
),
)))
} }
} }
} }
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -45,7 +45,7 @@ impl WholeStreamCommand for Histogram {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
histogram(args, registry) histogram(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -70,21 +70,20 @@ impl WholeStreamCommand for Histogram {
} }
} }
pub fn histogram( pub async fn histogram(
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let stream = async_stream! { let (HistogramArgs { column_name, rest }, input) = args.process(&registry).await?;
let (HistogramArgs { column_name, rest}, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await; let values: Vec<Value> = input.collect().await;
let Tagged { item: group_by, .. } = column_name.clone(); let Tagged { item: group_by, .. } = column_name.clone();
let groups = group(&column_name, values, &name)?; let groups = group(&column_name, values, &name)?;
let group_labels = columns_sorted(Some(group_by.clone()), &groups, &name); let group_labels = columns_sorted(Some(group_by.clone()), &groups, &name);
let sorted = t_sort(Some(group_by.clone()), None, &groups, &name)?; let sorted = t_sort(Some(group_by), None, &groups, &name)?;
let evaled = evaluate(&sorted, None, &name)?; let evaled = evaluate(&sorted, None, &name)?;
let reduced = reduce(&evaled, None, &name)?; let reduced = reduce(&evaled, None, &name)?;
let maxima = map_max(&reduced, None, &name)?; let maxima = map_max(&reduced, None, &name)?;
@ -95,7 +94,6 @@ pub fn histogram(
value: UntaggedValue::Table(datasets), value: UntaggedValue::Table(datasets),
.. ..
} => { } => {
let mut idx = 0; let mut idx = 0;
let column_names_supplied: Vec<_> = rest.iter().map(|f| f.item.clone()).collect(); let column_names_supplied: Vec<_> = rest.iter().map(|f| f.item.clone()).collect();
@ -109,7 +107,11 @@ pub fn histogram(
let column = (*column_name).clone(); let column = (*column_name).clone();
let count_column_name = "count".to_string(); let count_column_name = "count".to_string();
let count_shell_error = ShellError::labeled_error("Unable to load group count", "unabled to load group count", &name); let count_shell_error = ShellError::labeled_error(
"Unable to load group count",
"unabled to load group count",
&name,
);
let mut count_values: Vec<u64> = Vec::new(); let mut count_values: Vec<u64> = Vec::new();
for table_entry in reduced.table_entries() { for table_entry in reduced.table_entries() {
@ -122,43 +124,82 @@ pub fn histogram(
if let Ok(count) = i.value.clone().into_value(&name).as_u64() { if let Ok(count) = i.value.clone().into_value(&name).as_u64() {
count_values.push(count); count_values.push(count);
} else { } else {
yield Err(count_shell_error); return Err(count_shell_error);
return;
} }
} }
} }
_ => { _ => {
yield Err(count_shell_error); return Err(count_shell_error);
return;
} }
} }
} }
if let Value { value: UntaggedValue::Table(start), .. } = datasets.get(0).ok_or_else(|| ShellError::labeled_error("Unable to load dataset", "unabled to load dataset", &name))? { if let Value {
for percentage in start.iter() { value: UntaggedValue::Table(start),
..
} = datasets.get(0).ok_or_else(|| {
ShellError::labeled_error(
"Unable to load dataset",
"unabled to load dataset",
&name,
)
})? {
let start = start.clone();
Ok(
futures::stream::iter(start.into_iter().map(move |percentage| {
let mut fact = TaggedDictBuilder::new(&name); let mut fact = TaggedDictBuilder::new(&name);
let value: Tagged<String> = group_labels.get(idx).ok_or_else(|| ShellError::labeled_error("Unable to load group labels", "unabled to load group labels", &name))?.clone(); let value: Tagged<String> = group_labels
fact.insert_value(&column, UntaggedValue::string(value.item).into_value(value.tag)); .get(idx)
.ok_or_else(|| {
ShellError::labeled_error(
"Unable to load group labels",
"unabled to load group labels",
&name,
)
})?
.clone();
fact.insert_value(
&column,
UntaggedValue::string(value.item).into_value(value.tag),
);
fact.insert_untagged(&count_column_name, UntaggedValue::int(count_values[idx])); fact.insert_untagged(
&count_column_name,
UntaggedValue::int(count_values[idx]),
);
if let Value { value: UntaggedValue::Primitive(Primitive::Int(ref num)), ref tag } = percentage.clone() { if let Value {
let string = std::iter::repeat("*").take(num.to_i32().ok_or_else(|| ShellError::labeled_error("Expected a number", "expected a number", tag))? as usize).collect::<String>(); value: UntaggedValue::Primitive(Primitive::Int(ref num)),
fact.insert_untagged(&frequency_column_name, UntaggedValue::string(string)); ref tag,
} = percentage
{
let string = std::iter::repeat("*")
.take(num.to_i32().ok_or_else(|| {
ShellError::labeled_error(
"Expected a number",
"expected a number",
tag,
)
})? as usize)
.collect::<String>();
fact.insert_untagged(
&frequency_column_name,
UntaggedValue::string(string),
);
} }
idx += 1; idx += 1;
yield ReturnSuccess::value(fact.into_value()); ReturnSuccess::value(fact.into_value())
}))
.to_output_stream(),
)
} else {
Ok(OutputStream::empty())
} }
} }
_ => Ok(OutputStream::empty()),
} }
_ => {}
}
};
Ok(stream.to_output_stream())
} }
fn percentages(values: &Value, max: Value, tag: impl Into<Tag>) -> Result<Value, ShellError> { fn percentages(values: &Value, max: Value, tag: impl Into<Tag>) -> Result<Value, ShellError> {

View File

@ -33,21 +33,25 @@ impl WholeStreamCommand for History {
fn history(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> { fn history(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag; let tag = args.call_info.name_tag;
let stream = async_stream! {
let history_path = HistoryFile::path(); let history_path = HistoryFile::path();
let file = File::open(history_path); let file = File::open(history_path);
if let Ok(file) = file { if let Ok(file) = file {
let reader = BufReader::new(file); let reader = BufReader::new(file);
for line in reader.lines() { let output = reader.lines().filter_map(move |line| match line {
if let Ok(line) = line { Ok(line) => Some(ReturnSuccess::value(
yield ReturnSuccess::value(UntaggedValue::string(line).into_value(tag.clone())); UntaggedValue::string(line).into_value(tag.clone()),
} )),
} Err(_) => None,
});
Ok(futures::stream::iter(output).to_output_stream())
} else { } else {
yield Err(ShellError::labeled_error("Could not open history", "history file could not be opened", tag.clone())); Err(ShellError::labeled_error(
"Could not open history",
"history file could not be opened",
tag,
))
} }
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry; use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue}; use nu_protocol::{Signature, SyntaxShape};
use nu_source::Tagged; use nu_source::Tagged;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
@ -43,7 +43,7 @@ impl WholeStreamCommand for Kill {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
kill(args, registry) kill(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -62,16 +62,18 @@ impl WholeStreamCommand for Kill {
} }
} }
fn kill(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn kill(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (
let (KillArgs { KillArgs {
pid, pid,
rest, rest,
force, force,
quiet, quiet,
}, mut input) = args.process(&registry).await?; },
..,
) = args.process(&registry).await?;
let mut cmd = if cfg!(windows) { let mut cmd = if cfg!(windows) {
let mut cmd = Command::new("taskkill"); let mut cmd = Command::new("taskkill");
@ -113,12 +115,7 @@ fn kill(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, S
cmd.status().expect("failed to execute shell command"); cmd.status().expect("failed to execute shell command");
if false { Ok(OutputStream::empty())
yield ReturnSuccess::value(UntaggedValue::nothing().into_value(Tag::unknown()));
}
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -37,7 +37,7 @@ impl WholeStreamCommand for Merge {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
merge(args, registry) merge(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -49,57 +49,61 @@ impl WholeStreamCommand for Merge {
} }
} }
fn merge(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn merge(
raw_args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let scope = raw_args.call_info.scope.clone(); let scope = raw_args.call_info.scope.clone();
let stream = async_stream! {
let mut context = Context::from_raw(&raw_args, &registry); let mut context = Context::from_raw(&raw_args, &registry);
let name_tag = raw_args.call_info.name_tag.clone(); let name_tag = raw_args.call_info.name_tag.clone();
let (merge_args, mut input): (MergeArgs, _) = raw_args.process(&registry).await?; let (merge_args, input): (MergeArgs, _) = raw_args.process(&registry).await?;
let block = merge_args.block; let block = merge_args.block;
let table: Option<Vec<Value>> = match run_block(&block, let table: Option<Vec<Value>> = match run_block(
&block,
&mut context, &mut context,
InputStream::empty(), InputStream::empty(),
&scope.it, &scope.it,
&scope.vars, &scope.vars,
&scope.env).await { &scope.env,
)
.await
{
Ok(mut stream) => Some(stream.drain_vec().await), Ok(mut stream) => Some(stream.drain_vec().await),
Err(err) => { Err(err) => {
yield Err(err); return Err(err);
return;
} }
}; };
let table = table.unwrap_or_else(|| {
let table = table.unwrap_or_else(|| vec![Value { vec![Value {
value: UntaggedValue::row(IndexMap::default()), value: UntaggedValue::row(IndexMap::default()),
tag: name_tag, tag: name_tag,
}]); }]
});
let mut idx = 0; Ok(input
.enumerate()
while let Some(value) = input.next().await { .map(move |(idx, value)| {
let other = table.get(idx); let other = table.get(idx);
match other { match other {
Some(replacement) => { Some(replacement) => match merge_values(&value.value, &replacement.value) {
match merge_values(&value.value, &replacement.value) { Ok(merged_value) => ReturnSuccess::value(merged_value.into_value(&value.tag)),
Ok(merged_value) => yield ReturnSuccess::value(merged_value.into_value(&value.tag)), Err(_) => {
Err(err) => {
let message = format!("The row at {:?} types mismatch", idx); let message = format!("The row at {:?} types mismatch", idx);
yield Err(ShellError::labeled_error("Could not merge", &message, &value.tag)); Err(ShellError::labeled_error(
"Could not merge",
&message,
&value.tag,
))
} }
},
None => ReturnSuccess::value(value),
} }
} })
None => yield ReturnSuccess::value(value), .to_output_stream())
}
idx += 1;
}
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -38,7 +38,7 @@ impl WholeStreamCommand for Nth {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
nth(args, registry) nth(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -57,30 +57,36 @@ impl WholeStreamCommand for Nth {
} }
} }
fn nth(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn nth(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (
let (NthArgs { row_number, rest: and_rows}, input) = args.process(&registry).await?; NthArgs {
row_number,
rest: and_rows,
},
input,
) = args.process(&registry).await?;
let mut inp = input.enumerate(); let row_numbers = vec![vec![row_number], and_rows]
while let Some((idx, item)) = inp.next().await {
let row_number = vec![row_number.clone()];
let row_numbers = vec![&row_number, &and_rows]
.into_iter() .into_iter()
.flatten() .flatten()
.collect::<Vec<&Tagged<u64>>>(); .collect::<Vec<Tagged<u64>>>();
Ok(input
.enumerate()
.filter_map(move |(idx, item)| {
futures::future::ready(
if row_numbers if row_numbers
.iter() .iter()
.any(|requested| requested.item == idx as u64) .any(|requested| requested.item == idx as u64)
{ {
yield ReturnSuccess::value(item); Some(ReturnSuccess::value(item))
} } else {
} None
}; },
)
Ok(stream.to_output_stream()) })
.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -51,24 +51,29 @@ impl WholeStreamCommand for Pivot {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
pivot(args, registry) pivot(args, registry).await
} }
} }
pub fn pivot(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn pivot(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let stream = async_stream! { let (args, input): (PivotArgs, _) = args.process(&registry).await?;
let (args, mut input): (PivotArgs, _) = args.process(&registry).await?;
let input = input.into_vec().await; let input = input.into_vec().await;
let descs = merge_descriptors(&input); let descs = merge_descriptors(&input);
let mut headers: Vec<String> = vec![]; let mut headers: Vec<String> = vec![];
if args.rest.len() > 0 && args.header_row { if !args.rest.is_empty() && args.header_row {
yield Err(ShellError::labeled_error("Can not provide header names and use header row", "using header row", name)); return Err(ShellError::labeled_error(
return; "Can not provide header names and use header row",
"using header row",
name,
));
} }
if args.header_row { if args.header_row {
@ -79,18 +84,27 @@ pub fn pivot(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
if let Ok(s) = x.as_string() { if let Ok(s) = x.as_string() {
headers.push(s.to_string()); headers.push(s.to_string());
} else { } else {
yield Err(ShellError::labeled_error("Header row needs string headers", "used non-string headers", name)); return Err(ShellError::labeled_error(
return; "Header row needs string headers",
"used non-string headers",
name,
));
} }
} }
_ => { _ => {
yield Err(ShellError::labeled_error("Header row is incomplete and can't be used", "using incomplete header row", name)); return Err(ShellError::labeled_error(
return; "Header row is incomplete and can't be used",
"using incomplete header row",
name,
));
} }
} }
} else { } else {
yield Err(ShellError::labeled_error("Header row is incomplete and can't be used", "using incomplete header row", name)); return Err(ShellError::labeled_error(
return; "Header row is incomplete and can't be used",
"using incomplete header row",
name,
));
} }
} }
} else { } else {
@ -104,17 +118,20 @@ pub fn pivot(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
} }
let descs: Vec<_> = if args.header_row { let descs: Vec<_> = if args.header_row {
descs.iter().skip(1).collect() descs.into_iter().skip(1).collect()
} else { } else {
descs.iter().collect() descs
}; };
for desc in descs { Ok(futures::stream::iter(descs.into_iter().map(move |desc| {
let mut column_num: usize = 0; let mut column_num: usize = 0;
let mut dict = TaggedDictBuilder::new(&name); let mut dict = TaggedDictBuilder::new(&name);
if !args.ignore_titles && !args.header_row { if !args.ignore_titles && !args.header_row {
dict.insert_untagged(headers[column_num].clone(), UntaggedValue::string(desc.clone())); dict.insert_untagged(
headers[column_num].clone(),
UntaggedValue::string(desc.clone()),
);
column_num += 1 column_num += 1
} }
@ -130,13 +147,9 @@ pub fn pivot(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
column_num += 1; column_num += 1;
} }
yield ReturnSuccess::value(dict.into_value()); ReturnSuccess::value(dict.into_value())
} }))
.to_output_stream())
};
Ok(OutputStream::new(stream))
} }
#[cfg(test)] #[cfg(test)]

View File

@ -47,7 +47,6 @@ async fn reject(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputS
let registry = registry.clone(); let registry = registry.clone();
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let (RejectArgs { rest: fields }, input) = args.process(&registry).await?; let (RejectArgs { rest: fields }, input) = args.process(&registry).await?;
if fields.is_empty() { if fields.is_empty() {
return Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Reject requires fields", "Reject requires fields",

View File

@ -38,7 +38,7 @@ impl WholeStreamCommand for Rename {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
rename(args, registry) rename(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -57,17 +57,20 @@ impl WholeStreamCommand for Rename {
} }
} }
pub fn rename(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn rename(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let stream = async_stream! { let (Arguments { column_name, rest }, input) = args.process(&registry).await?;
let (Arguments { column_name, rest }, mut input) = args.process(&registry).await?;
let mut new_column_names = vec![vec![column_name]]; let mut new_column_names = vec![vec![column_name]];
new_column_names.push(rest); new_column_names.push(rest);
let new_column_names = new_column_names.into_iter().flatten().collect::<Vec<_>>(); let new_column_names = new_column_names.into_iter().flatten().collect::<Vec<_>>();
while let Some(item) = input.next().await { Ok(input
.map(move |item| {
if let Value { if let Value {
value: UntaggedValue::Row(row), value: UntaggedValue::Row(row),
tag, tag,
@ -87,21 +90,19 @@ pub fn rename(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStr
let out = UntaggedValue::Row(renamed_row.into()).into_value(tag); let out = UntaggedValue::Row(renamed_row.into()).into_value(tag);
yield ReturnSuccess::value(out); ReturnSuccess::value(out)
} else { } else {
yield ReturnSuccess::value( ReturnSuccess::value(
UntaggedValue::Error(ShellError::labeled_error( UntaggedValue::Error(ShellError::labeled_error(
"no column names available", "no column names available",
"can't rename", "can't rename",
&name, &name,
)) ))
.into_untagged_value(), .into_untagged_value(),
); )
} }
} })
}; .to_output_stream())
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -4,7 +4,7 @@ use crate::prelude::*;
use derive_new::new; use derive_new::new;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{hir::Block, ReturnSuccess, Signature, SyntaxShape}; use nu_protocol::{hir::Block, Signature, SyntaxShape};
#[derive(new, Clone)] #[derive(new, Clone)]
pub struct AliasCommand { pub struct AliasCommand {
@ -38,7 +38,6 @@ impl WholeStreamCommand for AliasCommand {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone();
let call_info = args.call_info.clone(); let call_info = args.call_info.clone();
let registry = registry.clone(); let registry = registry.clone();
let block = self.block.clone(); let block = self.block.clone();
@ -46,61 +45,26 @@ impl WholeStreamCommand for AliasCommand {
let mut context = Context::from_args(&args, &registry); let mut context = Context::from_args(&args, &registry);
let input = args.input; let input = args.input;
let stream = async_stream! {
let mut scope = call_info.scope.clone(); let mut scope = call_info.scope.clone();
let evaluated = call_info.evaluate(&registry).await?; let evaluated = call_info.evaluate(&registry).await?;
if let Some(positional) = &evaluated.args.positional { if let Some(positional) = &evaluated.args.positional {
for (pos, arg) in positional.iter().enumerate() { for (pos, arg) in positional.iter().enumerate() {
scope.vars.insert(alias_command.args[pos].to_string(), arg.clone()); scope
.vars
.insert(alias_command.args[pos].to_string(), arg.clone());
} }
} }
let result = run_block( // FIXME: we need to patch up the spans to point at the top-level error
Ok(run_block(
&block, &block,
&mut context, &mut context,
input, input,
&scope.it, &scope.it,
&scope.vars, &scope.vars,
&scope.env, &scope.env,
).await; )
.await?
match result { .to_output_stream())
Ok(stream) if stream.is_empty() => {
yield Err(ShellError::labeled_error(
"Expected a block",
"alias needs a block",
tag,
));
}
Ok(mut stream) => {
// We collect first to ensure errors are put into the context
while let Some(result) = stream.next().await {
yield Ok(ReturnSuccess::Value(result));
}
let errors = context.get_errors();
if let Some(x) = errors.first() {
yield Err(ShellError::labeled_error_with_secondary(
"Alias failed to run",
"alias failed to run",
tag.clone(),
x.to_string(),
tag
));
}
}
Err(e) => {
yield Err(ShellError::labeled_error_with_secondary(
"Alias failed to run",
"alias failed to run",
tag.clone(),
e.to_string(),
tag
));
}
}
};
Ok(stream.to_output_stream())
} }
} }

View File

@ -9,7 +9,7 @@ use std::path::PathBuf;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::hir::{Expression, ExternalArgs, ExternalCommand, Literal, SpannedExpression}; use nu_protocol::hir::{Expression, ExternalArgs, ExternalCommand, Literal, SpannedExpression};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape}; use nu_protocol::{Signature, SyntaxShape};
use nu_source::Tagged; use nu_source::Tagged;
#[derive(Deserialize)] #[derive(Deserialize)]
@ -99,7 +99,6 @@ impl WholeStreamCommand for RunExternalCommand {
let is_interactive = self.interactive; let is_interactive = self.interactive;
let stream = async_stream! {
let command = ExternalCommand { let command = ExternalCommand {
name, name,
name_tag: args.call_info.name_tag.clone(), name_tag: args.call_info.name_tag.clone(),
@ -117,51 +116,32 @@ impl WholeStreamCommand for RunExternalCommand {
path: Some(Tagged { path: Some(Tagged {
item: PathBuf::from(path), item: PathBuf::from(path),
tag: args.call_info.name_tag.clone(), tag: args.call_info.name_tag.clone(),
}) }),
}; };
let result = external_context.shell_manager.cd(cd_args, args.call_info.name_tag.clone()); let result = external_context
.shell_manager
.cd(cd_args, args.call_info.name_tag.clone());
match result { match result {
Ok(mut stream) => { Ok(stream) => return Ok(stream.to_output_stream()),
while let Some(value) = stream.next().await {
yield value;
}
},
Err(e) => { Err(e) => {
yield Err(e); return Err(e);
}, }
_ => {}
} }
return;
} }
} }
let scope = args.call_info.scope.clone(); let scope = args.call_info.scope.clone();
let is_last = args.call_info.args.is_last; let is_last = args.call_info.args.is_last;
let input = args.input; let input = args.input;
let result = external::run_external_command( let result =
command, external::run_external_command(command, &mut external_context, input, &scope, is_last)
&mut external_context, .await;
input,
&scope,
is_last,
).await;
match result { match result {
Ok(mut stream) => { Ok(stream) => Ok(stream.to_output_stream()),
while let Some(value) = stream.next().await { Err(e) => Err(e),
yield Ok(ReturnSuccess::Value(value));
} }
},
Err(e) => {
yield Err(e);
},
_ => {}
}
};
Ok(stream.to_output_stream())
} }
} }

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry; use crate::context::CommandRegistry;
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, ReturnValue, Value}; use nu_protocol::{ReturnSuccess, Value};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::thread_rng; use rand::thread_rng;
@ -24,28 +24,20 @@ impl WholeStreamCommand for Shuffle {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
shuffle(args, registry) shuffle(args, registry).await
} }
} }
fn shuffle(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn shuffle(
let stream = async_stream! { args: CommandArgs,
let mut input = args.input; _registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let input = args.input;
let mut values: Vec<Value> = input.collect().await; let mut values: Vec<Value> = input.collect().await;
let out = {
values.shuffle(&mut thread_rng()); values.shuffle(&mut thread_rng());
values.clone()
};
for val in out.into_iter() { Ok(futures::stream::iter(values.into_iter().map(ReturnSuccess::value)).to_output_stream())
yield ReturnSuccess::value(val);
}
};
let stream: BoxStream<'static, ReturnValue> = stream.boxed();
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::prelude::*; use crate::prelude::*;
use crate::utils::data_processing::{reducer_for, Reduce}; use crate::utils::data_processing::{reducer_for, Reduce};
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{Dictionary, ReturnSuccess, ReturnValue, Signature, UntaggedValue, Value}; use nu_protocol::{Dictionary, ReturnSuccess, Signature, UntaggedValue, Value};
use num_traits::identities::Zero; use num_traits::identities::Zero;
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
@ -38,6 +38,7 @@ impl WholeStreamCommand for Sum {
name: args.call_info.name_tag, name: args.call_info.name_tag,
raw_input: args.raw_input, raw_input: args.raw_input,
}) })
.await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -56,27 +57,25 @@ impl WholeStreamCommand for Sum {
} }
} }
fn sum(RunnableContext { mut input, .. }: RunnableContext) -> Result<OutputStream, ShellError> { async fn sum(
let stream = async_stream! { RunnableContext { mut input, .. }: RunnableContext,
let mut values: Vec<Value> = input.drain_vec().await; ) -> Result<OutputStream, ShellError> {
let values: Vec<Value> = input.drain_vec().await;
let action = reducer_for(Reduce::Sum); let action = reducer_for(Reduce::Sum);
if values.iter().all(|v| if let UntaggedValue::Primitive(_) = v.value {true} else {false}) { if values.iter().all(|v| v.is_primitive()) {
let total = action(Value::zero(), values)?; let total = action(Value::zero(), values)?;
yield ReturnSuccess::value(total) Ok(OutputStream::one(ReturnSuccess::value(total)))
} else { } else {
let mut column_values = IndexMap::new(); let mut column_values = IndexMap::new();
for value in values { for value in values {
match value.value { if let UntaggedValue::Row(row_dict) = value.value {
UntaggedValue::Row(row_dict) => {
for (key, value) in row_dict.entries.iter() { for (key, value) in row_dict.entries.iter() {
column_values column_values
.entry(key.clone()) .entry(key.clone())
.and_modify(|v: &mut Vec<Value>| v.push(value.clone())) .and_modify(|v: &mut Vec<Value>| v.push(value.clone()))
.or_insert(vec![value.clone()]); .or_insert(vec![value.clone()]);
} }
},
table => {},
}; };
} }
@ -86,18 +85,17 @@ fn sum(RunnableContext { mut input, .. }: RunnableContext) -> Result<OutputStrea
match sum { match sum {
Ok(value) => { Ok(value) => {
column_totals.insert(col_name, value); column_totals.insert(col_name, value);
}, }
Err(err) => yield Err(err), Err(err) => return Err(err),
}; };
} }
yield ReturnSuccess::value( Ok(OutputStream::one(ReturnSuccess::value(
UntaggedValue::Row(Dictionary {entries: column_totals}).into_untagged_value()) UntaggedValue::Row(Dictionary {
entries: column_totals,
})
.into_untagged_value(),
)))
} }
};
let stream: BoxStream<'static, ReturnValue> = stream.boxed();
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -26,14 +26,10 @@ impl WholeStreamCommand for To {
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { Ok(OutputStream::one(ReturnSuccess::value(
yield Ok(ReturnSuccess::Value(
UntaggedValue::string(crate::commands::help::get_help(&To, &registry)) UntaggedValue::string(crate::commands::help::get_help(&To, &registry))
.into_value(Tag::unknown()), .into_value(Tag::unknown()),
)); )))
};
Ok(stream.to_output_stream())
} }
} }