use std::borrow::Cow; use std::collections::HashMap; use std::env; use std::io::{BufRead, BufReader, Write}; use std::process::{Command as CommandSys, Stdio}; use std::sync::mpsc; use nu_protocol::engine::{EngineState, Stack}; use nu_protocol::{ast::Call, engine::Command, ShellError, Signature, SyntaxShape, Value}; use nu_protocol::{IntoPipelineData, PipelineData, Span, Spanned}; use nu_engine::CallExt; const OUTPUT_BUFFER_SIZE: usize = 8192; #[derive(Clone)] pub struct External; impl Command for External { fn name(&self) -> &str { "run_external" } fn usage(&self) -> &str { "Runs external command" } fn signature(&self) -> nu_protocol::Signature { Signature::build("run_external") .switch("last_expression", "last_expression", None) .rest("rest", SyntaxShape::Any, "external command to run") } fn run( &self, engine_state: &EngineState, stack: &mut Stack, call: &Call, input: PipelineData, ) -> Result { let name: Spanned = call.req(engine_state, stack, 0)?; let args: Vec = call.rest(engine_state, stack, 1)?; let last_expression = call.has_flag("last_expression"); let env_vars = stack.get_env_vars(); let command = ExternalCommand { name, args, last_expression, env_vars, }; command.run_with_input(input) } } pub struct ExternalCommand { pub name: Spanned, pub args: Vec, pub last_expression: bool, pub env_vars: HashMap, } impl ExternalCommand { pub fn run_with_input(&self, input: PipelineData) -> Result { let mut process = self.create_command(); // TODO. We don't have a way to know the current directory // This should be information from the EvaluationContex or EngineState let path = env::current_dir().unwrap(); process.current_dir(path); process.envs(&self.env_vars); // If the external is not the last command, its output will get piped // either as a string or binary if !self.last_expression { process.stdout(Stdio::piped()); } // If there is an input from the pipeline. The stdin from the process // is piped so it can be used to send the input information if !matches!(input, PipelineData::Value(Value::Nothing { .. })) { process.stdin(Stdio::piped()); } match process.spawn() { Err(err) => Err(ShellError::ExternalCommand( format!("{}", err), self.name.span, )), Ok(mut child) => { // if there is a string or a stream, that is sent to the pipe std if let Some(mut stdin_write) = child.stdin.take() { std::thread::spawn(move || { for value in input.into_iter() { match value { Value::String { val, span: _ } => { if stdin_write.write(val.as_bytes()).is_err() { return Ok(()); } } Value::Binary { val, span: _ } => { if stdin_write.write(&val).is_err() { return Ok(()); } } x => { if stdin_write.write(x.into_string().as_bytes()).is_err() { return Err(()); } } } } Ok(()) }); } // If this external is not the last expression, then its output is piped to a channel // and we create a ValueStream that can be consumed let value = if !self.last_expression { let (tx, rx) = mpsc::channel(); let stdout = child.stdout.take().ok_or_else(|| { ShellError::ExternalCommand( "Error taking stdout from external".to_string(), self.name.span, ) })?; std::thread::spawn(move || { // Stdout is read using the Buffer reader. It will do so until there is an // error or there are no more bytes to read let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout); while let Ok(bytes) = buf_read.fill_buf() { if bytes.is_empty() { break; } // The Cow generated from the function represents the conversion // from bytes to String. If no replacements are required, then the // borrowed value is a proper UTF-8 string. The Owned option represents // a string where the values had to be replaced, thus marking it as bytes let data = match String::from_utf8_lossy(bytes) { Cow::Borrowed(s) => Data::String(s.into()), Cow::Owned(_) => Data::Bytes(bytes.to_vec()), }; let length = bytes.len(); buf_read.consume(length); match tx.send(data) { Ok(_) => continue, Err(_) => break, } } }); // The ValueStream is consumed by the next expression in the pipeline ChannelReceiver::new(rx).into_pipeline_data() } else { PipelineData::new() }; match child.wait() { Err(err) => Err(ShellError::ExternalCommand( format!("{}", err), self.name.span, )), Ok(_) => Ok(value), } } } } fn create_command(&self) -> CommandSys { // in all the other cases shell out if cfg!(windows) { //TODO. This should be modifiable from the config file. // We could give the option to call from powershell // for minimal builds cwd is unused let mut process = CommandSys::new("cmd"); process.arg("/c"); process.arg(&self.name.item); for arg in &self.args { // Clean the args before we use them: // https://stackoverflow.com/questions/1200235/how-to-pass-a-quoted-pipe-character-to-cmd-exe // cmd.exe needs to have a caret to escape a pipe let arg = arg.replace("|", "^|"); process.arg(&arg); } process } else { let cmd_with_args = vec![self.name.item.clone(), self.args.join(" ")].join(" "); let mut process = CommandSys::new("sh"); process.arg("-c").arg(cmd_with_args); process } } } // The piped data from stdout from the external command can be either String // or binary. We use this enum to pass the data from the spawned process enum Data { String(String), Bytes(Vec), } // Receiver used for the ValueStream // It implements iterator so it can be used as a ValueStream struct ChannelReceiver { rx: mpsc::Receiver, } impl ChannelReceiver { pub fn new(rx: mpsc::Receiver) -> Self { Self { rx } } } impl Iterator for ChannelReceiver { type Item = Value; fn next(&mut self) -> Option { match self.rx.recv() { Ok(v) => match v { Data::String(s) => Some(Value::String { val: s, span: Span::unknown(), }), Data::Bytes(b) => Some(Value::Binary { val: b, span: Span::unknown(), }), }, Err(_) => None, } } }