Merge pull request #61 from elferherrera/externals

Externals with redirection
This commit is contained in:
JT 2021-09-24 10:26:38 +12:00 committed by GitHub
commit 6c589affe7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 298 additions and 17 deletions

View File

@ -7,7 +7,7 @@ use nu_protocol::{
use crate::{
where_::Where, Alias, Benchmark, BuildString, Def, Do, Each, External, For, Git, GitCheckout,
If, Length, Let, LetEnv, ListGitBranches, Ls, Table,
If, Length, Let, LetEnv, Lines, ListGitBranches, Ls, Table,
};
pub fn create_default_context() -> Rc<RefCell<EngineState>> {
@ -50,6 +50,8 @@ pub fn create_default_context() -> Rc<RefCell<EngineState>> {
working_set.add_decl(Box::new(External));
working_set.add_decl(Box::new(Lines));
// This is a WIP proof of concept
working_set.add_decl(Box::new(ListGitBranches));
working_set.add_decl(Box::new(Git));

View File

@ -12,6 +12,7 @@ mod if_;
mod length;
mod let_;
mod let_env;
mod lines;
mod list_git_branches;
mod ls;
mod run_external;
@ -32,6 +33,7 @@ pub use if_::If;
pub use length::Length;
pub use let_::Let;
pub use let_env::LetEnv;
pub use lines::Lines;
pub use list_git_branches::ListGitBranches;
pub use ls::Ls;
pub use run_external::External;

View File

@ -0,0 +1,91 @@
use std::cell::RefCell;
use std::rc::Rc;
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EvaluationContext};
use nu_protocol::{Signature, Span, Value, ValueStream};
pub struct Lines;
const SPLIT_CHAR: char = '\n';
impl Command for Lines {
fn name(&self) -> &str {
"lines"
}
fn usage(&self) -> &str {
"Converts input to lines"
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("lines")
}
fn run(
&self,
_context: &EvaluationContext,
_call: &Call,
input: Value,
) -> Result<nu_protocol::Value, nu_protocol::ShellError> {
let value = match input {
#[allow(clippy::needless_collect)]
// Collect is needed because the string may not live long enough for
// the Rc structure to continue using it. If split could take ownership
// of the split values, then this wouldn't be needed
Value::String { val, span } => {
let lines = val
.split(SPLIT_CHAR)
.map(|s| s.to_string())
.collect::<Vec<String>>();
let iter = lines.into_iter().filter_map(move |s| {
if !s.is_empty() {
Some(Value::String { val: s, span })
} else {
None
}
});
Value::Stream {
stream: ValueStream(Rc::new(RefCell::new(iter))),
span: Span::unknown(),
}
}
Value::Stream { stream, span: _ } => {
let iter = stream
.into_iter()
.filter_map(|value| {
if let Value::String { val, span } = value {
let inner = val
.split(SPLIT_CHAR)
.filter_map(|s| {
if !s.is_empty() {
Some(Value::String {
val: s.into(),
span,
})
} else {
None
}
})
.collect::<Vec<Value>>();
Some(inner)
} else {
None
}
})
.flatten();
Value::Stream {
stream: ValueStream(Rc::new(RefCell::new(iter))),
span: Span::unknown(),
}
}
_ => unimplemented!(),
};
Ok(value)
}
}

View File

@ -1,14 +1,22 @@
use std::borrow::Cow;
use std::cell::RefCell;
use std::env;
use std::process::Command as CommandSys;
use std::io::{BufRead, BufReader, Write};
use std::process::{ChildStdin, Command as CommandSys, Stdio};
use std::rc::Rc;
use std::sync::mpsc;
use nu_protocol::{
ast::{Call, Expression},
engine::{Command, EvaluationContext},
ShellError, Signature, SyntaxShape, Value,
};
use nu_protocol::{Span, ValueStream};
use nu_engine::eval_expression;
const OUTPUT_BUFFER_SIZE: usize = 8192;
pub struct External;
impl Command for External {
@ -21,7 +29,9 @@ impl Command for External {
}
fn signature(&self) -> nu_protocol::Signature {
Signature::build("run_external").rest("rest", SyntaxShape::Any, "external command to run")
Signature::build("run_external")
.switch("last_expression", "last_expression", None)
.rest("rest", SyntaxShape::Any, "external command to run")
}
fn run(
@ -39,6 +49,7 @@ pub struct ExternalCommand<'call, 'contex> {
pub name: &'call Expression,
pub args: &'call [Expression],
pub context: &'contex EvaluationContext,
pub last_expression: bool,
}
impl<'call, 'contex> ExternalCommand<'call, 'contex> {
@ -54,6 +65,7 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> {
name: &call.positional[0],
args: &call.positional[1..],
context,
last_expression: call.has_flag("last_expression"),
})
}
@ -70,7 +82,7 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> {
.collect()
}
pub fn run_with_input(&self, _input: Value) -> Result<Value, ShellError> {
pub fn run_with_input(&self, input: Value) -> Result<Value, ShellError> {
let mut process = self.create_command();
// TODO. We don't have a way to know the current directory
@ -81,18 +93,120 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> {
let envs = self.context.stack.get_env_vars();
process.envs(envs);
// If the external is not the last command, its output will get piped
// either as a string or binary
if !self.last_expression {
process.stdout(Stdio::piped());
}
// If there is an input from the pipeline. The stdin from the process
// is piped so it can be used to send the input information
if let Value::String { .. } = input {
process.stdin(Stdio::piped());
}
if let Value::Stream { .. } = input {
process.stdin(Stdio::piped());
}
match process.spawn() {
Err(err) => Err(ShellError::ExternalCommand(
format!("{}", err),
self.name.span,
)),
Ok(mut child) => match child.wait() {
Err(err) => Err(ShellError::ExternalCommand(
format!("{}", err),
self.name.span,
)),
Ok(_) => Ok(Value::nothing()),
},
Ok(mut child) => {
// if there is a string or a stream, that is sent to the pipe std
match input {
Value::Nothing { span: _ } => (),
Value::String { val, span: _ } => {
if let Some(mut stdin_write) = child.stdin.take() {
self.write_to_stdin(&mut stdin_write, val.as_bytes())?
}
}
Value::Binary { val, span: _ } => {
if let Some(mut stdin_write) = child.stdin.take() {
self.write_to_stdin(&mut stdin_write, &val)?
}
}
Value::Stream { stream, span: _ } => {
if let Some(mut stdin_write) = child.stdin.take() {
for value in stream {
match value {
Value::String { val, span: _ } => {
self.write_to_stdin(&mut stdin_write, val.as_bytes())?
}
Value::Binary { val, span: _ } => {
self.write_to_stdin(&mut stdin_write, &val)?
}
_ => continue,
}
}
}
}
_ => {
return Err(ShellError::ExternalCommand(
"Input is not string or binary".to_string(),
self.name.span,
))
}
}
// If this external is not the last expression, then its output is piped to a channel
// and we create a ValueStream that can be consumed
let value = if !self.last_expression {
let (tx, rx) = mpsc::sync_channel(0);
let stdout = child.stdout.take().ok_or_else(|| {
ShellError::ExternalCommand(
"Error taking stdout from external".to_string(),
self.name.span,
)
})?;
std::thread::spawn(move || {
// Stdout is read using the Buffer reader. It will do so until there is an
// error or there are no more bytes to read
let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout);
while let Ok(bytes) = buf_read.fill_buf() {
if bytes.is_empty() {
break;
}
// The Cow generated from the function represents the conversion
// from bytes to String. If no replacements are required, then the
// borrowed value is a proper UTF-8 string. The Owned option represents
// a string where the values had to be replaced, thus marking it as bytes
let data = match String::from_utf8_lossy(bytes) {
Cow::Borrowed(s) => Data::String(s.into()),
Cow::Owned(_) => Data::Bytes(bytes.to_vec()),
};
let length = bytes.len();
buf_read.consume(length);
match tx.send(data) {
Ok(_) => continue,
Err(_) => break,
}
}
});
// The ValueStream is consumed by the next expression in the pipeline
Value::Stream {
stream: ValueStream(Rc::new(RefCell::new(ChannelReceiver::new(rx)))),
span: Span::unknown(),
}
} else {
Value::nothing()
};
match child.wait() {
Err(err) => Err(ShellError::ExternalCommand(
format!("{}", err),
self.name.span,
)),
Ok(_) => Ok(value),
}
}
}
}
@ -120,4 +234,54 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> {
process
}
}
fn write_to_stdin(&self, stdin_write: &mut ChildStdin, val: &[u8]) -> Result<(), ShellError> {
if stdin_write.write(val).is_err() {
Err(ShellError::ExternalCommand(
"Error writing input to stdin".to_string(),
self.name.span,
))
} else {
Ok(())
}
}
}
// The piped data from stdout from the external command can be either String
// or binary. We use this enum to pass the data from the spawned process
enum Data {
String(String),
Bytes(Vec<u8>),
}
// Receiver used for the ValueStream
// It implements iterator so it can be used as a ValueStream
struct ChannelReceiver {
rx: mpsc::Receiver<Data>,
}
impl ChannelReceiver {
pub fn new(rx: mpsc::Receiver<Data>) -> Self {
Self { rx }
}
}
impl Iterator for ChannelReceiver {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
match self.rx.recv() {
Ok(v) => match v {
Data::String(s) => Some(Value::String {
val: s,
span: Span::unknown(),
}),
Data::Bytes(b) => Some(Value::Binary {
val: b,
span: Span::unknown(),
}),
},
Err(_) => None,
}
}
}

View File

@ -73,6 +73,7 @@ fn eval_external(
name: &Span,
args: &[Span],
input: Value,
last_expression: bool,
) -> Result<Value, ShellError> {
let engine_state = context.engine_state.borrow();
@ -98,6 +99,10 @@ fn eval_external(
})
.collect();
if last_expression {
call.named.push(("last_expression".into(), None))
}
command.run(context, &call, input)
}
@ -158,7 +163,9 @@ pub fn eval_expression(
}
Expr::RowCondition(_, expr) => eval_expression(context, expr),
Expr::Call(call) => eval_call(context, call, Value::nothing()),
Expr::ExternalCall(name, args) => eval_external(context, name, args, Value::nothing()),
Expr::ExternalCall(name, args) => {
eval_external(context, name, args, Value::nothing(), true)
}
Expr::Operator(_) => Ok(Value::Nothing { span: expr.span }),
Expr::BinaryOp(lhs, op, rhs) => {
let op_span = op.span;
@ -239,9 +246,9 @@ pub fn eval_block(
block: &Block,
mut input: Value,
) -> Result<Value, ShellError> {
for stmt in &block.stmts {
for stmt in block.stmts.iter() {
if let Statement::Pipeline(pipeline) = stmt {
for elem in &pipeline.expressions {
for (i, elem) in pipeline.expressions.iter().enumerate() {
match elem {
Expression {
expr: Expr::Call(call),
@ -253,7 +260,13 @@ pub fn eval_block(
expr: Expr::ExternalCall(name, args),
..
} => {
input = eval_external(context, name, args, input)?;
input = eval_external(
context,
name,
args,
input,
i == pipeline.expressions.len() - 1,
)?;
}
elem => {

View File

@ -20,6 +20,7 @@ pub enum Type {
ValueStream,
Unknown,
Error,
Binary,
}
impl Display for Type {
@ -43,6 +44,7 @@ impl Display for Type {
Type::ValueStream => write!(f, "value stream"),
Type::Unknown => write!(f, "unknown"),
Type::Error => write!(f, "error"),
Type::Binary => write!(f, "binary"),
}
}
}

View File

@ -59,6 +59,10 @@ pub enum Value {
Error {
error: ShellError,
},
Binary {
val: Vec<u8>,
span: Span,
},
}
impl Value {
@ -83,6 +87,7 @@ impl Value {
Value::Block { span, .. } => *span,
Value::Stream { span, .. } => *span,
Value::Nothing { span, .. } => *span,
Value::Binary { span, .. } => *span,
}
}
@ -100,6 +105,7 @@ impl Value {
Value::Block { span, .. } => *span = new_span,
Value::Nothing { span, .. } => *span = new_span,
Value::Error { .. } => {}
Value::Binary { span, .. } => *span = new_span,
}
self
@ -121,6 +127,7 @@ impl Value {
Value::Block { .. } => Type::Block,
Value::Stream { .. } => Type::ValueStream,
Value::Error { .. } => Type::Error,
Value::Binary { .. } => Type::Binary,
}
}
@ -159,6 +166,7 @@ impl Value {
Value::Block { val, .. } => format!("<Block {}>", val),
Value::Nothing { .. } => String::new(),
Value::Error { error } => format!("{:?}", error),
Value::Binary { val, .. } => format!("{:?}", val),
}
}

View File

@ -30,8 +30,7 @@ impl Iterator for ValueStream {
fn next(&mut self) -> Option<Self::Item> {
{
let mut iter = self.0.borrow_mut();
iter.next()
self.0.borrow_mut().next()
}
}
}