From 7451414b9e4099dd2a6d372d7fd541677bed71b1 Mon Sep 17 00:00:00 2001 From: Jason Gedge Date: Tue, 7 Jan 2020 16:00:01 -0500 Subject: [PATCH] Eliminate ClassifiedInputStream in favour of InputStream. (#1056) --- src/cli.rs | 4 +- src/commands/classified/external.rs | 377 +++++++++++++++++----------- src/commands/classified/internal.rs | 7 +- src/commands/classified/mod.rs | 31 --- src/commands/classified/pipeline.rs | 26 +- src/stream.rs | 6 +- tests/shell/mod.rs | 32 ++- 7 files changed, 285 insertions(+), 198 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 0933dfe35..2441ebb19 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,5 +1,4 @@ use crate::commands::classified::pipeline::run_pipeline; -use crate::commands::classified::ClassifiedInputStream; use crate::commands::plugin::JsonRpc; use crate::commands::plugin::{PluginCommand, PluginSink}; use crate::commands::whole_stream_command; @@ -608,8 +607,7 @@ async fn process_line(readline: Result, ctx: &mut Context return LineResult::Error(line.to_string(), err); } - let input = ClassifiedInputStream::new(); - + let input = InputStream::empty(); match run_pipeline(pipeline, ctx, input, line).await { Ok(_) => LineResult::Success(line.to_string()), Err(err) => LineResult::Error(line.to_string(), err), diff --git a/src/commands/classified/external.rs b/src/commands/classified/external.rs index f14fde52d..4f9252388 100644 --- a/src/commands/classified/external.rs +++ b/src/commands/classified/external.rs @@ -5,13 +5,11 @@ use futures_codec::{Decoder, Encoder, Framed}; use log::trace; use nu_errors::ShellError; use nu_parser::ExternalCommand; -use nu_protocol::{Primitive, UntaggedValue, Value}; -use std::io::{Error, ErrorKind}; +use nu_protocol::{Primitive, ShellTypeName, UntaggedValue, Value}; +use std::io::{Error, ErrorKind, Write}; use std::ops::Deref; use subprocess::Exec; -use super::ClassifiedInputStream; - /// A simple `Codec` implementation that splits up data into lines. pub struct LinesCodec {} @@ -50,163 +48,107 @@ impl Decoder for LinesCodec { } } -#[derive(Debug)] -pub(crate) enum StreamNext { - Last, - External, - Internal, -} - pub(crate) async fn run_external_command( command: ExternalCommand, context: &mut Context, - input: ClassifiedInputStream, - stream_next: StreamNext, -) -> Result { - let stdin = input.stdin; - let inputs: Vec = input.objects.into_vec().await; - + input: InputStream, + is_last: bool, +) -> Result { trace!(target: "nu::run::external", "-> {}", command.name); + + let has_it_arg = command.args.iter().any(|arg| arg.contains("$it")); + if has_it_arg { + run_with_iterator_arg(command, context, input, is_last).await + } else { + run_with_stdin(command, context, input, is_last).await + } +} + +async fn run_with_iterator_arg( + command: ExternalCommand, + context: &mut Context, + input: InputStream, + is_last: bool, +) -> Result { + let name = command.name; + let args = command.args; + let name_tag = command.name_tag; + let inputs = input.into_vec().await; + trace!(target: "nu::run::external", "inputs = {:?}", inputs); - let mut arg_string = command.name.to_owned(); - for arg in command.args.iter() { - arg_string.push_str(&arg); - } + let input_strings = inputs + .iter() + .map(|i| match i { + Value { + value: UntaggedValue::Primitive(Primitive::String(s)), + .. + } + | Value { + value: UntaggedValue::Primitive(Primitive::Line(s)), + .. + } => Ok(s.clone()), + _ => { + let arg = args.iter().find(|arg| arg.contains("$it")); + if let Some(arg) = arg { + Err(ShellError::labeled_error( + "External $it needs string data", + "given row instead of string data", + &arg.tag, + )) + } else { + Err(ShellError::labeled_error( + "$it needs string data", + "given something else", + &name_tag, + )) + } + } + }) + .collect::, ShellError>>()?; let home_dir = dirs::home_dir(); - - trace!(target: "nu::run::external", "command = {:?}", command.name); - - let mut process; - if arg_string.contains("$it") { - let input_strings = inputs - .iter() - .map(|i| match i { - Value { - value: UntaggedValue::Primitive(Primitive::String(s)), - .. - } - | Value { - value: UntaggedValue::Primitive(Primitive::Line(s)), - .. - } => Ok(s.clone()), - _ => { - let arg = command.args.iter().find(|arg| arg.contains("$it")); - - if let Some(arg) = arg { - Err(ShellError::labeled_error( - "External $it needs string data", - "given row instead of string data", - &arg.tag, - )) - } else { - Err(ShellError::labeled_error( - "$it needs string data", - "given something else", - command.name_tag.clone(), - )) - } - } - }) - .collect::, ShellError>>()?; - - let commands = input_strings.iter().map(|i| { - let args = command.args.iter().filter_map(|arg| { - if arg.chars().all(|c| c.is_whitespace()) { - None - } else { - // Let's also replace ~ as we shell out - let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref()); - - Some(arg.replace("$it", &i)) - } - }); - - format!("{} {}", command.name, itertools::join(args, " ")) + let commands = input_strings.iter().map(|i| { + let args = args.iter().filter_map(|arg| { + if arg.chars().all(|c| c.is_whitespace()) { + None + } else { + let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref()); + Some(arg.replace("$it", &i)) + } }); - process = Exec::shell(itertools::join(commands, " && ")) - } else { - process = Exec::cmd(&command.name); - for arg in command.args.iter() { - // Let's also replace ~ as we shell out - let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref()); + format!("{} {}", name, itertools::join(args, " ")) + }); - let arg_chars: Vec<_> = arg.chars().collect(); - - if arg_chars.len() > 1 - && ((arg_chars[0] == '"' && arg_chars[arg_chars.len() - 1] == '"') - || (arg_chars[0] == '\'' && arg_chars[arg_chars.len() - 1] == '\'')) - { - // quoted string - let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect(); - process = process.arg(new_arg); - } else { - process = process.arg(arg.as_ref()); - } - } - } + let mut process = Exec::shell(itertools::join(commands, " && ")); process = process.cwd(context.shell_manager.path()?); - trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); - let mut process = match stream_next { - StreamNext::Last => process, - StreamNext::External | StreamNext::Internal => { - process.stdout(subprocess::Redirection::Pipe) - } - }; - - trace!(target: "nu::run::external", "set up stdout pipe"); - - if let Some(stdin) = stdin { - process = process.stdin(stdin); + if !is_last { + process = process.stdout(subprocess::Redirection::Pipe); + trace!(target: "nu::run::external", "set up stdout pipe"); } - trace!(target: "nu::run::external", "set up stdin pipe"); trace!(target: "nu::run::external", "built process {:?}", process); - let popen = process.popen(); - - trace!(target: "nu::run::external", "next = {:?}", stream_next); - - let name_tag = command.name_tag.clone(); + let popen = process.detached().popen(); if let Ok(mut popen) = popen { - popen.detach(); - match stream_next { - StreamNext::Last => { - let _ = popen.wait(); - Ok(ClassifiedInputStream::new()) - } - StreamNext::External => { - let stdout = popen.stdout.take().ok_or_else(|| { - ShellError::untagged_runtime_error( - "Can't redirect the stdout for external command", - ) - })?; - Ok(ClassifiedInputStream::from_stdout(stdout)) - } - StreamNext::Internal => { - let stdout = popen.stdout.take().ok_or_else(|| { - ShellError::untagged_runtime_error( - "Can't redirect the stdout for internal command", - ) - })?; - let file = futures::io::AllowStdIo::new(stdout); - let stream = Framed::new(file, LinesCodec {}); - let stream = stream.map(move |line| { - if let Ok(line) = line { - line.into_value(&name_tag) - } else { - panic!("Internal error: could not read lines of text from stdin") - } - }); - Ok(ClassifiedInputStream::from_input_stream( - stream.boxed() as BoxStream<'static, Value> - )) - } + if is_last { + let _ = popen.wait(); + Ok(InputStream::empty()) + } else { + let stdout = popen.stdout.take().ok_or_else(|| { + ShellError::untagged_runtime_error("Can't redirect the stdout for external command") + })?; + let file = futures::io::AllowStdIo::new(stdout); + let stream = Framed::new(file, LinesCodec {}); + let stream = stream.map(move |line| { + line.expect("Internal error: could not read lines of text from stdin") + .into_value(&name_tag) + }); + Ok(stream.boxed().into()) } } else { Err(ShellError::labeled_error( @@ -216,3 +158,156 @@ pub(crate) async fn run_external_command( )) } } + +async fn run_with_stdin( + command: ExternalCommand, + context: &mut Context, + mut input: InputStream, + is_last: bool, +) -> Result { + let name_tag = command.name_tag; + let home_dir = dirs::home_dir(); + + let mut process = Exec::cmd(&command.name); + for arg in command.args.iter() { + // Let's also replace ~ as we shell out + let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref()); + + // Strip quotes from a quoted string + if arg.len() > 1 + && ((arg.starts_with('"') && arg.ends_with('"')) + || (arg.starts_with('\'') && arg.ends_with('\''))) + { + process = process.arg(arg.chars().skip(1).take(arg.len() - 2).collect::()); + } else { + process = process.arg(arg.as_ref()); + } + } + + process = process.cwd(context.shell_manager.path()?); + trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); + + if !is_last { + process = process.stdout(subprocess::Redirection::Pipe); + trace!(target: "nu::run::external", "set up stdout pipe"); + } + + process = process.stdin(subprocess::Redirection::Pipe); + trace!(target: "nu::run::external", "set up stdin pipe"); + + trace!(target: "nu::run::external", "built process {:?}", process); + + let popen = process.detached().popen(); + if let Ok(mut popen) = popen { + let mut stdin_write = popen + .stdin + .take() + .expect("Internal error: could not get stdin pipe for external command"); + + let stream = async_stream! { + while let Some(item) = input.next().await { + match item.value { + UntaggedValue::Primitive(Primitive::Nothing) => { + // If first in a pipeline, will receive Nothing. This is not an error. + }, + + UntaggedValue::Primitive(Primitive::String(s)) | + UntaggedValue::Primitive(Primitive::Line(s)) => + { + if let Err(e) = stdin_write.write(s.as_bytes()) { + let message = format!("Unable to write to stdin (error = {})", e); + yield Ok(Value { + value: UntaggedValue::Error(ShellError::labeled_error( + message, + "unable to write to stdin", + &name_tag, + )), + tag: name_tag, + }); + return; + } + }, + + // TODO serialize other primitives? https://github.com/nushell/nushell/issues/778 + + v => { + let message = format!("Received unexpected type from pipeline ({})", v.type_name()); + yield Ok(Value { + value: UntaggedValue::Error(ShellError::labeled_error( + message, + "expected a string", + &name_tag, + )), + tag: name_tag, + }); + return; + }, + } + } + + // Close stdin, which informs the external process that there's no more input + drop(stdin_write); + + if !is_last { + let stdout = if let Some(stdout) = popen.stdout.take() { + stdout + } else { + yield Ok(Value { + value: UntaggedValue::Error( + ShellError::labeled_error( + "Can't redirect the stdout for external command", + "can't redirect stdout", + &name_tag, + ) + ), + tag: name_tag, + }); + return; + }; + + let file = futures::io::AllowStdIo::new(stdout); + let stream = Framed::new(file, LinesCodec {}); + let mut stream = stream.map(|line| { + if let Ok(line) = line { + line.into_value(&name_tag) + } else { + panic!("Internal error: could not read lines of text from stdin") + } + }); + + loop { + match stream.next().await { + Some(item) => yield Ok(item), + None => break, + } + } + } + + let errored = match popen.wait() { + Ok(status) => !status.success(), + Err(e) => true, + }; + + if errored { + yield Ok(Value { + value: UntaggedValue::Error( + ShellError::labeled_error( + "External command failed", + "command failed", + &name_tag, + ) + ), + tag: name_tag, + }); + }; + }; + + Ok(stream.to_input_stream()) + } else { + Err(ShellError::labeled_error( + "Command not found", + "command not found", + name_tag, + )) + } +} diff --git a/src/commands/classified/internal.rs b/src/commands/classified/internal.rs index f2c37653f..643246ebe 100644 --- a/src/commands/classified/internal.rs +++ b/src/commands/classified/internal.rs @@ -5,12 +5,10 @@ use nu_errors::ShellError; use nu_parser::InternalCommand; use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value}; -use super::ClassifiedInputStream; - pub(crate) async fn run_internal_command( command: InternalCommand, context: &mut Context, - input: ClassifiedInputStream, + input: InputStream, source: Text, ) -> Result { if log_enabled!(log::Level::Trace) { @@ -19,8 +17,7 @@ pub(crate) async fn run_internal_command( trace!(target: "nu::run::internal", "{}", command.args.debug(&source)); } - let objects: InputStream = - trace_stream!(target: "nu::trace_stream::internal", "input" = input.objects); + let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input); let internal_command = context.expect_command(&command.name); diff --git a/src/commands/classified/mod.rs b/src/commands/classified/mod.rs index 5e5d5b95f..5d64a3b4c 100644 --- a/src/commands/classified/mod.rs +++ b/src/commands/classified/mod.rs @@ -1,6 +1,3 @@ -use crate::prelude::*; -use nu_protocol::UntaggedValue; - mod dynamic; pub(crate) mod external; pub(crate) mod internal; @@ -8,31 +5,3 @@ pub(crate) mod pipeline; #[allow(unused_imports)] pub(crate) use dynamic::Command as DynamicCommand; - -pub(crate) struct ClassifiedInputStream { - pub(crate) objects: InputStream, - pub(crate) stdin: Option, -} - -impl ClassifiedInputStream { - pub(crate) fn new() -> ClassifiedInputStream { - ClassifiedInputStream { - objects: vec![UntaggedValue::nothing().into_value(Tag::unknown())].into(), - stdin: None, - } - } - - pub(crate) fn from_input_stream(stream: impl Into) -> ClassifiedInputStream { - ClassifiedInputStream { - objects: stream.into(), - stdin: None, - } - } - - pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream { - ClassifiedInputStream { - objects: VecDeque::new().into(), - stdin: Some(stdout), - } - } -} diff --git a/src/commands/classified/pipeline.rs b/src/commands/classified/pipeline.rs index 253f7b0c2..e067a90bd 100644 --- a/src/commands/classified/pipeline.rs +++ b/src/commands/classified/pipeline.rs @@ -1,8 +1,7 @@ -use crate::commands::classified::external::{run_external_command, StreamNext}; +use crate::commands::classified::external::run_external_command; use crate::commands::classified::internal::run_internal_command; -use crate::commands::classified::ClassifiedInputStream; use crate::context::Context; -use crate::stream::OutputStream; +use crate::stream::{InputStream, OutputStream}; use nu_errors::ShellError; use nu_parser::{ClassifiedCommand, ClassifiedPipeline}; use nu_protocol::{ReturnSuccess, UntaggedValue, Value}; @@ -12,7 +11,7 @@ use std::sync::atomic::Ordering; pub(crate) async fn run_pipeline( pipeline: ClassifiedPipeline, ctx: &mut Context, - mut input: ClassifiedInputStream, + mut input: InputStream, line: &str, ) -> Result<(), ShellError> { let mut iter = pipeline.commands.list.into_iter().peekable(); @@ -31,20 +30,15 @@ pub(crate) async fn run_pipeline( } (Some(ClassifiedCommand::Internal(left)), _) => { - let stream = run_internal_command(left, ctx, input, Text::from(line)).await?; - ClassifiedInputStream::from_input_stream(stream) - } - - (Some(ClassifiedCommand::External(left)), Some(ClassifiedCommand::External(_))) => { - run_external_command(left, ctx, input, StreamNext::External).await? - } - - (Some(ClassifiedCommand::External(left)), Some(_)) => { - run_external_command(left, ctx, input, StreamNext::Internal).await? + run_internal_command(left, ctx, input, Text::from(line)).await? } (Some(ClassifiedCommand::External(left)), None) => { - run_external_command(left, ctx, input, StreamNext::Last).await? + run_external_command(left, ctx, input, true).await? + } + + (Some(ClassifiedCommand::External(left)), _) => { + run_external_command(left, ctx, input, false).await? } (None, _) => break, @@ -52,7 +46,7 @@ pub(crate) async fn run_pipeline( } use futures::stream::TryStreamExt; - let mut output_stream: OutputStream = input.objects.into(); + let mut output_stream: OutputStream = input.into(); loop { match output_stream.try_next().await { Ok(Some(ReturnSuccess::Value(Value { diff --git a/src/stream.rs b/src/stream.rs index a33377e9f..bf9563dc2 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,11 +1,15 @@ use crate::prelude::*; -use nu_protocol::{ReturnSuccess, ReturnValue, Value}; +use nu_protocol::{ReturnSuccess, ReturnValue, UntaggedValue, Value}; pub struct InputStream { pub(crate) values: BoxStream<'static, Value>, } impl InputStream { + pub fn empty() -> InputStream { + vec![UntaggedValue::nothing().into_value(Tag::unknown())].into() + } + pub fn into_vec(self) -> impl Future> { self.values.collect() } diff --git a/tests/shell/mod.rs b/tests/shell/mod.rs index 25909ea3c..2a146ad17 100644 --- a/tests/shell/mod.rs +++ b/tests/shell/mod.rs @@ -1,7 +1,7 @@ mod pipeline { use nu_test_support::fs::Stub::EmptyFile; use nu_test_support::playground::Playground; - use nu_test_support::{nu, pipeline}; + use nu_test_support::{nu, nu_error, pipeline}; #[test] fn can_process_row_as_it_argument_to_an_external_command_given_the_it_data_is_a_string() { @@ -72,6 +72,36 @@ mod pipeline { }) } + #[test] + fn can_process_stdout_of_external_piped_to_stdin_of_external() { + let actual = nu!( + cwd: "tests/fixtures", + "^echo 1 | ^cat" + ); + + assert!(actual.contains("1")); + } + + #[test] + fn can_process_row_from_internal_piped_to_stdin_of_external() { + let actual = nu!( + cwd: "tests/fixtures", + "echo \"1\" | ^cat" + ); + + assert!(actual.contains("1")); + } + + #[test] + fn shows_error_for_external_command_that_fails() { + let actual = nu_error!( + cwd: "tests/fixtures", + "echo \"1\" | ^false" + ); + + assert!(actual.contains("External command failed")); + } + mod expands_tilde { use super::nu;