From 660e8b5b7397d9ffd4570b596d0144b35d272656 Mon Sep 17 00:00:00 2001 From: Fernando Herrera Date: Thu, 23 Sep 2021 17:42:03 +0100 Subject: [PATCH 1/5] external with redirection --- crates/nu-command/src/run_external.rs | 192 +++++++++++++++++++++++-- crates/nu-engine/src/eval.rs | 21 ++- crates/nu-protocol/src/ty.rs | 2 + crates/nu-protocol/src/value/mod.rs | 8 ++ crates/nu-protocol/src/value/stream.rs | 3 +- 5 files changed, 210 insertions(+), 16 deletions(-) diff --git a/crates/nu-command/src/run_external.rs b/crates/nu-command/src/run_external.rs index e89e471eda..3c765886bb 100644 --- a/crates/nu-command/src/run_external.rs +++ b/crates/nu-command/src/run_external.rs @@ -1,14 +1,22 @@ +use std::borrow::Cow; +use std::cell::RefCell; use std::env; -use std::process::Command as CommandSys; +use std::io::{BufRead, BufReader, Write}; +use std::process::{Command as CommandSys, Stdio}; +use std::rc::Rc; +use std::sync::mpsc; use nu_protocol::{ ast::{Call, Expression}, engine::{Command, EvaluationContext}, ShellError, Signature, SyntaxShape, Value, }; +use nu_protocol::{Span, ValueStream}; use nu_engine::eval_expression; +const OUTPUT_BUFFER_SIZE: usize = 8192; + pub struct External; impl Command for External { @@ -21,7 +29,9 @@ impl Command for External { } fn signature(&self) -> nu_protocol::Signature { - Signature::build("run_external").rest("rest", SyntaxShape::Any, "external command to run") + Signature::build("run_external") + .switch("last_expression", "last_expression", None) + .rest("rest", SyntaxShape::Any, "external command to run") } fn run( @@ -39,6 +49,7 @@ pub struct ExternalCommand<'call, 'contex> { pub name: &'call Expression, pub args: &'call [Expression], pub context: &'contex EvaluationContext, + pub last_expression: bool, } impl<'call, 'contex> ExternalCommand<'call, 'contex> { @@ -54,6 +65,7 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { name: &call.positional[0], args: &call.positional[1..], context, + last_expression: call.has_flag("last_expression"), }) } @@ -70,7 +82,7 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { .collect() } - pub fn run_with_input(&self, _input: Value) -> Result { + pub fn run_with_input(&self, input: Value) -> Result { let mut process = self.create_command(); // TODO. We don't have a way to know the current directory @@ -81,18 +93,139 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { let envs = self.context.stack.get_env_vars(); process.envs(envs); + // If the external is not the last command, its output will get piped + // either as a string or binary + if !self.last_expression { + process.stdout(Stdio::piped()); + } + + // If there is an input from the pipeline. The stdin from the process + // is piped so it can be used to send the input information + if let Value::String { .. } = input { + process.stdin(Stdio::piped()); + } + + if let Value::Stream { .. } = input { + process.stdin(Stdio::piped()); + } + match process.spawn() { Err(err) => Err(ShellError::ExternalCommand( format!("{}", err), self.name.span, )), - Ok(mut child) => match child.wait() { - Err(err) => Err(ShellError::ExternalCommand( - format!("{}", err), - self.name.span, - )), - Ok(_) => Ok(Value::nothing()), - }, + Ok(mut child) => { + // if there is a string or a stream, that is sent to the pipe std + let mut stdin_write = child + .stdin + .take() + .expect("Internal error: could not get stdin pipe for external command"); + + match input { + Value::Nothing { span: _ } => (), + Value::String { val, span: _ } => { + if stdin_write.write(val.as_bytes()).is_err() { + return Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )); + } + } + Value::Binary { val, span: _ } => { + if stdin_write.write(&val).is_err() { + return Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )); + } + } + Value::Stream { stream, span: _ } => { + for value in stream { + match value { + Value::String { val, span: _ } => { + if stdin_write.write(val.as_bytes()).is_err() { + return Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )); + } + } + Value::Binary { val, span: _ } => { + if stdin_write.write(&val).is_err() { + return Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )); + } + } + _ => continue, + } + } + } + _ => { + return Err(ShellError::ExternalCommand( + "Input is not string or binary".to_string(), + self.name.span, + )) + } + } + + // If this external is not the last expression, then its output is piped to a channel + // and we create a ValueStream that can be consumed + let value = if !self.last_expression { + let (tx, rx) = mpsc::sync_channel(0); + let stdout = child.stdout.take().ok_or_else(|| { + ShellError::ExternalCommand( + "Error taking stdout from external".to_string(), + self.name.span, + ) + })?; + + std::thread::spawn(move || { + // Stdout is read using the Buffer reader. It will do so until there is an + // error or there are no more bytes to read + let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout); + while let Ok(bytes) = buf_read.fill_buf() { + if bytes.is_empty() { + break; + } + + // The Cow generated from the function represents the conversion + // from bytes to String. If no replacements are required, then the + // borrowed value is a proper UTF-8 string. The Owned option represents + // a string where the values had to be replaced, thus marking it as bytes + let data = match String::from_utf8_lossy(bytes) { + Cow::Borrowed(s) => Data::String(s.into()), + Cow::Owned(_) => Data::Bytes(bytes.to_vec()), + }; + + let length = bytes.len(); + buf_read.consume(length); + + match tx.send(data) { + Ok(_) => continue, + Err(_) => break, + } + } + }); + + // The ValueStream is consumed by the next expression in the pipeline + Value::Stream { + stream: ValueStream(Rc::new(RefCell::new(ChannelReceiver::new(rx)))), + span: Span::unknown(), + } + } else { + Value::nothing() + }; + + match child.wait() { + Err(err) => Err(ShellError::ExternalCommand( + format!("{}", err), + self.name.span, + )), + Ok(_) => Ok(value), + } + } } } @@ -121,3 +254,42 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { } } } + +// The piped data from stdout from the external command can be either String +// or binary. We use this enum to pass the data from the spawned process +enum Data { + String(String), + Bytes(Vec), +} + +// Receiver used for the ValueStream +// It implements iterator so it can be used as a ValueStream +struct ChannelReceiver { + rx: mpsc::Receiver, +} + +impl ChannelReceiver { + pub fn new(rx: mpsc::Receiver) -> Self { + Self { rx } + } +} + +impl Iterator for ChannelReceiver { + type Item = Value; + + fn next(&mut self) -> Option { + match self.rx.recv() { + Ok(v) => match v { + Data::String(s) => Some(Value::String { + val: s, + span: Span::unknown(), + }), + Data::Bytes(b) => Some(Value::Binary { + val: b, + span: Span::unknown(), + }), + }, + Err(_) => None, + } + } +} diff --git a/crates/nu-engine/src/eval.rs b/crates/nu-engine/src/eval.rs index 30db23220d..29ba6fc3e3 100644 --- a/crates/nu-engine/src/eval.rs +++ b/crates/nu-engine/src/eval.rs @@ -73,6 +73,7 @@ fn eval_external( name: &Span, args: &[Span], input: Value, + last_expression: bool, ) -> Result { let engine_state = context.engine_state.borrow(); @@ -98,6 +99,10 @@ fn eval_external( }) .collect(); + if last_expression { + call.named.push(("last_expression".into(), None)) + } + command.run(context, &call, input) } @@ -158,7 +163,9 @@ pub fn eval_expression( } Expr::RowCondition(_, expr) => eval_expression(context, expr), Expr::Call(call) => eval_call(context, call, Value::nothing()), - Expr::ExternalCall(name, args) => eval_external(context, name, args, Value::nothing()), + Expr::ExternalCall(name, args) => { + eval_external(context, name, args, Value::nothing(), true) + } Expr::Operator(_) => Ok(Value::Nothing { span: expr.span }), Expr::BinaryOp(lhs, op, rhs) => { let op_span = op.span; @@ -239,9 +246,9 @@ pub fn eval_block( block: &Block, mut input: Value, ) -> Result { - for stmt in &block.stmts { + for stmt in block.stmts.iter() { if let Statement::Pipeline(pipeline) = stmt { - for elem in &pipeline.expressions { + for (i, elem) in pipeline.expressions.iter().enumerate() { match elem { Expression { expr: Expr::Call(call), @@ -253,7 +260,13 @@ pub fn eval_block( expr: Expr::ExternalCall(name, args), .. } => { - input = eval_external(context, name, args, input)?; + input = eval_external( + context, + name, + args, + input, + i == pipeline.expressions.len() - 1, + )?; } elem => { diff --git a/crates/nu-protocol/src/ty.rs b/crates/nu-protocol/src/ty.rs index 7e6c2aa5e9..6338a6e476 100644 --- a/crates/nu-protocol/src/ty.rs +++ b/crates/nu-protocol/src/ty.rs @@ -20,6 +20,7 @@ pub enum Type { ValueStream, Unknown, Error, + Binary, } impl Display for Type { @@ -43,6 +44,7 @@ impl Display for Type { Type::ValueStream => write!(f, "value stream"), Type::Unknown => write!(f, "unknown"), Type::Error => write!(f, "error"), + Type::Binary => write!(f, "binary"), } } } diff --git a/crates/nu-protocol/src/value/mod.rs b/crates/nu-protocol/src/value/mod.rs index 5d81e03f91..bdb31de395 100644 --- a/crates/nu-protocol/src/value/mod.rs +++ b/crates/nu-protocol/src/value/mod.rs @@ -59,6 +59,10 @@ pub enum Value { Error { error: ShellError, }, + Binary { + val: Vec, + span: Span, + }, } impl Value { @@ -83,6 +87,7 @@ impl Value { Value::Block { span, .. } => *span, Value::Stream { span, .. } => *span, Value::Nothing { span, .. } => *span, + Value::Binary { span, .. } => *span, } } @@ -100,6 +105,7 @@ impl Value { Value::Block { span, .. } => *span = new_span, Value::Nothing { span, .. } => *span = new_span, Value::Error { .. } => {} + Value::Binary { span, .. } => *span = new_span, } self @@ -121,6 +127,7 @@ impl Value { Value::Block { .. } => Type::Block, Value::Stream { .. } => Type::ValueStream, Value::Error { .. } => Type::Error, + Value::Binary { .. } => Type::Binary, } } @@ -159,6 +166,7 @@ impl Value { Value::Block { val, .. } => format!("", val), Value::Nothing { .. } => String::new(), Value::Error { error } => format!("{:?}", error), + Value::Binary { val, .. } => format!("{:?}", val), } } diff --git a/crates/nu-protocol/src/value/stream.rs b/crates/nu-protocol/src/value/stream.rs index ad5a27208f..ca56f39f05 100644 --- a/crates/nu-protocol/src/value/stream.rs +++ b/crates/nu-protocol/src/value/stream.rs @@ -30,8 +30,7 @@ impl Iterator for ValueStream { fn next(&mut self) -> Option { { - let mut iter = self.0.borrow_mut(); - iter.next() + self.0.borrow_mut().next() } } } From 36c32e9832f97db34b5bc7f66db788d5361d97ba Mon Sep 17 00:00:00 2001 From: Fernando Herrera Date: Thu, 23 Sep 2021 18:01:20 +0100 Subject: [PATCH 2/5] input from ValueStream --- crates/nu-command/src/run_external.rs | 56 ++++++++++++--------------- 1 file changed, 24 insertions(+), 32 deletions(-) diff --git a/crates/nu-command/src/run_external.rs b/crates/nu-command/src/run_external.rs index 3c765886bb..557d9d7408 100644 --- a/crates/nu-command/src/run_external.rs +++ b/crates/nu-command/src/run_external.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::env; use std::io::{BufRead, BufReader, Write}; -use std::process::{Command as CommandSys, Stdio}; +use std::process::{ChildStdin, Command as CommandSys, Stdio}; use std::rc::Rc; use std::sync::mpsc; @@ -116,49 +116,30 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { )), Ok(mut child) => { // if there is a string or a stream, that is sent to the pipe std - let mut stdin_write = child - .stdin - .take() - .expect("Internal error: could not get stdin pipe for external command"); - match input { Value::Nothing { span: _ } => (), Value::String { val, span: _ } => { - if stdin_write.write(val.as_bytes()).is_err() { - return Err(ShellError::ExternalCommand( - "Error writing input to stdin".to_string(), - self.name.span, - )); + if let Some(mut stdin_write) = child.stdin.take() { + self.write_to_stdin(&mut stdin_write, val.as_bytes())? } } Value::Binary { val, span: _ } => { - if stdin_write.write(&val).is_err() { - return Err(ShellError::ExternalCommand( - "Error writing input to stdin".to_string(), - self.name.span, - )); + if let Some(mut stdin_write) = child.stdin.take() { + self.write_to_stdin(&mut stdin_write, &val)? } } Value::Stream { stream, span: _ } => { - for value in stream { - match value { - Value::String { val, span: _ } => { - if stdin_write.write(val.as_bytes()).is_err() { - return Err(ShellError::ExternalCommand( - "Error writing input to stdin".to_string(), - self.name.span, - )); + if let Some(mut stdin_write) = child.stdin.take() { + for value in stream { + match value { + Value::String { val, span: _ } => { + self.write_to_stdin(&mut stdin_write, val.as_bytes())? } - } - Value::Binary { val, span: _ } => { - if stdin_write.write(&val).is_err() { - return Err(ShellError::ExternalCommand( - "Error writing input to stdin".to_string(), - self.name.span, - )); + Value::Binary { val, span: _ } => { + self.write_to_stdin(&mut stdin_write, &val)? } + _ => continue, } - _ => continue, } } } @@ -253,6 +234,17 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { process } } + + fn write_to_stdin(&self, stdin_write: &mut ChildStdin, val: &[u8]) -> Result<(), ShellError> { + if stdin_write.write(val).is_err() { + Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )) + } else { + Ok(()) + } + } } // The piped data from stdout from the external command can be either String From 772f8598dde95d5da3d4a1b9e15b97d735af6311 Mon Sep 17 00:00:00 2001 From: Fernando Herrera Date: Thu, 23 Sep 2021 20:03:08 +0100 Subject: [PATCH 3/5] lines command --- crates/nu-command/src/default_context.rs | 4 +- crates/nu-command/src/lib.rs | 2 + crates/nu-command/src/lines.rs | 83 ++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 crates/nu-command/src/lines.rs diff --git a/crates/nu-command/src/default_context.rs b/crates/nu-command/src/default_context.rs index 91c13590f6..5890d791cb 100644 --- a/crates/nu-command/src/default_context.rs +++ b/crates/nu-command/src/default_context.rs @@ -7,7 +7,7 @@ use nu_protocol::{ use crate::{ where_::Where, Alias, Benchmark, BuildString, Def, Do, Each, External, For, Git, GitCheckout, - If, Length, Let, LetEnv, ListGitBranches, Ls, Table, + If, Length, Let, LetEnv, Lines, ListGitBranches, Ls, Table, }; pub fn create_default_context() -> Rc> { @@ -50,6 +50,8 @@ pub fn create_default_context() -> Rc> { working_set.add_decl(Box::new(External)); + working_set.add_decl(Box::new(Lines)); + // This is a WIP proof of concept working_set.add_decl(Box::new(ListGitBranches)); working_set.add_decl(Box::new(Git)); diff --git a/crates/nu-command/src/lib.rs b/crates/nu-command/src/lib.rs index 96f0839fb3..1f150c2c65 100644 --- a/crates/nu-command/src/lib.rs +++ b/crates/nu-command/src/lib.rs @@ -12,6 +12,7 @@ mod if_; mod length; mod let_; mod let_env; +mod lines; mod list_git_branches; mod ls; mod run_external; @@ -32,6 +33,7 @@ pub use if_::If; pub use length::Length; pub use let_::Let; pub use let_env::LetEnv; +pub use lines::Lines; pub use list_git_branches::ListGitBranches; pub use ls::Ls; pub use run_external::External; diff --git a/crates/nu-command/src/lines.rs b/crates/nu-command/src/lines.rs new file mode 100644 index 0000000000..a8cbdea64d --- /dev/null +++ b/crates/nu-command/src/lines.rs @@ -0,0 +1,83 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use nu_protocol::ast::Call; +use nu_protocol::engine::{Command, EvaluationContext}; +use nu_protocol::{Signature, Span, Value, ValueStream}; + +pub struct Lines; + +const SPLIT_CHAR: char = '\n'; + +impl Command for Lines { + fn name(&self) -> &str { + "lines" + } + + fn usage(&self) -> &str { + "Converts input to lines" + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("lines") + } + + fn run( + &self, + _context: &EvaluationContext, + _call: &Call, + input: Value, + ) -> Result { + let value = match input { + Value::String { val, span } => { + let iter = val + .split(SPLIT_CHAR) + .map(|s| Value::String { + val: s.into(), + span, + }) + .collect::>(); // <----- how to avoid collecting? + + Value::Stream { + stream: ValueStream(Rc::new(RefCell::new(iter.into_iter()))), + span: Span::unknown(), + } + } + Value::Stream { stream, span: _ } => { + let iter = stream + .into_iter() + .filter_map(|value| { + if let Value::String { val, span } = value { + let inner = val + .split(SPLIT_CHAR) + .filter_map(|s| { + if !s.is_empty() { + Some(Value::String { + val: s.into(), + span, + }) + } else { + None + } + }) + .collect::>(); + + Some(inner) + } else { + None + } + }) + .flatten() + .collect::>(); // <----- how to avoid collecting? + + Value::Stream { + stream: ValueStream(Rc::new(RefCell::new(iter.into_iter()))), + span: Span::unknown(), + } + } + _ => unimplemented!(), + }; + + Ok(value) + } +} From 04990eeba4ef3c9e26189dfeebb1d146a144e0b7 Mon Sep 17 00:00:00 2001 From: Fernando Herrera Date: Thu, 23 Sep 2021 20:39:42 +0100 Subject: [PATCH 4/5] allow collect warning --- crates/nu-command/src/lines.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/crates/nu-command/src/lines.rs b/crates/nu-command/src/lines.rs index a8cbdea64d..09d5757465 100644 --- a/crates/nu-command/src/lines.rs +++ b/crates/nu-command/src/lines.rs @@ -29,17 +29,22 @@ impl Command for Lines { input: Value, ) -> Result { let value = match input { + #[allow(clippy::needless_collect)] + // Collect is needed because the string may not live long enough for + // the Rc structure to continue using it. If split could take ownership + // of the split values, then this wouldn't be needed Value::String { val, span } => { - let iter = val + let lines = val .split(SPLIT_CHAR) - .map(|s| Value::String { - val: s.into(), - span, - }) - .collect::>(); // <----- how to avoid collecting? + .map(|s| s.to_string()) + .collect::>(); + + let iter = lines + .into_iter() + .map(move |s| Value::String { val: s, span }); Value::Stream { - stream: ValueStream(Rc::new(RefCell::new(iter.into_iter()))), + stream: ValueStream(Rc::new(RefCell::new(iter))), span: Span::unknown(), } } @@ -67,11 +72,10 @@ impl Command for Lines { None } }) - .flatten() - .collect::>(); // <----- how to avoid collecting? + .flatten(); Value::Stream { - stream: ValueStream(Rc::new(RefCell::new(iter.into_iter()))), + stream: ValueStream(Rc::new(RefCell::new(iter))), span: Span::unknown(), } } From cb9db792a6f3327685c8998a723e83687bbc5c75 Mon Sep 17 00:00:00 2001 From: Fernando Herrera Date: Thu, 23 Sep 2021 20:44:50 +0100 Subject: [PATCH 5/5] filtering empty lines --- crates/nu-command/src/lines.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/nu-command/src/lines.rs b/crates/nu-command/src/lines.rs index 09d5757465..3694f26e20 100644 --- a/crates/nu-command/src/lines.rs +++ b/crates/nu-command/src/lines.rs @@ -39,9 +39,13 @@ impl Command for Lines { .map(|s| s.to_string()) .collect::>(); - let iter = lines - .into_iter() - .map(move |s| Value::String { val: s, span }); + let iter = lines.into_iter().filter_map(move |s| { + if !s.is_empty() { + Some(Value::String { val: s, span }) + } else { + None + } + }); Value::Stream { stream: ValueStream(Rc::new(RefCell::new(iter))),