Pipeline blocks (#1579)

* Making Commands match what UntaggedValue needs

* WIP

* WIP

* WIP

* Moved to expressions for conditions

* Add 'each' command to use command blocks

* More cleanup

* Add test for 'each'

* Instead use an expression block
This commit is contained in:
Jonathan Turner
2020-04-13 19:59:57 +12:00
committed by GitHub
parent 85d6b24be3
commit 08a09e2273
40 changed files with 544 additions and 444 deletions

View File

@ -10,7 +10,7 @@ use crate::prelude::*;
use futures_codec::FramedRead;
use nu_errors::ShellError;
use nu_parser::{ClassifiedCommand, ExternalCommand};
use nu_protocol::hir::{ClassifiedCommand, ExternalCommand};
use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
use log::{debug, trace};
@ -304,6 +304,7 @@ pub fn create_default_context(
whole_stream_command(Range),
whole_stream_command(Rename),
whole_stream_command(Uniq),
per_item_command(Each),
// Table manipulation
whole_stream_command(Shuffle),
whole_stream_command(Wrap),

View File

@ -20,6 +20,7 @@ pub(crate) mod date;
pub(crate) mod debug;
pub(crate) mod default;
pub(crate) mod du;
pub(crate) mod each;
pub(crate) mod echo;
pub(crate) mod edit;
pub(crate) mod enter;
@ -124,6 +125,7 @@ pub(crate) use date::Date;
pub(crate) use debug::Debug;
pub(crate) use default::Default;
pub(crate) use du::Du;
pub(crate) use each::Each;
pub(crate) use echo::Echo;
pub(crate) use edit::Edit;
pub(crate) mod kill;

View File

@ -2,7 +2,7 @@ use crate::commands::UnevaluatedCallInfo;
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_parser::{hir, hir::Expression, hir::Literal, hir::SpannedExpression};
use nu_protocol::{hir, hir::Expression, hir::Literal, hir::SpannedExpression};
use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

View File

@ -1,5 +1,5 @@
use derive_new::new;
use nu_parser::hir;
use nu_protocol::hir;
#[derive(new, Debug)]
pub(crate) struct Command {

View File

@ -6,8 +6,7 @@ use futures::stream::StreamExt;
use futures_codec::FramedRead;
use log::trace;
use nu_errors::ShellError;
use nu_parser::ExternalArg;
use nu_parser::ExternalCommand;
use nu_protocol::hir::{ExternalArg, ExternalCommand};
use nu_protocol::{ColumnPath, Primitive, ShellTypeName, UntaggedValue, Value};
use nu_source::{Tag, Tagged};
use nu_value_ext::as_column_path;

View File

@ -2,7 +2,7 @@ use crate::commands::UnevaluatedCallInfo;
use crate::prelude::*;
use log::{log_enabled, trace};
use nu_errors::ShellError;
use nu_parser::InternalCommand;
use nu_protocol::hir::InternalCommand;
use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value};
pub(crate) fn run_internal_command(
@ -61,7 +61,7 @@ pub(crate) fn run_internal_command(
ctrl_c: context.ctrl_c.clone(),
shell_manager: context.shell_manager.clone(),
call_info: UnevaluatedCallInfo {
args: nu_parser::hir::Call {
args: nu_protocol::hir::Call {
head: command.args.head,
positional: None,
named: None,

View File

@ -3,7 +3,7 @@ use crate::commands::classified::internal::run_internal_command;
use crate::context::Context;
use crate::stream::InputStream;
use nu_errors::ShellError;
use nu_parser::{ClassifiedCommand, ClassifiedPipeline};
use nu_protocol::hir::{ClassifiedCommand, ClassifiedPipeline};
pub(crate) async fn run_pipeline(
pipeline: ClassifiedPipeline,

View File

@ -6,7 +6,7 @@ use crate::prelude::*;
use derive_new::new;
use getset::Getters;
use nu_errors::ShellError;
use nu_parser::hir;
use nu_protocol::hir;
use nu_protocol::{CallInfo, EvaluatedArgs, ReturnValue, Scope, Signature, Value};
use serde::{Deserialize, Serialize};
use std::ops::Deref;
@ -364,6 +364,14 @@ impl EvaluatedCommandArgs {
self.call_info.args.nth(pos)
}
/// Get the nth positional argument, error if not possible
pub fn expect_nth(&self, pos: usize) -> Result<&Value, ShellError> {
match self.call_info.args.nth(pos) {
None => Err(ShellError::unimplemented("Better error: expect_nth")),
Some(item) => Ok(item),
}
}
pub fn get(&self, name: &str) -> Option<&Value> {
self.call_info.args.get(name)
}

View File

@ -0,0 +1,89 @@
use crate::commands::classified::pipeline::run_pipeline;
use crate::commands::PerItemCommand;
use crate::context::CommandRegistry;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{
hir::ClassifiedPipeline, CallInfo, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct Each;
impl PerItemCommand for Each {
fn name(&self) -> &str {
"each"
}
fn signature(&self) -> Signature {
Signature::build("each").required(
"block",
SyntaxShape::Block,
"the block to run on each row",
)
}
fn usage(&self) -> &str {
"Run a block on each row of the table."
}
fn run(
&self,
call_info: &CallInfo,
registry: &CommandRegistry,
raw_args: &RawCommandArgs,
input: Value,
) -> Result<OutputStream, ShellError> {
let call_info = call_info.clone();
let registry = registry.clone();
let raw_args = raw_args.clone();
let stream = async_stream! {
match call_info.args.expect_nth(0)? {
Value {
value: UntaggedValue::Block(block),
tag
} => {
let mut context = Context::from_raw(&raw_args, &registry);
let input_stream = async_stream! {
yield Ok(input.clone())
}.to_input_stream();
let result = run_pipeline(
ClassifiedPipeline::new(block.clone(), None),
&mut context,
Some(input_stream),
).await;
match result {
Ok(Some(v)) => {
let results: Vec<Value> = v.collect().await;
for result in results {
yield Ok(ReturnSuccess::Value(result));
}
}
Ok(None) => {
yield Err(ShellError::labeled_error(
"Expected a block",
"each needs a block",
tag,
));
}
Err(e) => {
yield Err(e);
}
}
}
Value { tag, .. } => {
yield Err(ShellError::labeled_error(
"Expected a block",
"each needs a block",
tag,
))
}
};
};
Ok(stream.to_output_stream())
}
}

View File

@ -97,7 +97,7 @@ impl PerItemCommand for Enter {
ctrl_c: raw_args.ctrl_c,
shell_manager: raw_args.shell_manager,
call_info: UnevaluatedCallInfo {
args: nu_parser::hir::Call {
args: nu_protocol::hir::Call {
head: raw_args.call_info.args.head,
positional: None,
named: None,

View File

@ -228,7 +228,7 @@ fn save(
ctrl_c,
shell_manager,
call_info: UnevaluatedCallInfo {
args: nu_parser::hir::Call {
args: nu_protocol::hir::Call {
head: raw_args.call_info.args.head,
positional: None,
named: None,

View File

@ -1,16 +1,12 @@
use crate::commands::WholeStreamCommand;
use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*;
use log::trace;
use nu_errors::ShellError;
use nu_protocol::{Evaluate, Scope, Signature, SyntaxShape};
use nu_protocol::{hir::ClassifiedCommand, Scope, Signature, SyntaxShape, UntaggedValue, Value};
pub struct SkipWhile;
#[derive(Deserialize)]
pub struct SkipWhileArgs {
condition: Evaluate,
}
impl WholeStreamCommand for SkipWhile {
fn name(&self) -> &str {
"skip-while"
@ -20,7 +16,7 @@ impl WholeStreamCommand for SkipWhile {
Signature::build("skip-while")
.required(
"condition",
SyntaxShape::Block,
SyntaxShape::Condition,
"the condition that must be met to continue skipping",
)
.filter()
@ -35,26 +31,57 @@ impl WholeStreamCommand for SkipWhile {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
args.process(registry, skip_while)?.run()
}
}
let registry = registry.clone();
let call_info = args.evaluate_once(&registry)?;
pub fn skip_while(
SkipWhileArgs { condition }: SkipWhileArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let objects = input.values.skip_while(move |item| {
trace!("ITEM = {:?}", item);
let result = condition.invoke(&Scope::new(item.clone()));
trace!("RESULT = {:?}", result);
let block = call_info.args.expect_nth(0)?.clone();
let return_value = match result {
Ok(ref v) if v.is_true() => true,
_ => false,
let condition = match block {
Value {
value: UntaggedValue::Block(block),
tag,
} => match block.list.get(0) {
Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(),
_ => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
))
}
},
None => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
}
},
Value { tag, .. } => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
}
};
futures::future::ready(return_value)
});
let objects = call_info.input.values.skip_while(move |item| {
let condition = condition.clone();
trace!("ITEM = {:?}", item);
let result = evaluate_baseline_expr(&*condition, &registry, &Scope::new(item.clone()));
trace!("RESULT = {:?}", result);
Ok(objects.from_input_stream())
let return_value = match result {
Ok(ref v) if v.is_true() => true,
_ => false,
};
futures::future::ready(return_value)
});
Ok(objects.from_input_stream())
}
}

View File

@ -1,8 +1,12 @@
use crate::commands::PerItemCommand;
use crate::context::CommandRegistry;
use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{CallInfo, ReturnSuccess, Scope, Signature, SyntaxShape, UntaggedValue, Value};
use nu_protocol::{
hir::ClassifiedCommand, CallInfo, ReturnSuccess, Scope, Signature, SyntaxShape, UntaggedValue,
Value,
};
pub struct Where;
@ -14,7 +18,7 @@ impl PerItemCommand for Where {
fn signature(&self) -> Signature {
Signature::build("where").required(
"condition",
SyntaxShape::Block,
SyntaxShape::Condition,
"the condition that must match",
)
}
@ -26,37 +30,57 @@ impl PerItemCommand for Where {
fn run(
&self,
call_info: &CallInfo,
_registry: &CommandRegistry,
registry: &CommandRegistry,
_raw_args: &RawCommandArgs,
input: Value,
) -> Result<OutputStream, ShellError> {
let condition = call_info.args.expect_nth(0)?;
let stream = match condition {
let block = call_info.args.expect_nth(0)?.clone();
let condition = match block {
Value {
value: UntaggedValue::Block(block),
..
} => {
let result = block.invoke(&Scope::new(input.clone()));
match result {
Ok(v) => {
if v.is_true() {
VecDeque::from(vec![Ok(ReturnSuccess::Value(input))])
} else {
VecDeque::new()
}
tag,
} => match block.list.get(0) {
Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(),
_ => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
))
}
Err(e) => return Err(e),
},
None => {
return Err(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
tag,
));
}
}
},
Value { tag, .. } => {
return Err(ShellError::labeled_error(
"Expected a condition",
"where needs a condition",
"expected a condition",
tag,
))
));
}
};
let condition = evaluate_baseline_expr(&condition, registry, &Scope::new(input.clone()))?;
let stream = match condition.as_bool() {
Ok(b) => {
if b {
VecDeque::from(vec![Ok(ReturnSuccess::Value(input))])
} else {
VecDeque::new()
}
}
Err(e) => return Err(e),
};
Ok(stream.into())
}
}

View File

@ -1,11 +1,13 @@
use crate::commands::{command::CommandArgs, Command, UnevaluatedCallInfo};
use crate::commands::{
command::CommandArgs, command::RawCommandArgs, Command, UnevaluatedCallInfo,
};
use crate::env::host::Host;
use crate::shell::shell_manager::ShellManager;
use crate::stream::{InputStream, OutputStream};
use indexmap::IndexMap;
use nu_errors::ShellError;
use nu_parser::{hir, SignatureRegistry};
use nu_protocol::Signature;
use nu_parser::SignatureRegistry;
use nu_protocol::{hir, Signature};
use nu_source::{Tag, Text};
use parking_lot::Mutex;
use std::error::Error;
@ -40,12 +42,6 @@ impl CommandRegistry {
}
impl CommandRegistry {
pub(crate) fn empty() -> CommandRegistry {
CommandRegistry {
registry: Arc::new(Mutex::new(IndexMap::default())),
}
}
pub(crate) fn get_command(&self, name: &str) -> Option<Arc<Command>> {
let registry = self.registry.lock();
@ -92,6 +88,30 @@ impl Context {
&self.registry
}
pub(crate) fn from_raw(raw_args: &RawCommandArgs, registry: &CommandRegistry) -> Context {
#[cfg(windows)]
{
Context {
registry: registry.clone(),
host: raw_args.host.clone(),
current_errors: Arc::new(Mutex::new(vec![])),
ctrl_c: raw_args.ctrl_c.clone(),
shell_manager: raw_args.shell_manager.clone(),
windows_drives_previous_cwd: Arc::new(Mutex::new(std::collections::HashMap::new())),
}
}
#[cfg(not(windows))]
{
Context {
registry: registry.clone(),
host: raw_args.host.clone(),
current_errors: Arc::new(Mutex::new(vec![])),
ctrl_c: raw_args.ctrl_c.clone(),
shell_manager: raw_args.shell_manager.clone(),
}
}
}
pub(crate) fn basic() -> Result<Context, Box<dyn Error>> {
let registry = CommandRegistry::new();

View File

@ -1,16 +1,11 @@
pub(crate) mod shape;
use crate::context::CommandRegistry;
use crate::evaluate::evaluate_baseline_expr;
use bigdecimal::BigDecimal;
use chrono::{DateTime, Utc};
use derive_new::new;
use log::trace;
use nu_errors::ShellError;
use nu_parser::hir;
use nu_protocol::{
Evaluate, EvaluateTrait, Primitive, Scope, ShellTypeName, SpannedTypeName, TaggedDictBuilder,
UntaggedValue, Value,
hir, Primitive, ShellTypeName, SpannedTypeName, TaggedDictBuilder, UntaggedValue, Value,
};
use nu_source::Tag;
use nu_value_ext::ValueExt;
@ -29,42 +24,12 @@ pub struct Operation {
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Serialize, Deserialize, new)]
pub struct Block {
pub(crate) expressions: Vec<hir::SpannedExpression>,
pub(crate) commands: hir::Commands,
pub(crate) tag: Tag,
}
interfaces!(Block: dyn ObjectHash);
#[typetag::serde]
impl EvaluateTrait for Block {
fn invoke(&self, scope: &Scope) -> Result<Value, ShellError> {
if self.expressions.is_empty() {
return Ok(UntaggedValue::nothing().into_value(&self.tag));
}
let mut last = Ok(UntaggedValue::nothing().into_value(&self.tag));
trace!(
"EXPRS = {:?}",
self.expressions
.iter()
.map(|e| format!("{:?}", e))
.collect::<Vec<_>>()
);
for expr in self.expressions.iter() {
last = evaluate_baseline_expr(&expr, &CommandRegistry::empty(), &scope)
}
last
}
fn clone_box(&self) -> Evaluate {
let block = self.clone();
Evaluate::new(block)
}
}
#[derive(Serialize, Deserialize)]
pub enum Switch {
Present,

View File

@ -1,5 +1,4 @@
use nu_parser::hir::Number;
use nu_protocol::Primitive;
use nu_protocol::{hir::Number, Primitive};
pub fn number(number: impl Into<Number>) -> Primitive {
let number = number.into();

View File

@ -3,7 +3,7 @@ use crate::data::base::shape::{Column, InlineShape};
use crate::data::primitive::style_primitive;
use chrono::DateTime;
use nu_errors::ShellError;
use nu_parser::hir::CompareOperator;
use nu_protocol::hir::CompareOperator;
use nu_protocol::{Primitive, Type, UntaggedValue};
use nu_source::{DebugDocBuilder, PrettyDebug, Tagged};

View File

@ -1,7 +1,8 @@
use log::trace;
use nu_errors::{CoerceInto, ShellError};
use nu_protocol::{
CallInfo, ColumnPath, Evaluate, Primitive, RangeInclusion, ShellTypeName, UntaggedValue, Value,
hir::Commands, CallInfo, ColumnPath, Primitive, RangeInclusion, ShellTypeName, UntaggedValue,
Value,
};
use nu_source::{HasSpan, Spanned, SpannedItem, Tagged, TaggedItem};
use nu_value_ext::ValueExt;
@ -368,7 +369,7 @@ impl<'de, 'a> de::Deserializer<'de> for &'a mut ConfigDeserializer<'de> {
))
}
};
return visit::<Evaluate, _>(block, name, fields, visitor);
return visit::<Commands, _>(block, name, fields, visitor);
}
if name == "ColumnPath" {

View File

@ -3,8 +3,7 @@ use crate::context::CommandRegistry;
use crate::evaluate::evaluate_baseline_expr;
use indexmap::IndexMap;
use nu_errors::ShellError;
use nu_parser::hir;
use nu_protocol::{EvaluatedArgs, Scope, UntaggedValue, Value};
use nu_protocol::{hir, EvaluatedArgs, Scope, UntaggedValue, Value};
pub(crate) fn evaluate_args(
call: &hir::Call,

View File

@ -1,13 +1,11 @@
use crate::context::CommandRegistry;
use crate::data::base::Block;
use crate::evaluate::operator::apply_operator;
use crate::prelude::*;
use log::trace;
use nu_errors::{ArgumentError, ShellError};
use nu_parser::hir::{self, Expression, SpannedExpression};
use nu_protocol::hir::{self, Expression, SpannedExpression};
use nu_protocol::{
ColumnPath, Evaluate, Primitive, RangeInclusion, Scope, UnspannedPathMember, UntaggedValue,
Value,
ColumnPath, Primitive, RangeInclusion, Scope, UnspannedPathMember, UntaggedValue, Value,
};
pub(crate) fn evaluate_baseline_expr(
@ -81,11 +79,7 @@ pub(crate) fn evaluate_baseline_expr(
Ok(UntaggedValue::Table(exprs).into_value(tag))
}
Expression::Block(block) => Ok(UntaggedValue::Block(Evaluate::new(Block::new(
block.clone(),
tag.clone(),
)))
.into_value(&tag)),
Expression::Block(block) => Ok(UntaggedValue::Block(block.clone()).into_value(&tag)),
Expression::Path(path) => {
let value = evaluate_baseline_expr(&path.head, registry, scope)?;
let mut item = value;
@ -138,8 +132,8 @@ fn evaluate_literal(literal: &hir::Literal, span: Span) -> Value {
.into_value(span)
}
hir::Literal::Number(int) => match int {
nu_parser::hir::Number::Int(i) => UntaggedValue::int(i.clone()).into_value(span),
nu_parser::hir::Number::Decimal(d) => {
nu_protocol::hir::Number::Int(i) => UntaggedValue::int(i.clone()).into_value(span),
nu_protocol::hir::Number::Decimal(d) => {
UntaggedValue::decimal(d.clone()).into_value(span)
}
},
@ -166,7 +160,10 @@ fn evaluate_reference(name: &hir::Variable, scope: &Scope, tag: Tag) -> Result<V
}
}
fn evaluate_external(external: &hir::ExternalCommand, _scope: &Scope) -> Result<Value, ShellError> {
fn evaluate_external(
external: &hir::ExternalStringCommand,
_scope: &Scope,
) -> Result<Value, ShellError> {
Err(ShellError::syntax_error(
"Unexpected external command".spanned(external.name.span),
))

View File

@ -1,5 +1,5 @@
use crate::data::value;
use nu_parser::hir::CompareOperator;
use nu_protocol::hir::CompareOperator;
use nu_protocol::{Primitive, ShellTypeName, UntaggedValue, Value};
use std::ops::Not;

View File

@ -91,7 +91,7 @@ pub(crate) use async_stream::stream as async_stream;
pub(crate) use bigdecimal::BigDecimal;
pub(crate) use futures::stream::BoxStream;
pub(crate) use futures::{FutureExt, Stream, StreamExt};
pub(crate) use nu_protocol::{EvaluateTrait, MaybeOwned};
pub(crate) use nu_protocol::MaybeOwned;
pub(crate) use nu_source::{
b, AnchorLocation, DebugDocBuilder, PrettyDebug, PrettyDebugWithSource, Span, SpannedItem, Tag,
TaggedItem, Text,

View File

@ -112,9 +112,9 @@ impl NuCompleter {
let result = nu_parser::classify_pipeline(&lite_parse, &self.commands);
for command in result.commands.list {
if let nu_parser::ClassifiedCommand::Internal(nu_parser::InternalCommand {
args, ..
}) = command
if let nu_protocol::hir::ClassifiedCommand::Internal(
nu_protocol::hir::InternalCommand { args, .. },
) = command
{
if replace_pos >= args.span.start() && replace_pos <= args.span.end() {
if let Some(named) = args.named {

View File

@ -1,7 +1,7 @@
use crate::context::Context;
use ansi_term::{Color, Style};
use nu_parser::hir::FlatShape;
use nu_parser::SignatureRegistry;
use nu_protocol::hir::FlatShape;
use nu_source::{Span, Spanned, Tag, Tagged};
use rustyline::completion::Completer;
use rustyline::error::ReadlineError;

View File

@ -2,7 +2,7 @@ use crate::data::value::compare_values;
use crate::data::TaggedListBuilder;
use chrono::{DateTime, NaiveDate, Utc};
use nu_errors::ShellError;
use nu_parser::hir::CompareOperator;
use nu_protocol::hir::CompareOperator;
use nu_protocol::{Primitive, TaggedDictBuilder, UntaggedValue, Value};
use nu_source::{SpannedItem, Tag, Tagged, TaggedItem};
use nu_value_ext::{get_data_by_key, ValueExt};

View File

@ -0,0 +1,13 @@
use nu_test_support::{nu, pipeline};
#[test]
fn each_works_separately() {
let actual = nu!(
cwd: "tests/fixtures/formats", pipeline(
r#"
echo [1 2 3] | each { echo $it 10 | sum } | to-json | echo $it
"#
));
assert_eq!(actual, "[11,12,13]");
}

View File

@ -4,6 +4,7 @@ mod cd;
mod compact;
mod cp;
mod default;
mod each;
mod edit;
mod enter;
mod first;