Evaluation of command arguments (#1801)

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* Finish adding the baseline refactors for argument invocation

* Finish cleanup and add test

* Add missing plugin references
This commit is contained in:
Jonathan Turner
2020-05-15 20:18:24 -07:00
committed by GitHub
parent 822440d5ff
commit 076fde16dd
139 changed files with 2496 additions and 2188 deletions

553
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -30,6 +30,7 @@ nu_plugin_binaryview = { version = "0.14.1", path = "./crates/nu_plugin_binaryvi
nu_plugin_fetch = { version = "0.14.1", path = "./crates/nu_plugin_fetch", optional=true }
nu_plugin_inc = { version = "0.14.1", path = "./crates/nu_plugin_inc", optional=true }
nu_plugin_match = { version = "0.14.1", path = "./crates/nu_plugin_match", optional=true }
nu_plugin_parse = { version = "0.14.1", path = "./crates/nu_plugin_parse", optional=true }
nu_plugin_post = { version = "0.14.1", path = "./crates/nu_plugin_post", optional=true }
nu_plugin_ps = { version = "0.14.1", path = "./crates/nu_plugin_ps", optional=true }
nu_plugin_start = { version = "0.1.0", path = "./crates/nu_plugin_start", optional=true }
@ -63,7 +64,7 @@ nu-build = { version = "0.14.1", path = "./crates/nu-build" }
test-bins = []
default = ["sys", "ps", "textview", "inc", "str"]
stable = ["default", "starship-prompt", "binaryview", "match", "tree", "average", "post", "fetch", "clipboard-cli", "trash-support", "start"]
stable = ["default", "starship-prompt", "binaryview", "match", "tree", "average", "parse", "post", "fetch", "clipboard-cli", "trash-support", "start"]
# Default
textview = ["crossterm", "syntect", "url", "nu_plugin_textview"]
@ -77,6 +78,7 @@ average = ["nu_plugin_average"]
binaryview = ["nu_plugin_binaryview"]
fetch = ["nu_plugin_fetch"]
match = ["nu_plugin_match"]
parse = ["nu_plugin_parse"]
post = ["nu_plugin_post"]
trace = ["nu-parser/trace"]
tree = ["nu_plugin_tree"]
@ -160,6 +162,11 @@ name = "nu_plugin_stable_match"
path = "src/plugins/nu_plugin_stable_match.rs"
required-features = ["match"]
[[bin]]
name = "nu_plugin_stable_parse"
path = "src/plugins/nu_plugin_stable_parse.rs"
required-features = ["parse"]
[[bin]]
name = "nu_plugin_stable_post"
path = "src/plugins/nu_plugin_stable_post.rs"

View File

@ -20,6 +20,8 @@ nu-test-support = { version = "0.14.1", path = "../nu-test-support" }
ansi_term = "0.12.1"
app_dirs = "1.2.1"
async-recursion = "0.3.1"
directories = "2.0.2"
async-stream = "0.2"
base64 = "0.12.0"

View File

@ -284,7 +284,6 @@ pub fn create_default_context(
whole_stream_command(Lines),
whole_stream_command(Trim),
whole_stream_command(Echo),
whole_stream_command(Parse),
// Column manipulation
whole_stream_command(Reject),
whole_stream_command(Select),

View File

@ -71,7 +71,6 @@ pub(crate) mod mv;
pub(crate) mod next;
pub(crate) mod nth;
pub(crate) mod open;
pub(crate) mod parse;
pub(crate) mod pivot;
pub(crate) mod plugin;
pub(crate) mod prepend;
@ -199,7 +198,6 @@ pub(crate) use mv::Move;
pub(crate) use next::Next;
pub(crate) use nth::Nth;
pub(crate) use open::Open;
pub(crate) use parse::Parse;
pub(crate) use pivot::Pivot;
pub(crate) use prepend::Prepend;
pub(crate) use prev::Previous;

View File

@ -39,7 +39,8 @@ impl WholeStreamCommand for Alias {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, alias)?.run()
//args.process(registry, alias)?.run()
alias(args, registry)
}
fn examples(&self) -> &[Example] {
@ -56,15 +57,10 @@ impl WholeStreamCommand for Alias {
}
}
pub fn alias(
AliasArgs {
name,
args: list,
block,
}: AliasArgs,
_: RunnableContext,
) -> Result<OutputStream, ShellError> {
pub fn alias(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (AliasArgs { name, args: list, block }, _) = args.process(&registry).await?;
let mut args: Vec<String> = vec![];
for item in list.iter() {
if let Ok(string) = item.as_string() {

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape, Value};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, Value};
#[derive(Deserialize)]
struct AppendArgs {
@ -33,7 +33,7 @@ impl WholeStreamCommand for Append {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, append)?.run()
append(args, registry)
}
fn examples(&self) -> &[Example] {
@ -44,13 +44,17 @@ impl WholeStreamCommand for Append {
}
}
fn append(
AppendArgs { row }: AppendArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let mut after: VecDeque<Value> = VecDeque::new();
after.push_back(row);
let after = futures::stream::iter(after);
fn append(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
Ok(OutputStream::from_input(input.chain(after)))
let stream = async_stream! {
let (AppendArgs { row }, mut input) = args.process(&registry).await?;
while let Some(item) = input.next().await {
yield ReturnSuccess::value(item);
}
yield ReturnSuccess::value(row);
};
Ok(stream.to_output_stream())
}

View File

@ -5,7 +5,7 @@ use nu_protocol::Dictionary;
use crate::commands::{command::EvaluatedWholeStreamCommandArgs, WholeStreamCommand};
use indexmap::IndexMap;
use nu_protocol::{Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value};
pub struct Cal;
@ -59,7 +59,9 @@ impl WholeStreamCommand for Cal {
}
pub fn cal(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let mut calendar_vec_deque = VecDeque::new();
let tag = args.call_info.name_tag.clone();
@ -104,7 +106,12 @@ pub fn cal(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
);
}
Ok(futures::stream::iter(calendar_vec_deque).to_output_stream())
for item in calendar_vec_deque {
yield ReturnSuccess::value(item);
}
};
Ok(stream.to_output_stream())
}
fn get_current_date() -> (i32, u32, u32) {

View File

@ -5,9 +5,6 @@ use nu_protocol::{Primitive, ReturnSuccess, UntaggedValue, Value};
pub struct Calc;
#[derive(Deserialize)]
pub struct CalcArgs {}
impl WholeStreamCommand for Calc {
fn name(&self) -> &str {
"calc"
@ -22,7 +19,7 @@ impl WholeStreamCommand for Calc {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, calc)?.run()
calc(args, registry)
}
fn examples(&self) -> &[Example] {
@ -33,30 +30,31 @@ impl WholeStreamCommand for Calc {
}
}
pub fn calc(
_: CalcArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
Ok(input
.map(move |input| {
pub fn calc(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let mut input = args.input;
let name = args.call_info.name_tag.clone();
while let Some(input) = input.next().await {
if let Ok(string) = input.as_string() {
match parse(&string, &input.tag) {
Ok(value) => ReturnSuccess::value(value),
Err(err) => Err(ShellError::labeled_error(
Ok(value) => yield ReturnSuccess::value(value),
Err(err) => yield Err(ShellError::labeled_error(
"Calculation error",
err,
&input.tag.span,
)),
}
} else {
Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a string from pipeline",
"requires string input",
name.clone(),
))
}
})
.to_output_stream())
}
};
Ok(stream.to_output_stream())
}
pub fn parse(math_expression: &str, tag: impl Into<Tag>) -> Result<Value, String> {

View File

@ -36,7 +36,7 @@ impl WholeStreamCommand for Cd {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, cd)?.run()
cd(args, registry)
}
fn examples(&self) -> &[Example] {
@ -61,6 +61,18 @@ impl WholeStreamCommand for Cd {
}
}
fn cd(args: CdArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
context.shell_manager.cd(args, &context)
fn cd(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let shell_manager = args.shell_manager.clone();
let (args, _): (CdArgs, _) = args.process(&registry).await?;
let mut result = shell_manager.cd(args, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}

View File

@ -78,7 +78,7 @@ async fn run_pipeline(
}
(Some(ClassifiedCommand::Expr(expr)), _) => {
run_expression_block(*expr, ctx, input, scope)?
run_expression_block(*expr, ctx, input, scope).await?
}
(Some(ClassifiedCommand::Error(err)), _) => return Err(err.into()),
(_, Some(ClassifiedCommand::Error(err))) => return Err(err.clone().into()),

View File

@ -8,7 +8,7 @@ use nu_errors::ShellError;
use nu_protocol::hir::SpannedExpression;
use nu_protocol::Scope;
pub(crate) fn run_expression_block(
pub(crate) async fn run_expression_block(
expr: SpannedExpression,
context: &mut Context,
_input: InputStream,
@ -21,7 +21,7 @@ pub(crate) fn run_expression_block(
let scope = scope.clone();
let registry = context.registry().clone();
let output = evaluate_baseline_expr(&expr, &registry, &scope)?;
let output = evaluate_baseline_expr(&expr, &registry, &scope).await?;
Ok(once(async { Ok(output) }).to_input_stream())
}

View File

@ -99,10 +99,10 @@ pub(crate) async fn run_external_command(
));
}
run_with_stdin(command, context, input, scope, is_last)
run_with_stdin(command, context, input, scope, is_last).await
}
fn run_with_stdin(
async fn run_with_stdin(
command: ExternalCommand,
context: &mut Context,
input: InputStream,
@ -115,7 +115,7 @@ fn run_with_stdin(
let mut command_args = vec![];
for arg in command.args.iter() {
let value = evaluate_baseline_expr(arg, &context.registry, scope)?;
let value = evaluate_baseline_expr(arg, &context.registry, scope).await?;
// Skip any arguments that don't really exist, treating them as optional
// FIXME: we may want to preserve the gap in the future, though it's hard to say
// what value we would put in its place.

View File

@ -11,9 +11,6 @@ pub mod clipboard {
pub struct Clip;
#[derive(Deserialize)]
pub struct ClipArgs {}
impl WholeStreamCommand for Clip {
fn name(&self) -> &str {
"clip"
@ -32,7 +29,7 @@ pub mod clipboard {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, clip)?.run()
clip(args, registry)
}
fn examples(&self) -> &[Example] {
@ -44,10 +41,12 @@ pub mod clipboard {
}
pub fn clip(
ClipArgs {}: ClipArgs,
RunnableContext { input, name, .. }: RunnableContext,
args: CommandArgs,
_registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let mut input = args.input;
let name = args.call_info.name_tag.clone();
let values: Vec<Value> = input.collect().await;
let mut clip_stream = inner_clip(values, name).await;

View File

@ -7,7 +7,7 @@ use derive_new::new;
use getset::Getters;
use nu_errors::ShellError;
use nu_protocol::hir;
use nu_protocol::{CallInfo, EvaluatedArgs, ReturnValue, Scope, Signature, Value};
use nu_protocol::{CallInfo, EvaluatedArgs, ReturnSuccess, Scope, Signature, UntaggedValue, Value};
use serde::{Deserialize, Serialize};
use std::ops::Deref;
use std::sync::atomic::AtomicBool;
@ -20,8 +20,8 @@ pub struct UnevaluatedCallInfo {
}
impl UnevaluatedCallInfo {
pub fn evaluate(self, registry: &CommandRegistry) -> Result<CallInfo, ShellError> {
let args = evaluate_args(&self.args, registry, &self.scope)?;
pub async fn evaluate(self, registry: &CommandRegistry) -> Result<CallInfo, ShellError> {
let args = evaluate_args(&self.args, registry, &self.scope).await?;
Ok(CallInfo {
args,
@ -29,14 +29,14 @@ impl UnevaluatedCallInfo {
})
}
pub fn evaluate_with_new_it(
pub async fn evaluate_with_new_it(
self,
registry: &CommandRegistry,
it: &Value,
) -> Result<CallInfo, ShellError> {
let mut scope = self.scope.clone();
scope = scope.set_it(it.clone());
let args = evaluate_args(&self.args, registry, &scope)?;
let args = evaluate_args(&self.args, registry, &scope).await?;
Ok(CallInfo {
args,
@ -87,7 +87,7 @@ impl std::fmt::Debug for CommandArgs {
}
impl CommandArgs {
pub fn evaluate_once(
pub async fn evaluate_once(
self,
registry: &CommandRegistry,
) -> Result<EvaluatedWholeStreamCommandArgs, ShellError> {
@ -95,7 +95,7 @@ impl CommandArgs {
let ctrl_c = self.ctrl_c.clone();
let shell_manager = self.shell_manager.clone();
let input = self.input;
let call_info = self.call_info.evaluate(registry)?;
let call_info = self.call_info.evaluate(registry).await?;
Ok(EvaluatedWholeStreamCommandArgs::new(
host,
@ -106,7 +106,7 @@ impl CommandArgs {
))
}
pub fn evaluate_once_with_scope(
pub async fn evaluate_once_with_scope(
self,
registry: &CommandRegistry,
scope: &Scope,
@ -120,7 +120,7 @@ impl CommandArgs {
args: self.call_info.args,
scope: scope.clone(),
};
let call_info = call_info.evaluate(registry)?;
let call_info = call_info.evaluate(registry).await?;
Ok(EvaluatedWholeStreamCommandArgs::new(
host,
@ -131,69 +131,16 @@ impl CommandArgs {
))
}
pub fn process<'de, T: Deserialize<'de>, O: ToOutputStream>(
pub async fn process<'de, T: Deserialize<'de>>(
self,
registry: &CommandRegistry,
callback: fn(T, RunnableContext) -> Result<O, ShellError>,
) -> Result<RunnableArgs<T, O>, ShellError> {
let shell_manager = self.shell_manager.clone();
let host = self.host.clone();
let ctrl_c = self.ctrl_c.clone();
let args = self.evaluate_once(registry)?;
let call_info = args.call_info.clone();
let (input, args) = args.split();
let name_tag = args.call_info.name_tag;
let mut deserializer = ConfigDeserializer::from_call_info(call_info);
Ok(RunnableArgs {
args: T::deserialize(&mut deserializer)?,
context: RunnableContext {
input,
registry: registry.clone(),
shell_manager,
name: name_tag,
host,
ctrl_c,
},
callback,
})
}
pub fn process_raw<'de, T: Deserialize<'de>>(
self,
registry: &CommandRegistry,
callback: fn(T, RunnableContext, RawCommandArgs) -> Result<OutputStream, ShellError>,
) -> Result<RunnableRawArgs<T>, ShellError> {
let raw_args = RawCommandArgs {
host: self.host.clone(),
ctrl_c: self.ctrl_c.clone(),
shell_manager: self.shell_manager.clone(),
call_info: self.call_info.clone(),
};
let shell_manager = self.shell_manager.clone();
let host = self.host.clone();
let ctrl_c = self.ctrl_c.clone();
let args = self.evaluate_once(registry)?;
) -> Result<(T, InputStream), ShellError> {
let args = self.evaluate_once(registry).await?;
let call_info = args.call_info.clone();
let (input, args) = args.split();
let name_tag = args.call_info.name_tag;
let mut deserializer = ConfigDeserializer::from_call_info(call_info);
Ok(RunnableRawArgs {
args: T::deserialize(&mut deserializer)?,
context: RunnableContext {
input,
registry: registry.clone(),
shell_manager,
name: name_tag,
host,
ctrl_c,
},
raw_args,
callback,
})
Ok((T::deserialize(&mut deserializer)?, args.input))
}
}
@ -212,34 +159,6 @@ impl RunnableContext {
}
}
pub struct RunnableArgs<T, O: ToOutputStream> {
args: T,
context: RunnableContext,
callback: fn(T, RunnableContext) -> Result<O, ShellError>,
}
impl<T, O: ToOutputStream> RunnableArgs<T, O> {
pub fn run(self) -> Result<OutputStream, ShellError> {
(self.callback)(self.args, self.context).map(|v| v.to_output_stream())
}
}
pub struct RunnableRawArgs<T> {
args: T,
raw_args: RawCommandArgs,
context: RunnableContext,
callback: fn(T, RunnableContext, RawCommandArgs) -> Result<OutputStream, ShellError>,
}
impl<T> RunnableRawArgs<T> {
pub fn run(self) -> OutputStream {
match (self.callback)(self.args, self.context, self.raw_args) {
Ok(stream) => stream,
Err(err) => OutputStream::one(Err(err)),
}
}
}
pub struct EvaluatedWholeStreamCommandArgs {
pub args: EvaluatedCommandArgs,
pub input: InputStream,
@ -416,7 +335,12 @@ impl Command {
pub fn run(&self, args: CommandArgs, registry: &CommandRegistry) -> OutputStream {
if args.call_info.switch_present("help") {
get_help(&*self.0, registry).into()
let cl = self.0.clone();
let registry = registry.clone();
let stream = async_stream! {
yield Ok(ReturnSuccess::Value(UntaggedValue::string(get_help(&*cl, &registry)).into_value(Tag::unknown())));
};
stream.to_output_stream()
} else {
match self.0.run(args, registry) {
Ok(stream) => stream,
@ -458,17 +382,18 @@ impl WholeStreamCommand for FnFilterCommand {
ctrl_c,
shell_manager,
call_info,
input,
mut input,
} = args;
let host: Arc<parking_lot::Mutex<dyn Host>> = host.clone();
let registry: CommandRegistry = registry.clone();
let func = self.func;
let result = input.map(move |it| {
let stream = async_stream! {
while let Some(it) = input.next().await {
let registry = registry.clone();
let call_info = match call_info.clone().evaluate_with_new_it(&registry, &it) {
Err(err) => return OutputStream::from(vec![Err(err)]).values,
let call_info = match call_info.clone().evaluate_with_new_it(&registry, &it).await {
Err(err) => { yield Err(err); return; },
Ok(args) => args,
};
@ -480,15 +405,17 @@ impl WholeStreamCommand for FnFilterCommand {
);
match func(args) {
Err(err) => OutputStream::from(vec![Err(err)]).values,
Ok(stream) => stream.values,
Err(err) => yield Err(err),
Ok(mut stream) => {
while let Some(value) = stream.values.next().await {
yield value;
}
});
}
}
}
};
let result = result.flatten();
let result: BoxStream<ReturnValue> = result.boxed();
Ok(result.into())
Ok(stream.to_output_stream())
}
}

View File

@ -3,7 +3,7 @@ use crate::context::CommandRegistry;
use crate::prelude::*;
use futures::stream::StreamExt;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value};
use nu_source::Tagged;
pub struct Compact;
@ -31,7 +31,7 @@ impl WholeStreamCommand for Compact {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, compact)?.run()
compact(args, registry)
}
fn examples(&self) -> &[Example] {
@ -42,27 +42,29 @@ impl WholeStreamCommand for Compact {
}
}
pub fn compact(
CompactArgs { rest: columns }: CompactArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let objects = input.filter(move |item| {
let keep = if columns.is_empty() {
item.is_some()
pub fn compact(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (CompactArgs { rest: columns }, mut input) = args.process(&registry).await?;
while let Some(item) = input.next().await {
if columns.is_empty() {
if !item.is_empty() {
yield ReturnSuccess::value(item);
}
} else {
match item {
Value {
value: UntaggedValue::Row(ref r),
..
} => columns
} => if columns
.iter()
.all(|field| r.get_data(field).borrow().is_some()),
_ => false,
.all(|field| r.get_data(field).borrow().is_some()) {
yield ReturnSuccess::value(item);
}
_ => {},
}
};
futures::future::ready(keep)
});
Ok(objects.from_input_stream())
}
};
Ok(stream.to_output_stream())
}

View File

@ -70,7 +70,7 @@ impl WholeStreamCommand for Config {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, config)?.run()
config(args, registry)
}
fn examples(&self) -> &[Example] {
@ -107,8 +107,13 @@ impl WholeStreamCommand for Config {
}
}
pub fn config(
ConfigArgs {
pub fn config(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let name_span = args.call_info.name_tag.clone();
let name = args.call_info.name_tag.clone();
let registry = registry.clone();
let stream = async_stream! {
let (ConfigArgs {
load,
set,
set_into,
@ -116,12 +121,8 @@ pub fn config(
clear,
remove,
path,
}: ConfigArgs,
RunnableContext { name, input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let name_span = name.clone();
}, mut input) = args.process(&registry).await?;
let stream = async_stream! {
let configuration = if let Some(supplied) = load {
Some(supplied.item().clone())
} else {

View File

@ -7,9 +7,6 @@ use nu_protocol::{ReturnSuccess, Signature, UntaggedValue, Value};
pub struct Count;
#[derive(Deserialize)]
pub struct CountArgs {}
impl WholeStreamCommand for Count {
fn name(&self) -> &str {
"count"
@ -28,7 +25,7 @@ impl WholeStreamCommand for Count {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, count)?.run()
count(args, registry)
}
fn examples(&self) -> &[Example] {
@ -39,12 +36,10 @@ impl WholeStreamCommand for Count {
}
}
pub fn count(
CountArgs {}: CountArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
pub fn count(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let rows: Vec<Value> = input.collect().await;
let name = args.call_info.name_tag.clone();
let rows: Vec<Value> = args.input.collect().await;
yield ReturnSuccess::value(UntaggedValue::int(rows.len()).into_value(name))
};

View File

@ -40,7 +40,7 @@ impl WholeStreamCommand for Cpy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, cp)?.run()
cp(args, registry)
}
fn examples(&self) -> &[Example] {
@ -57,7 +57,18 @@ impl WholeStreamCommand for Cpy {
}
}
pub fn cp(args: CopyArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let shell_manager = context.shell_manager.clone();
shell_manager.cp(args, &context)
pub fn cp(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let shell_manager = args.shell_manager.clone();
let name = args.call_info.name_tag.clone();
let (args, _) = args.process(&registry).await?;
let mut result = shell_manager.cp(args, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}

View File

@ -7,7 +7,7 @@ use crate::commands::WholeStreamCommand;
use chrono::{Datelike, TimeZone, Timelike};
use core::fmt::Display;
use indexmap::IndexMap;
use nu_protocol::{Signature, UntaggedValue};
use nu_protocol::{ReturnSuccess, Signature, UntaggedValue};
pub struct Date;
@ -89,9 +89,10 @@ where
}
pub fn date(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let mut date_out = VecDeque::new();
let tag = args.call_info.name_tag.clone();
let value = if args.has("utc") {
@ -102,7 +103,8 @@ pub fn date(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
date_to_value(local, tag)
};
date_out.push_back(value);
yield ReturnSuccess::value(value);
};
Ok(futures::stream::iter(date_out).to_output_stream())
Ok(stream.to_output_stream())
}

View File

@ -28,23 +28,24 @@ impl WholeStreamCommand for Debug {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, debug_value)?.run()
debug_value(args, registry)
}
}
fn debug_value(
DebugArgs { raw }: DebugArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<impl ToOutputStream, ShellError> {
Ok(input
.map(move |v| {
fn debug_value(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (DebugArgs { raw }, mut input) = args.process(&registry).await?;
while let Some(v) = input.next().await {
if raw {
ReturnSuccess::value(
yield ReturnSuccess::value(
UntaggedValue::string(format!("{:#?}", v)).into_untagged_value(),
)
);
} else {
ReturnSuccess::debug_value(v)
yield ReturnSuccess::debug_value(v);
}
})
.to_output_stream())
}
};
Ok(stream.to_output_stream())
}

View File

@ -38,7 +38,7 @@ impl WholeStreamCommand for Default {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, default)?.run()
default(args, registry)
}
fn examples(&self) -> &[Example] {
@ -49,14 +49,11 @@ impl WholeStreamCommand for Default {
}
}
fn default(
DefaultArgs { column, value }: DefaultArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = input
.map(move |item| {
let mut result = VecDeque::new();
fn default(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (DefaultArgs { column, value }, mut input) = args.process(&registry).await?;
while let Some(item) = input.next().await {
let should_add = match item {
Value {
value: UntaggedValue::Row(ref r),
@ -67,16 +64,15 @@ fn default(
if should_add {
match item.insert_data_at_path(&column.item, value.clone()) {
Some(new_value) => result.push_back(ReturnSuccess::value(new_value)),
None => result.push_back(ReturnSuccess::value(item)),
Some(new_value) => yield ReturnSuccess::value(new_value),
None => yield ReturnSuccess::value(item),
}
} else {
result.push_back(ReturnSuccess::value(item));
yield ReturnSuccess::value(item);
}
futures::stream::iter(result)
})
.flatten();
}
};
Ok(stream.to_output_stream())
}

View File

@ -34,7 +34,7 @@ impl WholeStreamCommand for Drop {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, drop)?.run()
drop(args, registry)
}
fn examples(&self) -> &[Example] {
@ -51,9 +51,11 @@ impl WholeStreamCommand for Drop {
}
}
fn drop(DropArgs { rows }: DropArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
fn drop(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let v: Vec<_> = context.input.into_vec().await;
let (DropArgs { rows }, mut input) = args.process(&registry).await?;
let v: Vec<_> = input.into_vec().await;
let rows_to_drop = if let Some(quantity) = rows {
*quantity as usize

View File

@ -76,7 +76,7 @@ impl WholeStreamCommand for Du {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, du)?.run()
du(args, registry)
}
fn examples(&self) -> &[Example] {
@ -87,9 +87,13 @@ impl WholeStreamCommand for Du {
}
}
fn du(args: DuArgs, ctx: RunnableContext) -> Result<OutputStream, ShellError> {
let tag = ctx.name.clone();
fn du(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let tag = args.call_info.name_tag.clone();
let ctrl_c = args.ctrl_c.clone();
let stream = async_stream! {
let (args, mut input): (DuArgs, _) = args.process(&registry).await?;
let exclude = args.exclude.map_or(Ok(None), move |x| {
Pattern::new(&x.item)
.map(Option::Some)
@ -118,7 +122,6 @@ fn du(args: DuArgs, ctx: RunnableContext) -> Result<OutputStream, ShellError> {
})
.map(|v| v.map_err(glob_err_into));
let ctrl_c = ctx.ctrl_c;
let all = args.all;
let deref = args.deref;
let max_depth = args.max_depth.map(|f| f.item);
@ -132,20 +135,24 @@ fn du(args: DuArgs, ctx: RunnableContext) -> Result<OutputStream, ShellError> {
all,
};
let stream = futures::stream::iter(paths)
.interruptible(ctrl_c)
.map(move |path| match path {
let mut inp = futures::stream::iter(paths).interruptible(ctrl_c);
while let Some(path) = inp.next().await {
match path {
Ok(p) => {
if p.is_dir() {
Ok(ReturnSuccess::Value(
yield Ok(ReturnSuccess::Value(
DirInfo::new(p, &params, max_depth).into(),
))
} else {
FileInfo::new(p, deref, tag.clone()).map(|v| ReturnSuccess::Value(v.into()))
for v in FileInfo::new(p, deref, tag.clone()).into_iter() {
yield Ok(ReturnSuccess::Value(v.into()));
}
}
Err(e) => Err(e),
});
}
Err(e) => yield Err(e),
}
}
};
Ok(stream.to_output_stream())
}

View File

@ -39,7 +39,7 @@ impl WholeStreamCommand for Each {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
Ok(args.process_raw(registry, each)?.run())
each(args, registry)
}
fn examples(&self) -> &[Example] {
@ -60,21 +60,18 @@ fn is_expanded_it_usage(head: &SpannedExpression) -> bool {
}
}
fn each(
each_args: EachArgs,
context: RunnableContext,
raw_args: RawCommandArgs,
) -> Result<OutputStream, ShellError> {
let block = each_args.block;
let scope = raw_args.call_info.scope.clone();
let registry = context.registry.clone();
let mut input_stream = context.input;
fn each(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
while let Some(input) = input_stream.next().await {
let head = raw_args.call_info.args.head.clone();
let scope = raw_args.call_info.scope.clone();
let mut context = Context::from_raw(&raw_args, &registry);
let (each_args, mut input): (EachArgs, _) = raw_args.process(&registry).await?;
let block = each_args.block;
while let Some(input) = input.next().await {
let input_clone = input.clone();
let input_stream = if is_expanded_it_usage(&raw_args.call_info.args.head) {
let input_stream = if is_expanded_it_usage(&head) {
InputStream::empty()
} else {
once(async { Ok(input) }).to_input_stream()

View File

@ -28,7 +28,7 @@ impl WholeStreamCommand for Echo {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, echo)?.run()
echo(args, registry)
}
fn examples(&self) -> &[Example] {
@ -45,15 +45,17 @@ impl WholeStreamCommand for Echo {
}
}
fn echo(args: EchoArgs, _: RunnableContext) -> Result<OutputStream, ShellError> {
let mut output = vec![];
fn echo(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (args, _): (EchoArgs, _) = args.process(&registry).await?;
for i in args.rest {
match i.as_string() {
Ok(s) => {
output.push(Ok(ReturnSuccess::Value(
yield Ok(ReturnSuccess::Value(
UntaggedValue::string(s).into_value(i.tag.clone()),
)));
));
}
_ => match i {
Value {
@ -61,18 +63,16 @@ fn echo(args: EchoArgs, _: RunnableContext) -> Result<OutputStream, ShellError>
..
} => {
for value in table {
output.push(Ok(ReturnSuccess::Value(value.clone())));
yield Ok(ReturnSuccess::Value(value.clone()));
}
}
_ => {
output.push(Ok(ReturnSuccess::Value(i.clone())));
yield Ok(ReturnSuccess::Value(i.clone()));
}
},
}
}
// TODO: This whole block can probably be replaced with `.map()`
let stream = futures::stream::iter(output);
};
Ok(stream.to_output_stream())
}

View File

@ -38,7 +38,7 @@ impl WholeStreamCommand for Enter {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
Ok(args.process_raw(registry, enter)?.run())
enter(args, registry)
}
fn examples(&self) -> &[Example] {
@ -55,15 +55,16 @@ impl WholeStreamCommand for Enter {
}
}
fn enter(
EnterArgs { location }: EnterArgs,
RunnableContext {
registry,
name: tag,
..
}: RunnableContext,
raw_args: RawCommandArgs,
) -> Result<OutputStream, ShellError> {
fn enter(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let scope = raw_args.call_info.scope.clone();
let shell_manager = raw_args.shell_manager.clone();
let head = raw_args.call_info.args.head.clone();
let ctrl_c = raw_args.ctrl_c.clone();
let host = raw_args.host.clone();
let tag = raw_args.call_info.name_tag.clone();
let (EnterArgs { location }, _) = raw_args.process(&registry).await?;
let location_string = location.display().to_string();
let location_clone = location_string.clone();
@ -74,25 +75,22 @@ fn enter(
let (_, command) = (spec[0], spec[1]);
if registry.has(command) {
return Ok(vec![Ok(ReturnSuccess::Action(CommandAction::EnterHelpShell(
yield Ok(ReturnSuccess::Action(CommandAction::EnterHelpShell(
UntaggedValue::string(command).into_value(Tag::unknown()),
)))]
.into());
)));
return;
}
}
Ok(vec![Ok(ReturnSuccess::Action(CommandAction::EnterHelpShell(
yield Ok(ReturnSuccess::Action(CommandAction::EnterHelpShell(
UntaggedValue::nothing().into_value(Tag::unknown()),
)))]
.into())
)));
} else if location.is_dir() {
Ok(vec![Ok(ReturnSuccess::Action(CommandAction::EnterShell(
yield Ok(ReturnSuccess::Action(CommandAction::EnterShell(
location_clone,
)))]
.into())
)));
} else {
let stream = async_stream! {
// If it's a file, attempt to open the file as a value and enter it
let cwd = raw_args.shell_manager.path();
let cwd = shell_manager.path();
let full_path = std::path::PathBuf::from(cwd);
@ -113,19 +111,19 @@ fn enter(
registry.get_command(&command_name)
{
let new_args = RawCommandArgs {
host: raw_args.host,
ctrl_c: raw_args.ctrl_c,
shell_manager: raw_args.shell_manager,
host,
ctrl_c,
shell_manager,
call_info: UnevaluatedCallInfo {
args: nu_protocol::hir::Call {
head: raw_args.call_info.args.head,
head,
positional: None,
named: None,
span: Span::unknown(),
is_last: false,
},
name_tag: raw_args.call_info.name_tag,
scope: raw_args.call_info.scope.clone()
name_tag: tag.clone(),
scope: scope.clone()
},
};
let mut result = converter.run(
@ -162,7 +160,8 @@ fn enter(
yield Ok(ReturnSuccess::Action(CommandAction::EnterValueShell(tagged_contents)));
}
}
}
};
Ok(stream.to_output_stream())
}
}

View File

@ -36,15 +36,18 @@ impl WholeStreamCommand for EvaluateBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, evaluate_by)?.run()
evaluate_by(args, registry)
}
}
pub fn evaluate_by(
EvaluateByArgs { evaluate_with }: EvaluateByArgs,
RunnableContext { input, name, .. }: RunnableContext,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (EvaluateByArgs { evaluate_with }, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {

View File

@ -42,11 +42,16 @@ impl WholeStreamCommand for Exit {
}
pub fn exit(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
if args.call_info.args.has("now") {
Ok(vec![Ok(ReturnSuccess::Action(CommandAction::Exit))].into())
yield Ok(ReturnSuccess::Action(CommandAction::Exit));
} else {
Ok(vec![Ok(ReturnSuccess::Action(CommandAction::LeaveShell))].into())
yield Ok(ReturnSuccess::Action(CommandAction::LeaveShell));
}
};
Ok(stream.to_output_stream())
}

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape};
use nu_source::Tagged;
pub struct First;
@ -34,7 +34,7 @@ impl WholeStreamCommand for First {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, first)?.run()
first(args, registry)
}
fn examples(&self) -> &[Example] {
@ -51,15 +51,25 @@ impl WholeStreamCommand for First {
}
}
fn first(
FirstArgs { rows }: FirstArgs,
context: RunnableContext,
) -> Result<OutputStream, ShellError> {
let rows_desired = if let Some(quantity) = rows {
fn first(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (FirstArgs { rows }, mut input) = args.process(&registry).await?;
let mut rows_desired = if let Some(quantity) = rows {
*quantity
} else {
1
};
Ok(OutputStream::from_input(context.input.take(rows_desired)))
while let Some(input) = input.next().await {
if rows_desired > 0 {
yield ReturnSuccess::value(input);
rows_desired -= 1;
} else {
break;
}
}
};
Ok(stream.to_output_stream())
}

View File

@ -36,7 +36,7 @@ impl WholeStreamCommand for Format {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, format_command)?.run()
format_command(args, registry)
}
fn examples(&self) -> &[Example] {
@ -48,16 +48,17 @@ impl WholeStreamCommand for Format {
}
fn format_command(
FormatArgs { pattern }: FormatArgs,
RunnableContext { input, .. }: RunnableContext,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (FormatArgs { pattern }, mut input) = args.process(&registry).await?;
let pattern_tag = pattern.tag.clone();
let format_pattern = format(&pattern);
let commands = format_pattern;
let mut input = input;
let stream = async_stream! {
while let Some(value) = input.next().await {
match value {
value

View File

@ -1,7 +1,7 @@
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::Signature;
use nu_protocol::{ReturnSuccess, Signature, UntaggedValue};
pub struct From;
@ -23,6 +23,14 @@ impl WholeStreamCommand for From {
_args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
Ok(crate::commands::help::get_help(&*self, registry).into())
let registry = registry.clone();
let stream = async_stream! {
yield Ok(ReturnSuccess::Value(
UntaggedValue::string(crate::commands::help::get_help(&From, &registry))
.into_value(Tag::unknown()),
));
};
Ok(stream.to_output_stream())
}
}

View File

@ -207,11 +207,12 @@ pub fn from_bson_bytes_to_value(bytes: Vec<u8>, tag: impl Into<Tag>) -> Result<V
}
fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let bytes = input.collect_binary(tag.clone()).await?;
match from_bson_bytes_to_value(bytes.item, tag.clone()) {

View File

@ -41,7 +41,7 @@ impl WholeStreamCommand for FromCSV {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, from_csv)?.run()
from_csv(args, registry)
}
fn examples(&self) -> &[Example] {
@ -62,13 +62,11 @@ impl WholeStreamCommand for FromCSV {
}
}
fn from_csv(
FromCSVArgs {
headerless,
separator,
}: FromCSVArgs,
runnable_context: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn from_csv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (FromCSVArgs { headerless, separator }, mut input) = args.process(&registry).await?;
let sep = match separator {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
@ -80,11 +78,12 @@ fn from_csv(
} else {
let vec_s: Vec<char> = s.chars().collect();
if vec_s.len() != 1 {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a single separator char from --separator",
"requires a single character string input",
tag,
));
return;
};
vec_s[0]
}
@ -92,5 +91,12 @@ fn from_csv(
_ => ',',
};
from_delimited_data(headerless, sep, "CSV", runnable_context)
let mut result = from_delimited_data(headerless, sep, "CSV", input, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}

View File

@ -45,7 +45,8 @@ pub fn from_delimited_data(
headerless: bool,
sep: char,
format_name: &'static str,
RunnableContext { input, name, .. }: RunnableContext,
input: InputStream,
name: Tag,
) -> Result<OutputStream, ShellError> {
let name_tag = name;

View File

@ -39,7 +39,7 @@ impl WholeStreamCommand for FromEML {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, from_eml)?.run()
from_eml(args, registry)
}
}
@ -76,14 +76,11 @@ fn headerfieldvalue_to_value(tag: &Tag, value: &HeaderFieldValue) -> UntaggedVal
}
}
fn from_eml(
eml_args: FromEMLArgs,
runnable_context: RunnableContext,
) -> Result<OutputStream, ShellError> {
let input = runnable_context.input;
let tag = runnable_context.name;
fn from_eml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone();
let registry = registry.clone();
let stream = async_stream! {
let (eml_args, mut input): (FromEMLArgs, _) = args.process(&registry).await?;
let value = input.collect_string(tag.clone()).await?;
let body_preview = eml_args.preview_body.map(|b| b.item).unwrap_or(DEFAULT_BODY_PREVIEW);

View File

@ -32,11 +32,12 @@ impl WholeStreamCommand for FromIcs {
}
fn from_ics(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let input_string = input.collect_string(tag.clone()).await?.item;
let input_bytes = input_string.as_bytes();
let buf_reader = BufReader::new(input_bytes);

View File

@ -64,11 +64,11 @@ pub fn from_ini_string_to_value(
}
fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let concat_string = input.collect_string(tag.clone()).await?;
match from_ini_string_to_value(concat_string.item, tag.clone()) {

View File

@ -32,7 +32,7 @@ impl WholeStreamCommand for FromJSON {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, from_json)?.run()
from_json(args, registry)
}
}
@ -70,13 +70,12 @@ pub fn from_json_string_to_value(s: String, tag: impl Into<Tag>) -> serde_hjson:
Ok(convert_json_value_to_nu_value(&v, tag))
}
fn from_json(
FromJSONArgs { objects }: FromJSONArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let name_tag = name;
fn from_json(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let name_tag = args.call_info.name_tag.clone();
let registry = registry.clone();
let stream = async_stream! {
let (FromJSONArgs { objects }, mut input) = args.process(&registry).await?;
let concat_string = input.collect_string(name_tag.clone()).await?;
if objects {

View File

@ -35,20 +35,16 @@ impl WholeStreamCommand for FromODS {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, from_ods)?.run()
from_ods(args, registry)
}
}
fn from_ods(
FromODSArgs {
headerless: _headerless,
}: FromODSArgs,
runnable_context: RunnableContext,
) -> Result<OutputStream, ShellError> {
let input = runnable_context.input;
let tag = runnable_context.name;
fn from_ods(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone();
let registry = registry.clone();
let stream = async_stream! {
let (FromODSArgs { headerless: _headerless }, mut input) = args.process(&registry).await?;
let bytes = input.collect_binary(tag.clone()).await?;
let mut buf: Cursor<Vec<u8>> = Cursor::new(bytes.item);
let mut ods = Ods::<_>::new(buf).map_err(|_| ShellError::labeled_error(

View File

@ -133,11 +133,12 @@ pub fn from_sqlite_bytes_to_value(
}
fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let bytes = input.collect_binary(tag.clone()).await?;
match from_sqlite_bytes_to_value(bytes.item, tag.clone()) {
Ok(x) => match x {

View File

@ -50,7 +50,7 @@ impl WholeStreamCommand for FromSSV {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, from_ssv)?.run()
from_ssv(args, registry)
}
}
@ -250,15 +250,11 @@ fn from_ssv_string_to_value(
Some(UntaggedValue::Table(rows).into_value(&tag))
}
fn from_ssv(
FromSSVArgs {
headerless,
aligned_columns,
minimum_spaces,
}: FromSSVArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn from_ssv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let name = args.call_info.name_tag.clone();
let registry = registry.clone();
let stream = async_stream! {
let (FromSSVArgs { headerless, aligned_columns, minimum_spaces }, mut input) = args.process(&registry).await?;
let concat_string = input.collect_string(name.clone()).await?;
let split_at = match minimum_spaces {
Some(number) => number.item,

View File

@ -67,11 +67,12 @@ pub fn from_toml(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let concat_string = input.collect_string(tag.clone()).await?;
match from_toml_string_to_value(concat_string.item, tag.clone()) {
Ok(x) => match x {

View File

@ -33,13 +33,21 @@ impl WholeStreamCommand for FromTSV {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, from_tsv)?.run()
from_tsv(args, registry)
}
}
fn from_tsv(
FromTSVArgs { headerless }: FromTSVArgs,
runnable_context: RunnableContext,
) -> Result<OutputStream, ShellError> {
from_delimited_data(headerless, '\t', "TSV", runnable_context)
fn from_tsv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (FromTSVArgs { headerless }, mut input) = args.process(&registry).await?;
let mut result = from_delimited_data(headerless, '\t', "TSV", input, name)?;
while let Some(output) = result.next().await {
yield output;
}
};
Ok(stream.to_output_stream())
}

View File

@ -28,11 +28,12 @@ impl WholeStreamCommand for FromURL {
}
fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let concat_string = input.collect_string(tag.clone()).await?;
let result = serde_urlencoded::from_str::<Vec<(String, String)>>(&concat_string.item);

View File

@ -32,11 +32,12 @@ impl WholeStreamCommand for FromVcf {
}
fn from_vcf(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let input_string = input.collect_string(tag.clone()).await?.item;
let input_bytes = input_string.as_bytes();
let buf_reader = BufReader::new(input_bytes);

View File

@ -35,20 +35,15 @@ impl WholeStreamCommand for FromXLSX {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, from_xlsx)?.run()
from_xlsx(args, registry)
}
}
fn from_xlsx(
FromXLSXArgs {
headerless: _headerless,
}: FromXLSXArgs,
runnable_context: RunnableContext,
) -> Result<OutputStream, ShellError> {
let input = runnable_context.input;
let tag = runnable_context.name;
fn from_xlsx(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone();
let registry = registry.clone();
let stream = async_stream! {
let (FromXLSXArgs { headerless: _headerless }, mut input) = args.process(&registry).await?;
let value = input.collect_binary(tag.clone()).await?;
let mut buf: Cursor<Vec<u8>> = Cursor::new(value.item);

View File

@ -99,11 +99,12 @@ pub fn from_xml_string_to_value(s: String, tag: impl Into<Tag>) -> Result<Value,
}
fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let concat_string = input.collect_string(tag.clone()).await?;
match from_xml_string_to_value(concat_string.item, tag.clone()) {

View File

@ -119,11 +119,12 @@ pub fn from_yaml_string_to_value(s: String, tag: impl Into<Tag>) -> Result<Value
}
fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let tag = args.name_tag();
let input = args.input;
let stream = async_stream! {
let concat_string = input.collect_string(tag.clone()).await?;
match from_yaml_string_to_value(concat_string.item, tag.clone()) {

View File

@ -4,8 +4,8 @@ use indexmap::set::IndexSet;
use log::trace;
use nu_errors::ShellError;
use nu_protocol::{
did_you_mean, ColumnPath, PathMember, Primitive, ReturnSuccess, ReturnValue, Signature,
SyntaxShape, UnspannedPathMember, UntaggedValue, Value,
did_you_mean, ColumnPath, PathMember, Primitive, ReturnSuccess, Signature, SyntaxShape,
UnspannedPathMember, UntaggedValue, Value,
};
use nu_source::span_for_spanned_list;
use nu_value_ext::get_data_by_column_path;
@ -38,7 +38,7 @@ impl WholeStreamCommand for Get {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, get)?.run()
get(args, registry)
}
fn examples(&self) -> &[Example] {
@ -189,30 +189,21 @@ pub fn get_column_path(path: &ColumnPath, obj: &Value) -> Result<Value, ShellErr
)
}
pub fn get(
GetArgs { rest: mut fields }: GetArgs,
RunnableContext { mut input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
if fields.is_empty() {
pub fn get(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (GetArgs { rest: mut fields }, mut input) = args.process(&registry).await?;
if fields.is_empty() {
let mut vec = input.drain_vec().await;
let descs = nu_protocol::merge_descriptors(&vec);
for desc in descs {
yield ReturnSuccess::value(desc);
}
};
let stream: BoxStream<'static, ReturnValue> = stream.boxed();
Ok(stream.to_output_stream())
} else {
let member = fields.remove(0);
trace!("get {:?} {:?}", member, fields);
let stream = input
.map(move |item| {
let mut result = VecDeque::new();
while let Some(item) = input.next().await {
let member = vec![member.clone()];
let column_paths = vec![&member, &fields]
@ -230,25 +221,22 @@ pub fn get(
..
} => {
for item in rows {
result.push_back(ReturnSuccess::value(item.clone()));
yield ReturnSuccess::value(item.clone());
}
}
Value {
value: UntaggedValue::Primitive(Primitive::Nothing),
..
} => {}
other => result.push_back(ReturnSuccess::value(other.clone())),
other => yield ReturnSuccess::value(other.clone()),
},
Err(reason) => result.push_back(ReturnSuccess::value(
Err(reason) => yield ReturnSuccess::value(
UntaggedValue::Error(reason).into_untagged_value(),
)),
),
}
}
futures::stream::iter(result)
})
.flatten();
}
}
};
Ok(stream.to_output_stream())
}
}

View File

@ -33,7 +33,7 @@ impl WholeStreamCommand for GroupBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, group_by)?.run()
group_by(args, registry)
}
fn examples(&self) -> &[Example] {
@ -44,11 +44,11 @@ impl WholeStreamCommand for GroupBy {
}
}
pub fn group_by(
GroupByArgs { column_name }: GroupByArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
pub fn group_by(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (GroupByArgs { column_name }, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {

View File

@ -41,7 +41,7 @@ impl WholeStreamCommand for GroupByDate {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, group_by_date)?.run()
group_by_date(args, registry)
}
fn examples(&self) -> &[Example] {
@ -57,13 +57,13 @@ enum Grouper {
}
pub fn group_by_date(
GroupByDateArgs {
column_name,
format,
}: GroupByDateArgs,
RunnableContext { input, name, .. }: RunnableContext,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (GroupByDateArgs { column_name, format }, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {

View File

@ -8,8 +8,6 @@ use nu_protocol::Dictionary;
use nu_protocol::{ReturnSuccess, Signature, UntaggedValue, Value};
pub struct Headers;
#[derive(Deserialize)]
pub struct HeadersArgs {}
impl WholeStreamCommand for Headers {
fn name(&self) -> &str {
@ -29,7 +27,7 @@ impl WholeStreamCommand for Headers {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, headers)?.run()
headers(args, registry)
}
fn examples(&self) -> &[Example] {
@ -40,11 +38,9 @@ impl WholeStreamCommand for Headers {
}
}
pub fn headers(
HeadersArgs {}: HeadersArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
pub fn headers(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let stream = async_stream! {
let mut input = args.input;
let rows: Vec<Value> = input.collect().await;
if rows.len() < 1 {

View File

@ -35,16 +35,16 @@ impl WholeStreamCommand for Help {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, help)?.run()
help(args, registry)
}
}
fn help(
HelpArgs { rest }: HelpArgs,
RunnableContext { registry, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn help(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (HelpArgs { rest }, mut input) = args.process(&registry).await?;
if let Some(document) = rest.get(0) {
let mut help = VecDeque::new();
if document.item == "commands" {
let mut sorted_names = registry.names();
sorted_names.sort();
@ -80,25 +80,23 @@ fn help(
.as_string()?,
);
help.push_back(ReturnSuccess::value(short_desc.into_value()));
yield ReturnSuccess::value(short_desc.into_value());
}
} else if rest.len() == 2 {
// Check for a subcommand
let command_name = format!("{} {}", rest[0].item, rest[1].item);
if let Some(command) = registry.get_command(&command_name) {
return Ok(get_help(command.stream_command(), &registry).into());
yield Ok(ReturnSuccess::Value(UntaggedValue::string(get_help(command.stream_command(), &registry)).into_value(Tag::unknown())));
}
} else if let Some(command) = registry.get_command(&document.item) {
return Ok(get_help(command.stream_command(), &registry).into());
yield Ok(ReturnSuccess::Value(UntaggedValue::string(get_help(command.stream_command(), &registry)).into_value(Tag::unknown())));
} else {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Can't find command (use 'help commands' for full list)",
"can't find command",
document.tag.span,
));
}
let help = futures::stream::iter(help);
Ok(help.to_output_stream())
} else {
let msg = r#"Welcome to Nushell.
@ -122,22 +120,17 @@ Get the processes on your system actively using CPU:
You can also learn more at https://www.nushell.sh/book/"#;
let output_stream = futures::stream::iter(vec![ReturnSuccess::value(
UntaggedValue::string(msg).into_value(name),
)]);
Ok(output_stream.to_output_stream())
yield Ok(ReturnSuccess::Value(UntaggedValue::string(msg).into_value(Tag::unknown())));
}
};
Ok(stream.to_output_stream())
}
#[allow(clippy::cognitive_complexity)]
pub fn get_help(
cmd: &dyn WholeStreamCommand,
registry: &CommandRegistry,
) -> impl Into<OutputStream> {
pub fn get_help(cmd: &dyn WholeStreamCommand, registry: &CommandRegistry) -> String {
let cmd_name = cmd.name();
let signature = cmd.signature();
let mut help = VecDeque::new();
let mut long_desc = String::new();
long_desc.push_str(&cmd.usage());
@ -285,8 +278,5 @@ pub fn get_help(
long_desc.push_str("\n");
help.push_back(ReturnSuccess::value(
UntaggedValue::string(long_desc).into_value(Tag::from((0, cmd_name.len(), None))),
));
help
long_desc
}

View File

@ -44,7 +44,7 @@ impl WholeStreamCommand for Histogram {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, histogram)?.run()
histogram(args, registry)
}
fn examples(&self) -> &[Example] {
@ -67,10 +67,13 @@ impl WholeStreamCommand for Histogram {
}
pub fn histogram(
HistogramArgs { column_name, rest }: HistogramArgs,
RunnableContext { input, name, .. }: RunnableContext,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (HistogramArgs { column_name, rest}, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
let Tagged { item: group_by, .. } = column_name.clone();

View File

@ -8,9 +8,6 @@ use std::io::{BufRead, BufReader};
pub struct History;
#[derive(Deserialize)]
pub struct HistoryArgs {}
impl WholeStreamCommand for History {
fn name(&self) -> &str {
"history"
@ -29,14 +26,12 @@ impl WholeStreamCommand for History {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, history)?.run()
history(args, registry)
}
}
fn history(
_: HistoryArgs,
RunnableContext { name: tag, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn history(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag;
let stream = async_stream! {
let history_path = HistoryFile::path();
let file = File::open(history_path);

View File

@ -41,17 +41,15 @@ impl WholeStreamCommand for Insert {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, insert)?.run()
insert(args, registry)
}
}
fn insert(
InsertArgs { column, value }: InsertArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let mut input = input;
fn insert(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (InsertArgs { column, value }, mut input) = args.process(&registry).await?;
match input.next().await {
Some(obj @ Value {
value: UntaggedValue::Row(_),

View File

@ -41,16 +41,15 @@ impl WholeStreamCommand for IsEmpty {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, is_empty)?.run()
is_empty(args, registry)
}
}
fn is_empty(
IsEmptyArgs { rest }: IsEmptyArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
Ok(input
.map(move |value| {
fn is_empty(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (IsEmptyArgs { rest }, mut input) = args.process(&registry).await?;
while let Some(value) = input.next().await {
let value_tag = value.tag();
let action = if rest.len() <= 2 {
@ -85,7 +84,7 @@ fn is_empty(
};
match action {
IsEmptyFor::Value => Ok(ReturnSuccess::Value(
IsEmptyFor::Value => yield Ok(ReturnSuccess::Value(
UntaggedValue::boolean(value.is_empty()).into_value(value_tag),
)),
IsEmptyFor::RowWithFieldsAndFallback(fields, default) => {
@ -93,7 +92,7 @@ fn is_empty(
for field in fields.iter() {
let val =
out.get_data_by_column_path(&field, Box::new(move |(_, _, err)| err))?;
crate::commands::get::get_column_path(&field, &out)?;
let emptiness_value = match out {
obj
@ -125,11 +124,11 @@ fn is_empty(
out = emptiness_value?;
}
Ok(ReturnSuccess::Value(out))
yield Ok(ReturnSuccess::Value(out))
}
IsEmptyFor::RowWithField(field) => {
let val =
value.get_data_by_column_path(&field, Box::new(move |(_, _, err)| err))?;
crate::commands::get::get_column_path(&field, &value)?;
match &value {
obj
@ -143,18 +142,18 @@ fn is_empty(
&field,
UntaggedValue::boolean(true).into_value(&value_tag),
) {
Some(v) => Ok(ReturnSuccess::Value(v)),
None => Err(ShellError::labeled_error(
Some(v) => yield Ok(ReturnSuccess::Value(v)),
None => yield Err(ShellError::labeled_error(
"empty? could not find place to check emptiness",
"column name",
&field.tag,
)),
}
} else {
Ok(ReturnSuccess::Value(value))
yield Ok(ReturnSuccess::Value(value))
}
}
_ => Err(ShellError::labeled_error(
_ => yield Err(ShellError::labeled_error(
"Unrecognized type in stream",
"original value",
&value_tag,
@ -163,7 +162,7 @@ fn is_empty(
}
IsEmptyFor::RowWithFieldAndFallback(field, default) => {
let val =
value.get_data_by_column_path(&field, Box::new(move |(_, _, err)| err))?;
crate::commands::get::get_column_path(&field, &value)?;
match &value {
obj
@ -174,18 +173,18 @@ fn is_empty(
} => {
if val.is_empty() {
match obj.replace_data_at_column_path(&field, default) {
Some(v) => Ok(ReturnSuccess::Value(v)),
None => Err(ShellError::labeled_error(
Some(v) => yield Ok(ReturnSuccess::Value(v)),
None => yield Err(ShellError::labeled_error(
"empty? could not find place to check emptiness",
"column name",
&field.tag,
)),
}
} else {
Ok(ReturnSuccess::Value(value))
yield Ok(ReturnSuccess::Value(value))
}
}
_ => Err(ShellError::labeled_error(
_ => yield Err(ShellError::labeled_error(
"Unrecognized type in stream",
"original value",
&value_tag,
@ -193,6 +192,7 @@ fn is_empty(
}
}
}
})
.to_output_stream())
}
};
Ok(stream.to_output_stream())
}

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape};
use nu_source::Tagged;
pub struct Keep;
@ -34,7 +34,7 @@ impl WholeStreamCommand for Keep {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, keep)?.run()
keep(args, registry)
}
fn examples(&self) -> &[Example] {
@ -51,12 +51,25 @@ impl WholeStreamCommand for Keep {
}
}
fn keep(KeepArgs { rows }: KeepArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let rows_desired = if let Some(quantity) = rows {
fn keep(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (KeepArgs { rows }, mut input) = args.process(&registry).await?;
let mut rows_desired = if let Some(quantity) = rows {
*quantity
} else {
1
};
Ok(OutputStream::from_input(context.input.take(rows_desired)))
while let Some(input) = input.next().await {
if rows_desired > 0 {
yield ReturnSuccess::value(input);
rows_desired -= 1;
} else {
break;
}
}
};
Ok(stream.to_output_stream())
}

View File

@ -3,7 +3,9 @@ use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*;
use log::trace;
use nu_errors::ShellError;
use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{
hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct KeepUntil;
@ -33,7 +35,8 @@ impl WholeStreamCommand for KeepUntil {
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let scope = args.call_info.scope.clone();
let call_info = args.evaluate_once(&registry)?;
let stream = async_stream! {
let mut call_info = args.evaluate_once(&registry).await?;
let block = call_info.args.expect_nth(0)?.clone();
@ -43,46 +46,51 @@ impl WholeStreamCommand for KeepUntil {
tag,
} => {
if block.block.len() != 1 {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
match block.block[0].list.get(0) {
Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(),
_ => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
))
}
},
None => {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
},
None => {
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
}
}
Value { tag, .. } => {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
};
let objects = call_info.input.take_while(move |item| {
while let Some(item) = call_info.input.next().await {
let condition = condition.clone();
trace!("ITEM = {:?}", item);
let result =
evaluate_baseline_expr(&*condition, &registry, &scope.clone().set_it(item.clone()));
evaluate_baseline_expr(&*condition, &registry, &scope.clone().set_it(item.clone()))
.await;
trace!("RESULT = {:?}", result);
let return_value = match result {
@ -90,9 +98,14 @@ impl WholeStreamCommand for KeepUntil {
_ => true,
};
futures::future::ready(return_value)
});
if return_value {
yield ReturnSuccess::value(item);
} else {
break;
}
}
};
Ok(objects.from_input_stream())
Ok(stream.to_output_stream())
}
}

View File

@ -3,7 +3,9 @@ use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*;
use log::trace;
use nu_errors::ShellError;
use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{
hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct KeepWhile;
@ -33,7 +35,8 @@ impl WholeStreamCommand for KeepWhile {
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let scope = args.call_info.scope.clone();
let call_info = args.evaluate_once(&registry)?;
let stream = async_stream! {
let mut call_info = args.evaluate_once(&registry).await?;
let block = call_info.args.expect_nth(0)?.clone();
@ -43,46 +46,51 @@ impl WholeStreamCommand for KeepWhile {
tag,
} => {
if block.block.len() != 1 {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
match block.block[0].list.get(0) {
Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(),
_ => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
))
}
},
None => {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
},
None => {
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
}
}
Value { tag, .. } => {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
};
let objects = call_info.input.take_while(move |item| {
while let Some(item) = call_info.input.next().await {
let condition = condition.clone();
trace!("ITEM = {:?}", item);
let result =
evaluate_baseline_expr(&*condition, &registry, &scope.clone().set_it(item.clone()));
evaluate_baseline_expr(&*condition, &registry, &scope.clone().set_it(item.clone()))
.await;
trace!("RESULT = {:?}", result);
let return_value = match result {
@ -90,9 +98,14 @@ impl WholeStreamCommand for KeepWhile {
_ => false,
};
futures::future::ready(return_value)
});
if return_value {
yield ReturnSuccess::value(item);
} else {
break;
}
}
};
Ok(objects.from_input_stream())
Ok(stream.to_output_stream())
}
}

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue};
use nu_source::Tagged;
use std::process::{Command, Stdio};
@ -42,7 +42,7 @@ impl WholeStreamCommand for Kill {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, kill)?.run()
kill(args, registry)
}
fn examples(&self) -> &[Example] {
@ -59,15 +59,16 @@ impl WholeStreamCommand for Kill {
}
}
fn kill(
KillArgs {
fn kill(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (KillArgs {
pid,
rest,
force,
quiet,
}: KillArgs,
_context: RunnableContext,
) -> Result<OutputStream, ShellError> {
}, mut input) = args.process(&registry).await?;
let mut cmd = if cfg!(windows) {
let mut cmd = Command::new("taskkill");
@ -109,5 +110,10 @@ fn kill(
cmd.status().expect("failed to execute shell command");
Ok(OutputStream::empty())
if false {
yield ReturnSuccess::value(UntaggedValue::nothing().into_value(Tag::unknown()));
}
};
Ok(stream.to_output_stream())
}

View File

@ -34,7 +34,7 @@ impl WholeStreamCommand for Last {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, last)?.run()
last(args, registry)
}
fn examples(&self) -> &[Example] {
@ -51,9 +51,11 @@ impl WholeStreamCommand for Last {
}
}
fn last(LastArgs { rows }: LastArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
fn last(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let v: Vec<_> = context.input.into_vec().await;
let (LastArgs { rows }, mut input) = args.process(&registry).await?;
let v: Vec<_> = input.into_vec().await;
let rows_desired = if let Some(quantity) = rows {
*quantity

View File

@ -45,14 +45,14 @@ fn ends_with_line_ending(st: &str) -> bool {
}
fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let mut leftover = vec![];
let mut leftover_string = String::new();
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await.unwrap();
let tag = args.name_tag();
let name_span = tag.span;
let mut input = args.input;
let mut leftover = vec![];
let mut leftover_string = String::new();
let stream = async_stream! {
loop {
match input.next().await {
Some(Value { value: UntaggedValue::Primitive(Primitive::String(st)), ..}) => {

View File

@ -64,7 +64,7 @@ impl WholeStreamCommand for Ls {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, ls)?.run()
ls(args, registry)
}
fn examples(&self) -> &[Example] {
@ -85,6 +85,19 @@ impl WholeStreamCommand for Ls {
}
}
fn ls(args: LsArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
context.shell_manager.ls(args, &context)
fn ls(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let ctrl_c = args.ctrl_c.clone();
let shell_manager = args.shell_manager.clone();
let (args, _) = args.process(&registry).await?;
let mut result = shell_manager.ls(args, name, ctrl_c)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}

View File

@ -37,18 +37,20 @@ impl WholeStreamCommand for MapMaxBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, map_max_by)?.run()
map_max_by(args, registry)
}
}
pub fn map_max_by(
MapMaxByArgs { column_name }: MapMaxByArgs,
RunnableContext { input, name, .. }: RunnableContext,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (MapMaxByArgs { column_name }, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {
yield Err(ShellError::labeled_error(
"Expected table from pipeline",

View File

@ -36,7 +36,7 @@ impl WholeStreamCommand for Merge {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
Ok(args.process_raw(registry, merge)?.run())
merge(args, registry)
}
fn examples(&self) -> &[Example] {
@ -47,19 +47,15 @@ impl WholeStreamCommand for Merge {
}
}
fn merge(
merge_args: MergeArgs,
context: RunnableContext,
raw_args: RawCommandArgs,
) -> Result<OutputStream, ShellError> {
let block = merge_args.block;
let registry = context.registry.clone();
let mut input = context.input;
let scope = raw_args.call_info.scope.clone();
let mut context = Context::from_raw(&raw_args, &registry);
fn merge(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let mut context = Context::from_raw(&raw_args, &registry);
let name_tag = raw_args.call_info.name_tag.clone();
let scope = raw_args.call_info.scope.clone();
let (merge_args, mut input): (MergeArgs, _) = raw_args.process(&registry).await?;
let block = merge_args.block;
let table: Option<Vec<Value>> = match run_block(&block,
&mut context,
InputStream::empty(),
@ -74,7 +70,7 @@ fn merge(
let table = table.unwrap_or_else(|| vec![Value {
value: UntaggedValue::row(IndexMap::default()),
tag: raw_args.call_info.name_tag,
tag: name_tag,
}]);
let mut idx = 0;

View File

@ -31,7 +31,7 @@ impl WholeStreamCommand for Mkdir {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, mkdir)?.run()
mkdir(args, registry)
}
fn examples(&self) -> &[Example] {
@ -42,7 +42,18 @@ impl WholeStreamCommand for Mkdir {
}
}
fn mkdir(args: MkdirArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let shell_manager = context.shell_manager.clone();
shell_manager.mkdir(args, &context)
fn mkdir(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let shell_manager = args.shell_manager.clone();
let (args, _) = args.process(&registry).await?;
let mut result = shell_manager.mkdir(args, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}

View File

@ -42,7 +42,7 @@ impl WholeStreamCommand for Move {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, mv)?.run()
mv(args, registry)
}
fn examples(&self) -> &[Example] {
@ -63,7 +63,18 @@ impl WholeStreamCommand for Move {
}
}
fn mv(args: MoveArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let shell_manager = context.shell_manager.clone();
shell_manager.mv(args, &context)
fn mv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let shell_manager = args.shell_manager.clone();
let (args, _) = args.process(&registry).await?;
let mut result = shell_manager.mv(args, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}

View File

@ -37,7 +37,7 @@ impl WholeStreamCommand for Nth {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, nth)?.run()
nth(args, registry)
}
fn examples(&self) -> &[Example] {
@ -54,16 +54,13 @@ impl WholeStreamCommand for Nth {
}
}
fn nth(
NthArgs {
row_number,
rest: and_rows,
}: NthArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = input
.enumerate()
.map(move |(idx, item)| {
fn nth(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (NthArgs { row_number, rest: and_rows}, input) = args.process(&registry).await?;
let mut inp = input.enumerate();
while let Some((idx, item)) = inp.next().await {
let row_number = vec![row_number.clone()];
let row_numbers = vec![&row_number, &and_rows]
@ -71,18 +68,14 @@ fn nth(
.flatten()
.collect::<Vec<&Tagged<u64>>>();
let mut result = VecDeque::new();
if row_numbers
.iter()
.any(|requested| requested.item == idx as u64)
{
result.push_back(ReturnSuccess::value(item));
yield ReturnSuccess::value(item);
}
futures::stream::iter(result)
})
.flatten();
}
};
Ok(stream.to_output_stream())
}

View File

@ -41,19 +41,17 @@ impl WholeStreamCommand for Open {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, open)?.run()
open(args, registry)
}
}
fn open(
OpenArgs { path, raw }: OpenArgs,
RunnableContext { shell_manager, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let cwd = PathBuf::from(shell_manager.path());
fn open(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let cwd = PathBuf::from(args.shell_manager.path());
let full_path = cwd;
let registry = registry.clone();
let stream = async_stream! {
let (OpenArgs { path, raw }, _) = args.process(&registry).await?;
let result = fetch(&full_path, &path.item, path.tag.span).await;
if let Err(e) = result {

View File

@ -1,167 +0,0 @@
use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, TaggedDictBuilder, UntaggedValue};
use nu_source::Tagged;
use regex::Regex;
#[derive(Debug)]
enum ParseCommand {
Text(String),
Column(String),
}
fn parse(input: &str) -> Vec<ParseCommand> {
let mut output = vec![];
//let mut loop_input = input;
let mut loop_input = input.chars();
loop {
let mut before = String::new();
while let Some(c) = loop_input.next() {
if c == '{' {
break;
}
before.push(c);
}
if !before.is_empty() {
output.push(ParseCommand::Text(before.to_string()));
}
// Look for column as we're now at one
let mut column = String::new();
while let Some(c) = loop_input.next() {
if c == '}' {
break;
}
column.push(c);
}
if !column.is_empty() {
output.push(ParseCommand::Column(column.to_string()));
}
if before.is_empty() && column.is_empty() {
break;
}
}
output
}
fn column_names(commands: &[ParseCommand]) -> Vec<String> {
let mut output = vec![];
for command in commands {
if let ParseCommand::Column(c) = command {
output.push(c.clone());
}
}
output
}
fn build_regex(commands: &[ParseCommand]) -> String {
let mut output = String::new();
for command in commands {
match command {
ParseCommand::Text(s) => {
output.push_str(&s.replace("(", "\\("));
}
ParseCommand::Column(_) => {
output.push_str("(.*)");
}
}
}
output
}
pub struct Parse;
#[derive(Deserialize)]
pub struct ParseArgs {
pattern: Tagged<String>,
}
impl WholeStreamCommand for Parse {
fn name(&self) -> &str {
"parse"
}
fn signature(&self) -> Signature {
Signature::build("parse").required(
"pattern",
SyntaxShape::String,
"the pattern to match. Eg) \"{foo}: {bar}\"",
)
}
fn usage(&self) -> &str {
"Parse columns from string data using a simple pattern."
}
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, parse_command)?.run()
}
fn examples(&self) -> &[Example] {
&[Example {
description: "Parse values from a string into a table",
example: r#"echo "data: 123" | parse "{key}: {value}""#,
}]
}
}
fn parse_command(
ParseArgs { pattern }: ParseArgs,
RunnableContext { name, input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let parse_pattern = parse(&pattern.item);
let parse_regex = build_regex(&parse_pattern);
let column_names = column_names(&parse_pattern);
let name = name.span;
let regex = Regex::new(&parse_regex).map_err(|_| {
ShellError::labeled_error(
"Could not parse regex",
"could not parse regex",
&pattern.tag,
)
})?;
Ok(input
.map(move |value| {
if let Ok(s) = value.as_string() {
let mut output = vec![];
for cap in regex.captures_iter(&s) {
let mut dict = TaggedDictBuilder::new(value.tag());
for (idx, column_name) in column_names.iter().enumerate() {
dict.insert_untagged(
column_name,
UntaggedValue::string(cap[idx + 1].to_string()),
);
}
output.push(Ok(ReturnSuccess::Value(dict.into_value())));
}
output
} else {
vec![Err(ShellError::labeled_error_with_secondary(
"Expected string input",
"expected string input",
name,
"value originated here",
value.tag,
))]
}
})
.map(futures::stream::iter)
.flatten()
.to_output_stream())
}

View File

@ -50,20 +50,23 @@ impl WholeStreamCommand for Pivot {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, pivot)?.run()
pivot(args, registry)
}
}
pub fn pivot(args: PivotArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
pub fn pivot(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let input = context.input.into_vec().await;
let (args, mut input): (PivotArgs, _) = args.process(&registry).await?;
let input = input.into_vec().await;
let descs = merge_descriptors(&input);
let mut headers: Vec<String> = vec![];
if args.rest.len() > 0 && args.header_row {
yield Err(ShellError::labeled_error("Can not provide header names and use header row", "using header row", context.name));
yield Err(ShellError::labeled_error("Can not provide header names and use header row", "using header row", name));
return;
}
@ -75,17 +78,17 @@ pub fn pivot(args: PivotArgs, context: RunnableContext) -> Result<OutputStream,
if let Ok(s) = x.as_string() {
headers.push(s.to_string());
} else {
yield Err(ShellError::labeled_error("Header row needs string headers", "used non-string headers", context.name));
yield Err(ShellError::labeled_error("Header row needs string headers", "used non-string headers", name));
return;
}
}
_ => {
yield Err(ShellError::labeled_error("Header row is incomplete and can't be used", "using incomplete header row", context.name));
yield Err(ShellError::labeled_error("Header row is incomplete and can't be used", "using incomplete header row", name));
return;
}
}
} else {
yield Err(ShellError::labeled_error("Header row is incomplete and can't be used", "using incomplete header row", context.name));
yield Err(ShellError::labeled_error("Header row is incomplete and can't be used", "using incomplete header row", name));
return;
}
}
@ -107,7 +110,7 @@ pub fn pivot(args: PivotArgs, context: RunnableContext) -> Result<OutputStream,
for desc in descs {
let mut column_num: usize = 0;
let mut dict = TaggedDictBuilder::new(&context.name);
let mut dict = TaggedDictBuilder::new(&name);
if !args.ignore_titles && !args.header_row {
dict.insert_untagged(headers[column_num].clone(), UntaggedValue::string(desc.clone()));

View File

@ -3,7 +3,7 @@ use crate::prelude::*;
use derive_new::new;
use log::trace;
use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, ReturnValue, Signature, UntaggedValue, Value};
use nu_protocol::{ReturnSuccess, ReturnValue, Signature, UntaggedValue, Value};
use serde::{self, Deserialize, Serialize};
use std::io::prelude::*;
use std::io::BufReader;
@ -70,14 +70,16 @@ pub fn filter_plugin(
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
trace!("filter_plugin :: {}", path);
let registry = registry.clone();
let scope = &args
let scope = args
.call_info
.scope
.clone()
.set_it(UntaggedValue::string("$it").into_untagged_value());
let args = args.evaluate_once_with_scope(registry, &scope)?;
let stream = async_stream! {
let mut args = args.evaluate_once_with_scope(&registry, &scope).await?;
let mut child = std::process::Command::new(path)
.stdin(std::process::Stdio::piped())
@ -85,26 +87,12 @@ pub fn filter_plugin(
.spawn()
.expect("Failed to spawn child process");
let mut bos: VecDeque<Value> = VecDeque::new();
bos.push_back(UntaggedValue::Primitive(Primitive::BeginningOfStream).into_untagged_value());
let bos = futures::stream::iter(bos);
let mut eos: VecDeque<Value> = VecDeque::new();
eos.push_back(UntaggedValue::Primitive(Primitive::EndOfStream).into_untagged_value());
let eos = futures::stream::iter(eos);
let call_info = args.call_info.clone();
trace!("filtering :: {:?}", call_info);
let stream = bos
.chain(args.input)
.chain(eos)
.map(move |v| match v {
Value {
value: UntaggedValue::Primitive(Primitive::BeginningOfStream),
..
} => {
// Beginning of the stream
{
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
@ -115,20 +103,16 @@ pub fn filter_plugin(
match request_raw {
Err(_) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Could not load json from plugin",
"could not load json from plugin",
&call_info.name_tag,
)));
return result;
));
}
Ok(request_raw) => match stdin.write(format!("{}\n", request_raw).as_bytes()) {
Ok(_) => {}
Err(err) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::unexpected(format!("{}", err))));
return result;
yield Err(ShellError::unexpected(format!("{}", err)));
}
},
}
@ -139,37 +123,82 @@ pub fn filter_plugin(
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => params,
Ok(params) => for param in params { yield param },
Err(e) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Err(e));
result
yield ReturnValue::Err(e);
}
},
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::untagged_runtime_error(format!(
yield Err(ShellError::untagged_runtime_error(format!(
"Error while processing begin_filter response: {:?} {}",
e, input
))));
result
)));
}
}
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::untagged_runtime_error(format!(
yield Err(ShellError::untagged_runtime_error(format!(
"Error while reading begin_filter response: {:?}",
e
))));
result
)));
}
}
}
Value {
value: UntaggedValue::Primitive(Primitive::EndOfStream),
..
} => {
// Stream contents
{
while let Some(v) = args.input.next().await {
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("filter", v);
let request_raw = serde_json::to_string(&request);
match request_raw {
Ok(request_raw) => {
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
}
Err(e) => {
yield Err(ShellError::untagged_runtime_error(format!(
"Error while processing filter response: {:?}",
e
)));
}
}
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => for param in params { yield param },
Err(e) => {
yield ReturnValue::Err(e);
}
},
Err(e) => {
yield Err(ShellError::untagged_runtime_error(format!(
"Error while processing filter response: {:?}\n== input ==\n{}",
e, input
)));
}
}
}
Err(e) => {
yield Err(ShellError::untagged_runtime_error(format!(
"Error while reading filter response: {:?}",
e
)));
}
}
}
}
// End of the stream
{
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
@ -179,16 +208,15 @@ pub fn filter_plugin(
let request_raw = match serde_json::to_string(&request) {
Ok(req) => req,
Err(err) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::unexpected(format!("{}", err))));
return result;
yield Err(ShellError::unexpected(format!("{}", err)));
return;
}
};
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
let mut input = String::new();
let result = match reader.read_line(&mut input) {
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
@ -202,105 +230,40 @@ pub fn filter_plugin(
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::untagged_runtime_error(format!(
yield Err(ShellError::untagged_runtime_error(format!(
"Error while processing begin_filter response: {:?} {}",
e, input
))));
return result;
)));
return;
}
}
params
//yield ReturnValue::Ok(params)
//yield ReturnSuccess::value(Value)
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Err(e));
result
yield ReturnValue::Err(e);
}
},
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::untagged_runtime_error(format!(
yield Err(ShellError::untagged_runtime_error(format!(
"Error while processing end_filter response: {:?} {}",
e, input
))));
result
)));
}
}
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::untagged_runtime_error(format!(
yield Err(ShellError::untagged_runtime_error(format!(
"Error while reading end_filter: {:?}",
e
))));
result
)));
}
};
let _ = child.wait();
result
}
_ => {
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("filter", v);
let request_raw = serde_json::to_string(&request);
match request_raw {
Ok(request_raw) => {
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::untagged_runtime_error(format!(
"Error while processing filter response: {:?}",
e
))));
return result;
}
}
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => params,
Err(e) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Err(e));
result
}
},
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::untagged_runtime_error(format!(
"Error while processing filter response: {:?}\n== input ==\n{}",
e, input
))));
result
}
}
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::untagged_runtime_error(format!(
"Error while reading filter response: {:?}",
e
))));
result
}
}
}
})
.map(futures::stream::iter) // convert to a stream
.flatten();
};
Ok(stream.to_output_stream())
}
@ -339,10 +302,11 @@ pub fn sink_plugin(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let call_info = args.call_info.clone();
let stream = async_stream! {
let input: Vec<Value> = args.input.collect().await;
let request = JsonRpc::new("sink", (call_info.clone(), input));

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape, Value};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, Value};
#[derive(Deserialize)]
struct PrependArgs {
@ -33,7 +33,7 @@ impl WholeStreamCommand for Prepend {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, prepend)?.run()
prepend(args, registry)
}
fn examples(&self) -> &[Example] {
@ -44,11 +44,17 @@ impl WholeStreamCommand for Prepend {
}
}
fn prepend(
PrependArgs { row }: PrependArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let prepend = futures::stream::iter(vec![row]);
fn prepend(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
Ok(prepend.chain(input).to_output_stream())
let stream = async_stream! {
let (PrependArgs { row }, mut input) = args.process(&registry).await?;
yield ReturnSuccess::value(row);
while let Some(item) = input.next().await {
yield ReturnSuccess::value(item);
}
};
Ok(stream.to_output_stream())
}

View File

@ -35,7 +35,17 @@ impl WholeStreamCommand for Pwd {
}
pub fn pwd(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let shell_manager = args.shell_manager.clone();
let args = args.evaluate_once(registry)?;
shell_manager.pwd(args)
let args = args.evaluate_once(&registry).await?;
let mut out = shell_manager.pwd(args)?;
while let Some(l) = out.next().await {
yield l;
}
};
Ok(stream.to_output_stream())
}

View File

@ -3,7 +3,7 @@ use crate::context::CommandRegistry;
use crate::deserializer::NumericRange;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape};
use nu_source::Tagged;
#[derive(Deserialize)]
@ -35,14 +35,14 @@ impl WholeStreamCommand for Range {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, range)?.run()
range(args, registry)
}
}
fn range(
RangeArgs { area }: RangeArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn range(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (RangeArgs { area }, mut input) = args.process(&registry).await?;
let range = area.item;
let (from, _) = range.from;
let (to, _) = range.to;
@ -50,5 +50,11 @@ fn range(
let from = *from as usize;
let to = *to as usize;
Ok(input.skip(from).take(to - from + 1).to_output_stream())
let mut inp = input.skip(from).take(to - from + 1);
while let Some(item) = inp.next().await {
yield ReturnSuccess::value(item);
}
};
Ok(stream.to_output_stream())
}

View File

@ -36,15 +36,18 @@ impl WholeStreamCommand for ReduceBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, reduce_by)?.run()
reduce_by(args, registry)
}
}
pub fn reduce_by(
ReduceByArgs { reduce_with }: ReduceByArgs,
RunnableContext { input, name, .. }: RunnableContext,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (ReduceByArgs { reduce_with }, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::data::base::reject_fields;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape};
use nu_source::Tagged;
#[derive(Deserialize)]
@ -30,25 +30,30 @@ impl WholeStreamCommand for Reject {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, reject)?.run()
reject(args, registry)
}
}
fn reject(
RejectArgs { rest: fields }: RejectArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn reject(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (RejectArgs { rest: fields }, mut input) = args.process(&registry).await?;
if fields.is_empty() {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Reject requires fields",
"needs parameter",
name,
));
return;
}
let fields: Vec<_> = fields.iter().map(|f| f.item.clone()).collect();
let stream = input.map(move |item| reject_fields(&item, &fields, &item.tag));
Ok(stream.from_input_stream())
while let Some(item) = input.next().await {
yield ReturnSuccess::value(reject_fields(&item, &fields, &item.tag));
}
};
Ok(stream.to_output_stream())
}

View File

@ -37,7 +37,7 @@ impl WholeStreamCommand for Rename {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, rename)?.run()
rename(args, registry)
}
fn examples(&self) -> &[Example] {
@ -54,19 +54,17 @@ impl WholeStreamCommand for Rename {
}
}
pub fn rename(
Arguments { column_name, rest }: Arguments,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
pub fn rename(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (Arguments { column_name, rest }, mut input) = args.process(&registry).await?;
let mut new_column_names = vec![vec![column_name]];
new_column_names.push(rest);
let new_column_names = new_column_names.into_iter().flatten().collect::<Vec<_>>();
let stream = input
.map(move |item| {
let mut result = VecDeque::new();
while let Some(item) = input.next().await {
if let Value {
value: UntaggedValue::Row(row),
tag,
@ -86,21 +84,19 @@ pub fn rename(
let out = UntaggedValue::Row(renamed_row.into()).into_value(tag);
result.push_back(ReturnSuccess::value(out));
yield ReturnSuccess::value(out);
} else {
result.push_back(ReturnSuccess::value(
yield ReturnSuccess::value(
UntaggedValue::Error(ShellError::labeled_error(
"no column names available",
"can't rename",
&name,
))
.into_untagged_value(),
));
);
}
futures::stream::iter(result)
})
.flatten();
}
};
Ok(stream.to_output_stream())
}

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::Signature;
use nu_protocol::{ReturnSuccess, Signature};
pub struct Reverse;
@ -36,15 +36,16 @@ impl WholeStreamCommand for Reverse {
}
fn reverse(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let (input, _args) = args.parts();
let input = input.collect::<Vec<_>>();
let output = input.map(move |mut vec| {
vec.reverse();
futures::stream::iter(vec)
});
Ok(output.flatten_stream().from_input_stream())
let input = input.collect::<Vec<_>>().await;
for output in input.into_iter().rev() {
yield ReturnSuccess::value(output);
}
};
Ok(stream.to_output_stream())
}

View File

@ -41,7 +41,7 @@ impl WholeStreamCommand for Remove {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, rm)?.run()
rm(args, registry)
}
fn examples(&self) -> &[Example] {
@ -58,7 +58,17 @@ impl WholeStreamCommand for Remove {
}
}
fn rm(args: RemoveArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let shell_manager = context.shell_manager.clone();
shell_manager.rm(args, &context)
fn rm(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let shell_manager = args.shell_manager.clone();
let (args, _): (RemoveArgs, _) = args.process(&registry).await?;
let mut result = shell_manager.rm(args, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}

View File

@ -47,7 +47,7 @@ impl WholeStreamCommand for AliasCommand {
let stream = async_stream! {
let mut scope = call_info.scope.clone();
let evaluated = call_info.evaluate(&registry)?;
let evaluated = call_info.evaluate(&registry).await?;
if let Some(positional) = &evaluated.args.positional {
for (pos, arg) in positional.iter().enumerate() {
scope = scope.set_var(alias_command.args[pos].to_string(), arg.clone());

View File

@ -117,16 +117,7 @@ impl WholeStreamCommand for RunExternalCommand {
})
};
let context = RunnableContext {
input: InputStream::empty(),
shell_manager: external_context.shell_manager.clone(),
host: external_context.host.clone(),
ctrl_c: external_context.ctrl_c.clone(),
registry: external_context.registry.clone(),
name: args.call_info.name_tag.clone(),
};
let result = external_context.shell_manager.cd(cd_args, &context);
let result = external_context.shell_manager.cd(cd_args, args.call_info.name_tag.clone());
match result {
Ok(mut stream) => {
while let Some(value) = stream.next().await {

View File

@ -153,31 +153,23 @@ impl WholeStreamCommand for Save {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
Ok(args.process_raw(registry, save)?.run())
save(args, registry)
}
}
fn save(
SaveArgs {
path,
raw: save_raw,
}: SaveArgs,
RunnableContext {
input,
name,
shell_manager,
host,
ctrl_c,
registry,
..
}: RunnableContext,
raw_args: RawCommandArgs,
) -> Result<OutputStream, ShellError> {
let mut full_path = PathBuf::from(shell_manager.path());
let name_tag = name.clone();
fn save(raw_args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let mut full_path = PathBuf::from(raw_args.shell_manager.path());
let name_tag = raw_args.call_info.name_tag.clone();
let name = raw_args.call_info.name_tag.clone();
let scope = raw_args.call_info.scope.clone();
let registry = registry.clone();
let host = raw_args.host.clone();
let ctrl_c = raw_args.ctrl_c.clone();
let shell_manager = raw_args.shell_manager.clone();
let stream = async_stream! {
let head = raw_args.call_info.args.head.clone();
let (SaveArgs { path, raw: save_raw }, mut input) = raw_args.process(&registry).await?;
let input: Vec<Value> = input.collect().await;
if path.is_none() {
// If there is no filename, check the metadata for the anchor filename
@ -230,13 +222,13 @@ fn save(
shell_manager,
call_info: UnevaluatedCallInfo {
args: nu_protocol::hir::Call {
head: raw_args.call_info.args.head,
head,
positional: None,
named: None,
span: Span::unknown(),
is_last: false,
},
name_tag: raw_args.call_info.name_tag,
name_tag: name_tag.clone(),
scope,
}
};

View File

@ -3,8 +3,8 @@ use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{
ColumnPath, PathMember, Primitive, ReturnSuccess, ReturnValue, Signature, SyntaxShape,
TaggedDictBuilder, UnspannedPathMember, UntaggedValue, Value,
ColumnPath, PathMember, Primitive, ReturnSuccess, Signature, SyntaxShape, TaggedDictBuilder,
UnspannedPathMember, UntaggedValue, Value,
};
use nu_source::span_for_spanned_list;
use nu_value_ext::{as_string, get_data_by_column_path};
@ -37,7 +37,7 @@ impl WholeStreamCommand for Select {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, select)?.run()
select(args, registry)
}
fn examples(&self) -> &[Example] {
@ -54,18 +54,18 @@ impl WholeStreamCommand for Select {
}
}
fn select(
SelectArgs { rest: mut fields }: SelectArgs,
RunnableContext {
mut input, name, ..
}: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn select(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let name = args.call_info.name_tag.clone();
let stream = async_stream! {
let (SelectArgs { rest: mut fields }, mut input) = args.process(&registry).await?;
if fields.is_empty() {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Select requires columns to select",
"needs parameter",
name,
));
return;
}
let member = fields.remove(0);
@ -76,8 +76,6 @@ fn select(
.flatten()
.cloned()
.collect::<Vec<ColumnPath>>();
let stream = async_stream! {
let mut empty = true;
let mut bring_back: indexmap::IndexMap<String, Vec<Value>> = indexmap::IndexMap::new();
@ -172,7 +170,5 @@ fn select(
}
};
let stream: BoxStream<'static, ReturnValue> = stream.boxed();
Ok(stream.to_output_stream())
}

View File

@ -39,15 +39,14 @@ impl WholeStreamCommand for Shuffle {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, shuffle)?.run()
shuffle(args, registry)
}
}
fn shuffle(
Arguments { limit }: Arguments,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn shuffle(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (Arguments { limit }, mut input) = args.process(&registry).await?;
let mut values: Vec<Value> = input.collect().await;
let out = if let Some(n) = limit {

View File

@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Signature, SyntaxShape};
use nu_protocol::{ReturnSuccess, Signature, SyntaxShape};
use nu_source::Tagged;
pub struct Skip;
@ -30,7 +30,7 @@ impl WholeStreamCommand for Skip {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, skip)?.run()
skip(args, registry)
}
fn examples(&self) -> &[Example] {
@ -41,12 +41,25 @@ impl WholeStreamCommand for Skip {
}
}
fn skip(SkipArgs { rows }: SkipArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let rows_desired = if let Some(quantity) = rows {
fn skip(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (SkipArgs { rows }, mut input) = args.process(&registry).await?;
let mut rows_desired = if let Some(quantity) = rows {
*quantity
} else {
1
};
Ok(OutputStream::from_input(context.input.skip(rows_desired)))
while let Some(input) = input.next().await {
if rows_desired == 0 {
yield ReturnSuccess::value(input);
}
if rows_desired > 0{
rows_desired -= 1;
}
}
};
Ok(stream.to_output_stream())
}

View File

@ -3,7 +3,9 @@ use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*;
use log::trace;
use nu_errors::ShellError;
use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{
hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct SkipUntil;
@ -33,7 +35,8 @@ impl WholeStreamCommand for SkipUntil {
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let scope = args.call_info.scope.clone();
let call_info = args.evaluate_once(&registry)?;
let stream = async_stream! {
let mut call_info = args.evaluate_once(&registry).await?;
let block = call_info.args.expect_nth(0)?.clone();
@ -43,56 +46,69 @@ impl WholeStreamCommand for SkipUntil {
tag,
} => {
if block.block.len() != 1 {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
match block.block[0].list.get(0) {
Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(),
_ => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
))
}
},
None => {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
},
None => {
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
}
}
Value { tag, .. } => {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
};
let objects = call_info.input.skip_while(move |item| {
let mut skipping = true;
while let Some(item) = call_info.input.next().await {
let condition = condition.clone();
trace!("ITEM = {:?}", item);
let result =
evaluate_baseline_expr(&*condition, &registry, &scope.clone().set_it(item.clone()));
evaluate_baseline_expr(&*condition, &registry, &scope.clone().set_it(item.clone()))
.await;
trace!("RESULT = {:?}", result);
let return_value = match result {
Ok(ref v) if v.is_true() => false,
_ => true,
Ok(ref v) if v.is_true() => true,
_ => false,
};
futures::future::ready(return_value)
});
if return_value {
skipping = false;
}
Ok(objects.from_input_stream())
if !skipping {
yield ReturnSuccess::value(item);
}
}
};
Ok(stream.to_output_stream())
}
}

View File

@ -3,7 +3,9 @@ use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*;
use log::trace;
use nu_errors::ShellError;
use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{
hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct SkipWhile;
@ -33,7 +35,8 @@ impl WholeStreamCommand for SkipWhile {
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let scope = args.call_info.scope.clone();
let call_info = args.evaluate_once(&registry)?;
let stream = async_stream! {
let mut call_info = args.evaluate_once(&registry).await?;
let block = call_info.args.expect_nth(0)?.clone();
@ -43,56 +46,69 @@ impl WholeStreamCommand for SkipWhile {
tag,
} => {
if block.block.len() != 1 {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
match block.block[0].list.get(0) {
Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(),
_ => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
))
}
},
None => {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
},
None => {
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
}
}
Value { tag, .. } => {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
return;
}
};
let objects = call_info.input.skip_while(move |item| {
let mut skipping = true;
while let Some(item) = call_info.input.next().await {
let condition = condition.clone();
trace!("ITEM = {:?}", item);
let result =
evaluate_baseline_expr(&*condition, &registry, &scope.clone().set_it(item.clone()));
evaluate_baseline_expr(&*condition, &registry, &scope.clone().set_it(item.clone()))
.await;
trace!("RESULT = {:?}", result);
let return_value = match result {
Ok(ref v) if v.is_true() => true,
_ => false,
Ok(ref v) if v.is_true() => false,
_ => true,
};
futures::future::ready(return_value)
});
if return_value {
skipping = false;
}
Ok(objects.from_input_stream())
if !skipping {
yield ReturnSuccess::value(item);
}
}
};
Ok(stream.to_output_stream())
}
}

View File

@ -30,7 +30,7 @@ impl WholeStreamCommand for SortBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, sort_by)?.run()
sort_by(args, registry)
}
fn examples(&self) -> &[Example] {
@ -47,12 +47,11 @@ impl WholeStreamCommand for SortBy {
}
}
fn sort_by(
SortByArgs { rest }: SortByArgs,
mut context: RunnableContext,
) -> Result<OutputStream, ShellError> {
Ok(OutputStream::new(async_stream! {
let mut vec = context.input.drain_vec().await;
fn sort_by(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let (SortByArgs { rest }, mut input) = args.process(&registry).await?;
let mut vec = input.drain_vec().await;
if vec.is_empty() {
return;
@ -78,5 +77,7 @@ fn sort_by(
for item in vec {
yield item.into();
}
}))
};
Ok(stream.to_output_stream())
}

View File

@ -35,15 +35,15 @@ impl WholeStreamCommand for SplitBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, split_by)?.run()
split_by(args, registry)
}
}
pub fn split_by(
SplitByArgs { column_name }: SplitByArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
pub fn split_by(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (SplitByArgs { column_name }, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
if values.len() > 1 || values.is_empty() {

View File

@ -42,22 +42,16 @@ impl WholeStreamCommand for SplitColumn {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, split_column)?.run()
split_column(args, registry)
}
}
fn split_column(
SplitColumnArgs {
separator,
rest,
collapse_empty,
}: SplitColumnArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let name_span = name.span;
Ok(input
.map(move |v| {
fn split_column(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let name_span = args.call_info.name_tag.span;
let registry = registry.clone();
let stream = async_stream! {
let (SplitColumnArgs { separator, rest, collapse_empty }, mut input) = args.process(&registry).await?;
while let Some(v) = input.next().await {
if let Ok(s) = v.as_string() {
let splitter = separator.replace("\\n", "\n");
trace!("splitting with {:?}", splitter);
@ -84,7 +78,7 @@ fn split_column(
dict.insert_untagged(v.clone(), Primitive::String(k.into()));
}
ReturnSuccess::value(dict.into_value())
yield ReturnSuccess::value(dict.into_value());
} else {
let mut dict = TaggedDictBuilder::new(&v.tag);
for (&k, v) in split_result.iter().zip(positional.iter()) {
@ -93,17 +87,19 @@ fn split_column(
UntaggedValue::Primitive(Primitive::String(k.into())),
);
}
ReturnSuccess::value(dict.into_value())
yield ReturnSuccess::value(dict.into_value());
}
} else {
Err(ShellError::labeled_error_with_secondary(
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
v.tag.span,
))
));
}
})
.to_output_stream())
}
};
Ok(stream.to_output_stream())
}

View File

@ -34,16 +34,16 @@ impl WholeStreamCommand for SplitRow {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, split_row)?.run()
split_row(args, registry)
}
}
fn split_row(
SplitRowArgs { separator }: SplitRowArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = input
.map(move |v| {
fn split_row(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (SplitRowArgs { separator }, mut input) = args.process(&registry).await?;
while let Some(v) = input.next().await {
if let Ok(s) = v.as_string() {
let splitter = separator.item.replace("\\n", "\n");
trace!("splitting with {:?}", splitter);
@ -51,26 +51,22 @@ fn split_row(
trace!("split result = {:?}", split_result);
let mut result = VecDeque::new();
for s in split_result {
result.push_back(ReturnSuccess::value(
yield ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(s.into())).into_value(&v.tag),
));
);
}
futures::stream::iter(result)
} else {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::labeled_error_with_secondary(
yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name.span,
"value originates from here",
v.tag.span,
)));
futures::stream::iter(result)
));
}
})
.flatten();
}
};
Ok(stream.to_output_stream())
}

View File

@ -56,19 +56,15 @@ impl WholeStreamCommand for TSortBy {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, t_sort_by)?.run()
t_sort_by(args, registry)
}
}
fn t_sort_by(
TSortByArgs {
show_columns,
group_by,
..
}: TSortByArgs,
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
Ok(OutputStream::new(async_stream! {
fn t_sort_by(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (TSortByArgs { show_columns, group_by, ..}, mut input) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
let column_grouped_by_name = if let Some(grouped_by) = group_by {
@ -87,5 +83,7 @@ fn t_sort_by(
Err(err) => yield Err(err)
}
}
}))
};
Ok(stream.to_output_stream())
}

View File

@ -38,10 +38,11 @@ impl WholeStreamCommand for Table {
}
fn table(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let mut args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let mut args = args.evaluate_once(&registry).await?;
let mut finished = false;
let stream = async_stream! {
let host = args.host.clone();
let mut start_number = match args.get("start_number") {
Some(Value { value: UntaggedValue::Primitive(Primitive::Int(i)), .. }) => {

View File

@ -1,8 +1,9 @@
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::Signature;
use nu_protocol::{ReturnSuccess, Signature, UntaggedValue};
#[derive(Clone)]
pub struct To;
impl WholeStreamCommand for To {
@ -23,6 +24,14 @@ impl WholeStreamCommand for To {
_args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
Ok(crate::commands::help::get_help(self, registry).into())
let registry = registry.clone();
let stream = async_stream! {
yield Ok(ReturnSuccess::Value(
UntaggedValue::string(crate::commands::help::get_help(&To, &registry))
.into_value(Tag::unknown()),
));
};
Ok(stream.to_output_stream())
}
}

View File

@ -261,11 +261,12 @@ fn bson_value_to_bytes(bson: Bson, tag: Tag) -> Result<Vec<u8>, ShellError> {
}
fn to_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let registry = registry.clone();
let stream = async_stream! {
let args = args.evaluate_once(&registry).await?;
let name_tag = args.name_tag();
let name_span = name_tag.span;
let stream = async_stream! {
let input: Vec<Value> = args.input.collect().await;
let to_process_input = if input.len() > 1 {

View File

@ -41,17 +41,15 @@ impl WholeStreamCommand for ToCSV {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, to_csv)?.run()
to_csv(args, registry)
}
}
fn to_csv(
ToCSVArgs {
separator,
headerless,
}: ToCSVArgs,
runnable_context: RunnableContext,
) -> Result<OutputStream, ShellError> {
fn to_csv(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (ToCSVArgs { separator, headerless }, mut input) = args.process(&registry).await?;
let sep = match separator {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
@ -63,11 +61,12 @@ fn to_csv(
} else {
let vec_s: Vec<char> = s.chars().collect();
if vec_s.len() != 1 {
return Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error(
"Expected a single separator char from --separator",
"requires a single character string input",
tag,
));
return;
};
vec_s[0]
}
@ -75,5 +74,12 @@ fn to_csv(
_ => ',',
};
to_delimited_data(headerless, sep, "CSV", runnable_context)
let mut result = to_delimited_data(headerless, sep, "CSV", input, name)?;
while let Some(item) = result.next().await {
yield item;
}
};
Ok(stream.to_output_stream())
}

View File

@ -169,7 +169,8 @@ pub fn to_delimited_data(
headerless: bool,
sep: char,
format_name: &'static str,
RunnableContext { input, name, .. }: RunnableContext,
input: InputStream,
name: Tag,
) -> Result<OutputStream, ShellError> {
let name_tag = name;
let name_span = name_tag.span;

Some files were not shown because too many files have changed in this diff Show More