use crate::prelude::*; use bytes::{BufMut, BytesMut}; use futures::stream::StreamExt; use futures_codec::{Decoder, Encoder, Framed}; use log::trace; use nu_errors::ShellError; use nu_parser::ExternalCommand; use nu_protocol::{Primitive, ShellTypeName, UntaggedValue, Value}; use std::io::{Error, ErrorKind, Write}; use std::ops::Deref; use subprocess::Exec; /// A simple `Codec` implementation that splits up data into lines. pub struct LinesCodec {} impl Encoder for LinesCodec { type Item = String; type Error = Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { dst.put(item); Ok(()) } } impl Decoder for LinesCodec { type Item = nu_protocol::UntaggedValue; type Error = Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match src.iter().position(|b| b == &b'\n') { Some(pos) if !src.is_empty() => { let buf = src.split_to(pos + 1); String::from_utf8(buf.to_vec()) .map(UntaggedValue::line) .map(Some) .map_err(|e| Error::new(ErrorKind::InvalidData, e)) } _ if !src.is_empty() => { let drained = src.take(); String::from_utf8(drained.to_vec()) .map(UntaggedValue::string) .map(Some) .map_err(|e| Error::new(ErrorKind::InvalidData, e)) } _ => Ok(None), } } } pub(crate) async fn run_external_command( command: ExternalCommand, context: &mut Context, input: Option, is_last: bool, ) -> Result, ShellError> { trace!(target: "nu::run::external", "-> {}", command.name); let has_it_arg = command.args.iter().any(|arg| arg.contains("$it")); if has_it_arg { run_with_iterator_arg(command, context, input, is_last).await } else { run_with_stdin(command, context, input, is_last).await } } async fn run_with_iterator_arg( command: ExternalCommand, context: &mut Context, input: Option, is_last: bool, ) -> Result, ShellError> { let name = command.name; let args = command.args; let name_tag = command.name_tag; let inputs = input.unwrap_or_else(InputStream::empty).into_vec().await; trace!(target: "nu::run::external", "inputs = {:?}", inputs); let input_strings = inputs .iter() .map(|i| match i { Value { value: UntaggedValue::Primitive(Primitive::String(s)), .. } | Value { value: UntaggedValue::Primitive(Primitive::Line(s)), .. } => Ok(s.clone()), Value { value: UntaggedValue::Primitive(Primitive::Path(p)), .. } => Ok(p.to_string_lossy().to_string()), _ => { let arg = args.iter().find(|arg| arg.contains("$it")); if let Some(arg) = arg { Err(ShellError::labeled_error( "External $it needs string data", "given row instead of string data", &arg.tag, )) } else { Err(ShellError::labeled_error( "$it needs string data", "given something else", &name_tag, )) } } }) .collect::, ShellError>>()?; let home_dir = dirs::home_dir(); let commands = input_strings.iter().map(|i| { let args = args.iter().filter_map(|arg| { if arg.chars().all(|c| c.is_whitespace()) { None } else { let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref()); Some(arg.replace("$it", &i)) } }); format!("{} {}", name, itertools::join(args, " ")) }); let mut process = Exec::shell(itertools::join(commands, " && ")); process = process.cwd(context.shell_manager.path()?); trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); if !is_last { process = process.stdout(subprocess::Redirection::Pipe); trace!(target: "nu::run::external", "set up stdout pipe"); } trace!(target: "nu::run::external", "built process {:?}", process); let popen = process.detached().popen(); if let Ok(mut popen) = popen { if is_last { let _ = popen.wait(); Ok(None) } else { let stdout = popen.stdout.take().ok_or_else(|| { ShellError::untagged_runtime_error("Can't redirect the stdout for external command") })?; let file = futures::io::AllowStdIo::new(stdout); let stream = Framed::new(file, LinesCodec {}); let stream = stream.map(move |line| { line.expect("Internal error: could not read lines of text from stdin") .into_value(&name_tag) }); Ok(Some(stream.boxed().into())) } } else { Err(ShellError::labeled_error( "Command not found", "command not found", name_tag, )) } } pub fn argument_is_quoted(argument: &str) -> bool { (argument.starts_with('"') && argument.ends_with('"') || (argument.starts_with('\'') && argument.ends_with('\''))) } pub fn remove_quotes(argument: &str) -> &str { let size = argument.len(); &argument[1..size - 1] } async fn run_with_stdin( command: ExternalCommand, context: &mut Context, input: Option, is_last: bool, ) -> Result, ShellError> { let name_tag = command.name_tag; let home_dir = dirs::home_dir(); let mut process = Exec::cmd(&command.name); for arg in command.args.iter() { // Let's also replace ~ as we shell out let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref()); // Strip quotes from a quoted string process = if arg.len() > 1 && (argument_is_quoted(&arg)) { process.arg(remove_quotes(&arg)) } else { process.arg(arg.as_ref()) }; } process = process.cwd(context.shell_manager.path()?); trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); if !is_last { process = process.stdout(subprocess::Redirection::Pipe); trace!(target: "nu::run::external", "set up stdout pipe"); } if input.is_some() { process = process.stdin(subprocess::Redirection::Pipe); trace!(target: "nu::run::external", "set up stdin pipe"); } trace!(target: "nu::run::external", "built process {:?}", process); let popen = process.detached().popen(); if let Ok(mut popen) = popen { let stream = async_stream! { if let Some(mut input) = input { let mut stdin_write = popen .stdin .take() .expect("Internal error: could not get stdin pipe for external command"); while let Some(item) = input.next().await { match item.value { UntaggedValue::Primitive(Primitive::Nothing) => { // If first in a pipeline, will receive Nothing. This is not an error. }, UntaggedValue::Primitive(Primitive::String(s)) | UntaggedValue::Primitive(Primitive::Line(s)) => { if let Err(e) = stdin_write.write(s.as_bytes()) { let message = format!("Unable to write to stdin (error = {})", e); yield Ok(Value { value: UntaggedValue::Error(ShellError::labeled_error( message, "application may have closed before completing pipeline", &name_tag, )), tag: name_tag, }); return; } }, // TODO serialize other primitives? https://github.com/nushell/nushell/issues/778 v => { let message = format!("Received unexpected type from pipeline ({})", v.type_name()); yield Ok(Value { value: UntaggedValue::Error(ShellError::labeled_error( message, "expected a string", &name_tag, )), tag: name_tag, }); return; }, } } // Close stdin, which informs the external process that there's no more input drop(stdin_write); } if !is_last { let stdout = if let Some(stdout) = popen.stdout.take() { stdout } else { yield Ok(Value { value: UntaggedValue::Error( ShellError::labeled_error( "Can't redirect the stdout for external command", "can't redirect stdout", &name_tag, ) ), tag: name_tag, }); return; }; let file = futures::io::AllowStdIo::new(stdout); let stream = Framed::new(file, LinesCodec {}); let mut stream = stream.map(|line| { if let Ok(line) = line { line.into_value(&name_tag) } else { panic!("Internal error: could not read lines of text from stdin") } }); loop { match stream.next().await { Some(item) => yield Ok(item), None => break, } } } loop { match popen.poll() { None => std::thread::sleep(std::time::Duration::new(0, 100_000_000)), Some(status) => { if !status.success() { yield Ok(Value { value: UntaggedValue::Error( ShellError::labeled_error( "External command failed", "command failed", &name_tag, ) ), tag: name_tag, }); } break; } } } }; Ok(Some(stream.to_input_stream())) } else { Err(ShellError::labeled_error( "Command not found", "command not found", name_tag, )) } } #[cfg(test)] mod tests { use super::{argument_is_quoted, remove_quotes, run_external_command, Context, OutputStream}; use futures::executor::block_on; use futures::stream::TryStreamExt; use nu_errors::ShellError; use nu_parser::commands::classified::external::{ExternalArgs, ExternalCommand}; use nu_protocol::{UntaggedValue, Value}; use nu_source::{Span, SpannedItem, Tag}; async fn read(mut stream: OutputStream) -> Option { match stream.try_next().await { Ok(val) => { if let Some(val) = val { val.raw_value() } else { None } } Err(_) => None, } } fn external(name: &str) -> ExternalCommand { let mut path = nu_test_support::fs::binaries(); path.push(name); let name = path.to_string_lossy().to_string().spanned(Span::unknown()); ExternalCommand { name: name.to_string(), name_tag: Tag { anchor: None, span: name.span, }, args: ExternalArgs { list: vec![], span: name.span, }, } } async fn non_existent_run() -> Result<(), ShellError> { let cmd = external("i_dont_exist.exe"); let mut ctx = Context::basic().expect("There was a problem creating a basic context."); assert!(run_external_command(cmd, &mut ctx, None, false) .await .is_err()); Ok(()) } async fn failure_run() -> Result<(), ShellError> { let cmd = external("fail"); let mut ctx = Context::basic().expect("There was a problem creating a basic context."); let stream = run_external_command(cmd, &mut ctx, None, false) .await? .expect("There was a problem running the external command."); match read(stream.into()).await { Some(Value { value: UntaggedValue::Error(_), .. }) => {} None | _ => panic!("Command didn't fail."), } Ok(()) } #[test] fn identifies_command_failed() -> Result<(), ShellError> { block_on(failure_run()) } #[test] fn identifies_command_not_found() -> Result<(), ShellError> { block_on(non_existent_run()) } #[test] fn checks_quotes_from_argument_to_be_passed_in() { assert_eq!(argument_is_quoted("'andrés"), false); assert_eq!(argument_is_quoted("andrés'"), false); assert_eq!(argument_is_quoted(r#""andrés"#), false); assert_eq!(argument_is_quoted(r#"andrés""#), false); assert_eq!(argument_is_quoted("'andrés'"), true); assert_eq!(argument_is_quoted(r#""andrés""#), true); } #[test] fn strips_quotes_from_argument_to_be_passed_in() { assert_eq!(remove_quotes(r#"'andrés'"#), "andrés"); assert_eq!(remove_quotes(r#""andrés""#), "andrés"); } }