Batch of moving commands off async_stream (#1917)

This commit is contained in:
Jonathan Turner 2020-05-30 16:34:39 +12:00 committed by GitHub
parent b84ff99e7f
commit 092ee127ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 101 additions and 110 deletions

View File

@ -29,26 +29,27 @@ impl WholeStreamCommand for Debug {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
debug_value(args, registry)
debug_value(args, registry).await
}
}
fn debug_value(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn debug_value(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (DebugArgs { raw }, mut input) = args.process(&registry).await?;
while let Some(v) = input.next().await {
let (DebugArgs { raw }, input) = args.process(&registry).await?;
Ok(input
.map(move |v| {
if raw {
yield ReturnSuccess::value(
ReturnSuccess::value(
UntaggedValue::string(format!("{:#?}", v)).into_untagged_value(),
);
)
} else {
yield ReturnSuccess::debug_value(v);
ReturnSuccess::debug_value(v)
}
}
};
Ok(stream.to_output_stream())
})
.to_output_stream())
}
#[cfg(test)]

View File

@ -39,7 +39,7 @@ impl WholeStreamCommand for Default {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
default(args, registry)
default(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -51,11 +51,15 @@ impl WholeStreamCommand for Default {
}
}
fn default(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn default(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (DefaultArgs { column, value }, mut input) = args.process(&registry).await?;
while let Some(item) = input.next().await {
let (DefaultArgs { column, value }, input) = args.process(&registry).await?;
Ok(input
.map(move |item| {
let should_add = match item {
Value {
value: UntaggedValue::Row(ref r),
@ -66,17 +70,14 @@ fn default(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
if should_add {
match item.insert_data_at_path(&column.item, value.clone()) {
Some(new_value) => yield ReturnSuccess::value(new_value),
None => yield ReturnSuccess::value(item),
Some(new_value) => ReturnSuccess::value(new_value),
None => ReturnSuccess::value(item),
}
} else {
yield ReturnSuccess::value(item);
ReturnSuccess::value(item)
}
}
};
Ok(stream.to_output_stream())
})
.to_output_stream())
}
#[cfg(test)]

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{Signature, SyntaxShape, UntaggedValue};
use nu_source::Tagged;
pub struct Drop;
@ -35,7 +35,20 @@ impl WholeStreamCommand for Drop {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
drop(args, registry)
let (DropArgs { rows }, input) = args.process(&registry).await?;
let mut v: Vec<_> = input.into_vec().await;
let rows_to_drop = if let Some(quantity) = rows {
*quantity as usize
} else {
1
};
for _ in 0..rows_to_drop {
v.pop();
}
Ok(futures::stream::iter(v).to_output_stream())
}
fn examples(&self) -> Vec<Example> {
@ -57,29 +70,6 @@ impl WholeStreamCommand for Drop {
}
}
fn drop(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (DropArgs { rows }, mut input) = args.process(&registry).await?;
let v: Vec<_> = input.into_vec().await;
let rows_to_drop = if let Some(quantity) = rows {
*quantity as usize
} else {
1
};
if rows_to_drop < v.len() {
let k = v.len() - rows_to_drop;
for x in v[0..k].iter() {
let y: Value = x.clone();
yield ReturnSuccess::value(y)
}
}
};
Ok(stream.to_output_stream())
}
#[cfg(test)]
mod tests {
use super::Drop;

View File

@ -78,7 +78,7 @@ impl WholeStreamCommand for Du {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
du(args, registry)
du(args, registry).await
}
fn examples(&self) -> Vec<Example> {
@ -90,14 +90,13 @@ impl WholeStreamCommand for Du {
}
}
fn du(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn du(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let tag = args.call_info.name_tag.clone();
let ctrl_c = args.ctrl_c.clone();
let ctrl_c_copy = ctrl_c.clone();
let stream = async_stream! {
let (args, mut input): (DuArgs, _) = args.process(&registry).await?;
let (args, _): (DuArgs, _) = args.process(&registry).await?;
let exclude = args.exclude.map_or(Ok(None), move |x| {
Pattern::new(&x.item)
.map(Option::Some)
@ -139,27 +138,27 @@ fn du(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, She
all,
};
let mut inp = futures::stream::iter(paths).interruptible(ctrl_c.clone());
let inp = futures::stream::iter(paths);
while let Some(path) = inp.next().await {
match path {
Ok(inp
.flat_map(move |path| match path {
Ok(p) => {
let mut output = vec![];
if p.is_dir() {
yield Ok(ReturnSuccess::Value(
output.push(Ok(ReturnSuccess::Value(
DirInfo::new(p, &params, max_depth, ctrl_c.clone()).into(),
))
)));
} else {
for v in FileInfo::new(p, deref, tag.clone()).into_iter() {
yield Ok(ReturnSuccess::Value(v.into()));
output.push(Ok(ReturnSuccess::Value(v.into())));
}
}
futures::stream::iter(output)
}
Err(e) => yield Err(e),
}
}
};
Ok(stream.interruptible(ctrl_c_copy).to_output_stream())
Err(e) => futures::stream::iter(vec![Err(e)]),
})
.interruptible(ctrl_c_copy)
.to_output_stream())
}
pub struct DirBuilder {