diff --git a/crates/nu-command/src/run_external.rs b/crates/nu-command/src/run_external.rs index e89e471eda..3c765886bb 100644 --- a/crates/nu-command/src/run_external.rs +++ b/crates/nu-command/src/run_external.rs @@ -1,14 +1,22 @@ +use std::borrow::Cow; +use std::cell::RefCell; use std::env; -use std::process::Command as CommandSys; +use std::io::{BufRead, BufReader, Write}; +use std::process::{Command as CommandSys, Stdio}; +use std::rc::Rc; +use std::sync::mpsc; use nu_protocol::{ ast::{Call, Expression}, engine::{Command, EvaluationContext}, ShellError, Signature, SyntaxShape, Value, }; +use nu_protocol::{Span, ValueStream}; use nu_engine::eval_expression; +const OUTPUT_BUFFER_SIZE: usize = 8192; + pub struct External; impl Command for External { @@ -21,7 +29,9 @@ impl Command for External { } fn signature(&self) -> nu_protocol::Signature { - Signature::build("run_external").rest("rest", SyntaxShape::Any, "external command to run") + Signature::build("run_external") + .switch("last_expression", "last_expression", None) + .rest("rest", SyntaxShape::Any, "external command to run") } fn run( @@ -39,6 +49,7 @@ pub struct ExternalCommand<'call, 'contex> { pub name: &'call Expression, pub args: &'call [Expression], pub context: &'contex EvaluationContext, + pub last_expression: bool, } impl<'call, 'contex> ExternalCommand<'call, 'contex> { @@ -54,6 +65,7 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { name: &call.positional[0], args: &call.positional[1..], context, + last_expression: call.has_flag("last_expression"), }) } @@ -70,7 +82,7 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { .collect() } - pub fn run_with_input(&self, _input: Value) -> Result { + pub fn run_with_input(&self, input: Value) -> Result { let mut process = self.create_command(); // TODO. We don't have a way to know the current directory @@ -81,18 +93,139 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { let envs = self.context.stack.get_env_vars(); process.envs(envs); + // If the external is not the last command, its output will get piped + // either as a string or binary + if !self.last_expression { + process.stdout(Stdio::piped()); + } + + // If there is an input from the pipeline. The stdin from the process + // is piped so it can be used to send the input information + if let Value::String { .. } = input { + process.stdin(Stdio::piped()); + } + + if let Value::Stream { .. } = input { + process.stdin(Stdio::piped()); + } + match process.spawn() { Err(err) => Err(ShellError::ExternalCommand( format!("{}", err), self.name.span, )), - Ok(mut child) => match child.wait() { - Err(err) => Err(ShellError::ExternalCommand( - format!("{}", err), - self.name.span, - )), - Ok(_) => Ok(Value::nothing()), - }, + Ok(mut child) => { + // if there is a string or a stream, that is sent to the pipe std + let mut stdin_write = child + .stdin + .take() + .expect("Internal error: could not get stdin pipe for external command"); + + match input { + Value::Nothing { span: _ } => (), + Value::String { val, span: _ } => { + if stdin_write.write(val.as_bytes()).is_err() { + return Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )); + } + } + Value::Binary { val, span: _ } => { + if stdin_write.write(&val).is_err() { + return Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )); + } + } + Value::Stream { stream, span: _ } => { + for value in stream { + match value { + Value::String { val, span: _ } => { + if stdin_write.write(val.as_bytes()).is_err() { + return Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )); + } + } + Value::Binary { val, span: _ } => { + if stdin_write.write(&val).is_err() { + return Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )); + } + } + _ => continue, + } + } + } + _ => { + return Err(ShellError::ExternalCommand( + "Input is not string or binary".to_string(), + self.name.span, + )) + } + } + + // If this external is not the last expression, then its output is piped to a channel + // and we create a ValueStream that can be consumed + let value = if !self.last_expression { + let (tx, rx) = mpsc::sync_channel(0); + let stdout = child.stdout.take().ok_or_else(|| { + ShellError::ExternalCommand( + "Error taking stdout from external".to_string(), + self.name.span, + ) + })?; + + std::thread::spawn(move || { + // Stdout is read using the Buffer reader. It will do so until there is an + // error or there are no more bytes to read + let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout); + while let Ok(bytes) = buf_read.fill_buf() { + if bytes.is_empty() { + break; + } + + // The Cow generated from the function represents the conversion + // from bytes to String. If no replacements are required, then the + // borrowed value is a proper UTF-8 string. The Owned option represents + // a string where the values had to be replaced, thus marking it as bytes + let data = match String::from_utf8_lossy(bytes) { + Cow::Borrowed(s) => Data::String(s.into()), + Cow::Owned(_) => Data::Bytes(bytes.to_vec()), + }; + + let length = bytes.len(); + buf_read.consume(length); + + match tx.send(data) { + Ok(_) => continue, + Err(_) => break, + } + } + }); + + // The ValueStream is consumed by the next expression in the pipeline + Value::Stream { + stream: ValueStream(Rc::new(RefCell::new(ChannelReceiver::new(rx)))), + span: Span::unknown(), + } + } else { + Value::nothing() + }; + + match child.wait() { + Err(err) => Err(ShellError::ExternalCommand( + format!("{}", err), + self.name.span, + )), + Ok(_) => Ok(value), + } + } } } @@ -121,3 +254,42 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { } } } + +// The piped data from stdout from the external command can be either String +// or binary. We use this enum to pass the data from the spawned process +enum Data { + String(String), + Bytes(Vec), +} + +// Receiver used for the ValueStream +// It implements iterator so it can be used as a ValueStream +struct ChannelReceiver { + rx: mpsc::Receiver, +} + +impl ChannelReceiver { + pub fn new(rx: mpsc::Receiver) -> Self { + Self { rx } + } +} + +impl Iterator for ChannelReceiver { + type Item = Value; + + fn next(&mut self) -> Option { + match self.rx.recv() { + Ok(v) => match v { + Data::String(s) => Some(Value::String { + val: s, + span: Span::unknown(), + }), + Data::Bytes(b) => Some(Value::Binary { + val: b, + span: Span::unknown(), + }), + }, + Err(_) => None, + } + } +} diff --git a/crates/nu-engine/src/eval.rs b/crates/nu-engine/src/eval.rs index 30db23220d..29ba6fc3e3 100644 --- a/crates/nu-engine/src/eval.rs +++ b/crates/nu-engine/src/eval.rs @@ -73,6 +73,7 @@ fn eval_external( name: &Span, args: &[Span], input: Value, + last_expression: bool, ) -> Result { let engine_state = context.engine_state.borrow(); @@ -98,6 +99,10 @@ fn eval_external( }) .collect(); + if last_expression { + call.named.push(("last_expression".into(), None)) + } + command.run(context, &call, input) } @@ -158,7 +163,9 @@ pub fn eval_expression( } Expr::RowCondition(_, expr) => eval_expression(context, expr), Expr::Call(call) => eval_call(context, call, Value::nothing()), - Expr::ExternalCall(name, args) => eval_external(context, name, args, Value::nothing()), + Expr::ExternalCall(name, args) => { + eval_external(context, name, args, Value::nothing(), true) + } Expr::Operator(_) => Ok(Value::Nothing { span: expr.span }), Expr::BinaryOp(lhs, op, rhs) => { let op_span = op.span; @@ -239,9 +246,9 @@ pub fn eval_block( block: &Block, mut input: Value, ) -> Result { - for stmt in &block.stmts { + for stmt in block.stmts.iter() { if let Statement::Pipeline(pipeline) = stmt { - for elem in &pipeline.expressions { + for (i, elem) in pipeline.expressions.iter().enumerate() { match elem { Expression { expr: Expr::Call(call), @@ -253,7 +260,13 @@ pub fn eval_block( expr: Expr::ExternalCall(name, args), .. } => { - input = eval_external(context, name, args, input)?; + input = eval_external( + context, + name, + args, + input, + i == pipeline.expressions.len() - 1, + )?; } elem => { diff --git a/crates/nu-protocol/src/ty.rs b/crates/nu-protocol/src/ty.rs index 7e6c2aa5e9..6338a6e476 100644 --- a/crates/nu-protocol/src/ty.rs +++ b/crates/nu-protocol/src/ty.rs @@ -20,6 +20,7 @@ pub enum Type { ValueStream, Unknown, Error, + Binary, } impl Display for Type { @@ -43,6 +44,7 @@ impl Display for Type { Type::ValueStream => write!(f, "value stream"), Type::Unknown => write!(f, "unknown"), Type::Error => write!(f, "error"), + Type::Binary => write!(f, "binary"), } } } diff --git a/crates/nu-protocol/src/value/mod.rs b/crates/nu-protocol/src/value/mod.rs index 5d81e03f91..bdb31de395 100644 --- a/crates/nu-protocol/src/value/mod.rs +++ b/crates/nu-protocol/src/value/mod.rs @@ -59,6 +59,10 @@ pub enum Value { Error { error: ShellError, }, + Binary { + val: Vec, + span: Span, + }, } impl Value { @@ -83,6 +87,7 @@ impl Value { Value::Block { span, .. } => *span, Value::Stream { span, .. } => *span, Value::Nothing { span, .. } => *span, + Value::Binary { span, .. } => *span, } } @@ -100,6 +105,7 @@ impl Value { Value::Block { span, .. } => *span = new_span, Value::Nothing { span, .. } => *span = new_span, Value::Error { .. } => {} + Value::Binary { span, .. } => *span = new_span, } self @@ -121,6 +127,7 @@ impl Value { Value::Block { .. } => Type::Block, Value::Stream { .. } => Type::ValueStream, Value::Error { .. } => Type::Error, + Value::Binary { .. } => Type::Binary, } } @@ -159,6 +166,7 @@ impl Value { Value::Block { val, .. } => format!("", val), Value::Nothing { .. } => String::new(), Value::Error { error } => format!("{:?}", error), + Value::Binary { val, .. } => format!("{:?}", val), } } diff --git a/crates/nu-protocol/src/value/stream.rs b/crates/nu-protocol/src/value/stream.rs index ad5a27208f..ca56f39f05 100644 --- a/crates/nu-protocol/src/value/stream.rs +++ b/crates/nu-protocol/src/value/stream.rs @@ -30,8 +30,7 @@ impl Iterator for ValueStream { fn next(&mut self) -> Option { { - let mut iter = self.0.borrow_mut(); - iter.next() + self.0.borrow_mut().next() } } }