Eliminate ClassifiedInputStream in favour of InputStream. (#1056)

This commit is contained in:
Jason Gedge 2020-01-07 16:00:01 -05:00 committed by Jonathan Turner
parent 41ebc6b42d
commit 7451414b9e
7 changed files with 285 additions and 198 deletions

View File

@ -1,5 +1,4 @@
use crate::commands::classified::pipeline::run_pipeline; use crate::commands::classified::pipeline::run_pipeline;
use crate::commands::classified::ClassifiedInputStream;
use crate::commands::plugin::JsonRpc; use crate::commands::plugin::JsonRpc;
use crate::commands::plugin::{PluginCommand, PluginSink}; use crate::commands::plugin::{PluginCommand, PluginSink};
use crate::commands::whole_stream_command; use crate::commands::whole_stream_command;
@ -608,8 +607,7 @@ async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context
return LineResult::Error(line.to_string(), err); return LineResult::Error(line.to_string(), err);
} }
let input = ClassifiedInputStream::new(); let input = InputStream::empty();
match run_pipeline(pipeline, ctx, input, line).await { match run_pipeline(pipeline, ctx, input, line).await {
Ok(_) => LineResult::Success(line.to_string()), Ok(_) => LineResult::Success(line.to_string()),
Err(err) => LineResult::Error(line.to_string(), err), Err(err) => LineResult::Error(line.to_string(), err),

View File

@ -5,13 +5,11 @@ use futures_codec::{Decoder, Encoder, Framed};
use log::trace; use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_parser::ExternalCommand; use nu_parser::ExternalCommand;
use nu_protocol::{Primitive, UntaggedValue, Value}; use nu_protocol::{Primitive, ShellTypeName, UntaggedValue, Value};
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind, Write};
use std::ops::Deref; use std::ops::Deref;
use subprocess::Exec; use subprocess::Exec;
use super::ClassifiedInputStream;
/// A simple `Codec` implementation that splits up data into lines. /// A simple `Codec` implementation that splits up data into lines.
pub struct LinesCodec {} pub struct LinesCodec {}
@ -50,36 +48,35 @@ impl Decoder for LinesCodec {
} }
} }
#[derive(Debug)]
pub(crate) enum StreamNext {
Last,
External,
Internal,
}
pub(crate) async fn run_external_command( pub(crate) async fn run_external_command(
command: ExternalCommand, command: ExternalCommand,
context: &mut Context, context: &mut Context,
input: ClassifiedInputStream, input: InputStream,
stream_next: StreamNext, is_last: bool,
) -> Result<ClassifiedInputStream, ShellError> { ) -> Result<InputStream, ShellError> {
let stdin = input.stdin;
let inputs: Vec<Value> = input.objects.into_vec().await;
trace!(target: "nu::run::external", "-> {}", command.name); trace!(target: "nu::run::external", "-> {}", command.name);
let has_it_arg = command.args.iter().any(|arg| arg.contains("$it"));
if has_it_arg {
run_with_iterator_arg(command, context, input, is_last).await
} else {
run_with_stdin(command, context, input, is_last).await
}
}
async fn run_with_iterator_arg(
command: ExternalCommand,
context: &mut Context,
input: InputStream,
is_last: bool,
) -> Result<InputStream, ShellError> {
let name = command.name;
let args = command.args;
let name_tag = command.name_tag;
let inputs = input.into_vec().await;
trace!(target: "nu::run::external", "inputs = {:?}", inputs); trace!(target: "nu::run::external", "inputs = {:?}", inputs);
let mut arg_string = command.name.to_owned();
for arg in command.args.iter() {
arg_string.push_str(&arg);
}
let home_dir = dirs::home_dir();
trace!(target: "nu::run::external", "command = {:?}", command.name);
let mut process;
if arg_string.contains("$it") {
let input_strings = inputs let input_strings = inputs
.iter() .iter()
.map(|i| match i { .map(|i| match i {
@ -92,8 +89,7 @@ pub(crate) async fn run_external_command(
.. ..
} => Ok(s.clone()), } => Ok(s.clone()),
_ => { _ => {
let arg = command.args.iter().find(|arg| arg.contains("$it")); let arg = args.iter().find(|arg| arg.contains("$it"));
if let Some(arg) = arg { if let Some(arg) = arg {
Err(ShellError::labeled_error( Err(ShellError::labeled_error(
"External $it needs string data", "External $it needs string data",
@ -104,109 +100,55 @@ pub(crate) async fn run_external_command(
Err(ShellError::labeled_error( Err(ShellError::labeled_error(
"$it needs string data", "$it needs string data",
"given something else", "given something else",
command.name_tag.clone(), &name_tag,
)) ))
} }
} }
}) })
.collect::<Result<Vec<String>, ShellError>>()?; .collect::<Result<Vec<String>, ShellError>>()?;
let home_dir = dirs::home_dir();
let commands = input_strings.iter().map(|i| { let commands = input_strings.iter().map(|i| {
let args = command.args.iter().filter_map(|arg| { let args = args.iter().filter_map(|arg| {
if arg.chars().all(|c| c.is_whitespace()) { if arg.chars().all(|c| c.is_whitespace()) {
None None
} else { } else {
// Let's also replace ~ as we shell out
let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref()); let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref());
Some(arg.replace("$it", &i)) Some(arg.replace("$it", &i))
} }
}); });
format!("{} {}", command.name, itertools::join(args, " ")) format!("{} {}", name, itertools::join(args, " "))
}); });
process = Exec::shell(itertools::join(commands, " && ")) let mut process = Exec::shell(itertools::join(commands, " && "));
} else {
process = Exec::cmd(&command.name);
for arg in command.args.iter() {
// Let's also replace ~ as we shell out
let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref());
let arg_chars: Vec<_> = arg.chars().collect();
if arg_chars.len() > 1
&& ((arg_chars[0] == '"' && arg_chars[arg_chars.len() - 1] == '"')
|| (arg_chars[0] == '\'' && arg_chars[arg_chars.len() - 1] == '\''))
{
// quoted string
let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect();
process = process.arg(new_arg);
} else {
process = process.arg(arg.as_ref());
}
}
}
process = process.cwd(context.shell_manager.path()?); process = process.cwd(context.shell_manager.path()?);
trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path());
let mut process = match stream_next { if !is_last {
StreamNext::Last => process, process = process.stdout(subprocess::Redirection::Pipe);
StreamNext::External | StreamNext::Internal => {
process.stdout(subprocess::Redirection::Pipe)
}
};
trace!(target: "nu::run::external", "set up stdout pipe"); trace!(target: "nu::run::external", "set up stdout pipe");
if let Some(stdin) = stdin {
process = process.stdin(stdin);
} }
trace!(target: "nu::run::external", "set up stdin pipe");
trace!(target: "nu::run::external", "built process {:?}", process); trace!(target: "nu::run::external", "built process {:?}", process);
let popen = process.popen(); let popen = process.detached().popen();
trace!(target: "nu::run::external", "next = {:?}", stream_next);
let name_tag = command.name_tag.clone();
if let Ok(mut popen) = popen { if let Ok(mut popen) = popen {
popen.detach(); if is_last {
match stream_next {
StreamNext::Last => {
let _ = popen.wait(); let _ = popen.wait();
Ok(ClassifiedInputStream::new()) Ok(InputStream::empty())
} } else {
StreamNext::External => {
let stdout = popen.stdout.take().ok_or_else(|| { let stdout = popen.stdout.take().ok_or_else(|| {
ShellError::untagged_runtime_error( ShellError::untagged_runtime_error("Can't redirect the stdout for external command")
"Can't redirect the stdout for external command",
)
})?;
Ok(ClassifiedInputStream::from_stdout(stdout))
}
StreamNext::Internal => {
let stdout = popen.stdout.take().ok_or_else(|| {
ShellError::untagged_runtime_error(
"Can't redirect the stdout for internal command",
)
})?; })?;
let file = futures::io::AllowStdIo::new(stdout); let file = futures::io::AllowStdIo::new(stdout);
let stream = Framed::new(file, LinesCodec {}); let stream = Framed::new(file, LinesCodec {});
let stream = stream.map(move |line| { let stream = stream.map(move |line| {
if let Ok(line) = line { line.expect("Internal error: could not read lines of text from stdin")
line.into_value(&name_tag) .into_value(&name_tag)
} else {
panic!("Internal error: could not read lines of text from stdin")
}
}); });
Ok(ClassifiedInputStream::from_input_stream( Ok(stream.boxed().into())
stream.boxed() as BoxStream<'static, Value>
))
}
} }
} else { } else {
Err(ShellError::labeled_error( Err(ShellError::labeled_error(
@ -216,3 +158,156 @@ pub(crate) async fn run_external_command(
)) ))
} }
} }
async fn run_with_stdin(
command: ExternalCommand,
context: &mut Context,
mut input: InputStream,
is_last: bool,
) -> Result<InputStream, ShellError> {
let name_tag = command.name_tag;
let home_dir = dirs::home_dir();
let mut process = Exec::cmd(&command.name);
for arg in command.args.iter() {
// Let's also replace ~ as we shell out
let arg = shellexpand::tilde_with_context(arg.deref(), || home_dir.as_ref());
// Strip quotes from a quoted string
if arg.len() > 1
&& ((arg.starts_with('"') && arg.ends_with('"'))
|| (arg.starts_with('\'') && arg.ends_with('\'')))
{
process = process.arg(arg.chars().skip(1).take(arg.len() - 2).collect::<String>());
} else {
process = process.arg(arg.as_ref());
}
}
process = process.cwd(context.shell_manager.path()?);
trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path());
if !is_last {
process = process.stdout(subprocess::Redirection::Pipe);
trace!(target: "nu::run::external", "set up stdout pipe");
}
process = process.stdin(subprocess::Redirection::Pipe);
trace!(target: "nu::run::external", "set up stdin pipe");
trace!(target: "nu::run::external", "built process {:?}", process);
let popen = process.detached().popen();
if let Ok(mut popen) = popen {
let mut stdin_write = popen
.stdin
.take()
.expect("Internal error: could not get stdin pipe for external command");
let stream = async_stream! {
while let Some(item) = input.next().await {
match item.value {
UntaggedValue::Primitive(Primitive::Nothing) => {
// If first in a pipeline, will receive Nothing. This is not an error.
},
UntaggedValue::Primitive(Primitive::String(s)) |
UntaggedValue::Primitive(Primitive::Line(s)) =>
{
if let Err(e) = stdin_write.write(s.as_bytes()) {
let message = format!("Unable to write to stdin (error = {})", e);
yield Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error(
message,
"unable to write to stdin",
&name_tag,
)),
tag: name_tag,
});
return;
}
},
// TODO serialize other primitives? https://github.com/nushell/nushell/issues/778
v => {
let message = format!("Received unexpected type from pipeline ({})", v.type_name());
yield Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error(
message,
"expected a string",
&name_tag,
)),
tag: name_tag,
});
return;
},
}
}
// Close stdin, which informs the external process that there's no more input
drop(stdin_write);
if !is_last {
let stdout = if let Some(stdout) = popen.stdout.take() {
stdout
} else {
yield Ok(Value {
value: UntaggedValue::Error(
ShellError::labeled_error(
"Can't redirect the stdout for external command",
"can't redirect stdout",
&name_tag,
)
),
tag: name_tag,
});
return;
};
let file = futures::io::AllowStdIo::new(stdout);
let stream = Framed::new(file, LinesCodec {});
let mut stream = stream.map(|line| {
if let Ok(line) = line {
line.into_value(&name_tag)
} else {
panic!("Internal error: could not read lines of text from stdin")
}
});
loop {
match stream.next().await {
Some(item) => yield Ok(item),
None => break,
}
}
}
let errored = match popen.wait() {
Ok(status) => !status.success(),
Err(e) => true,
};
if errored {
yield Ok(Value {
value: UntaggedValue::Error(
ShellError::labeled_error(
"External command failed",
"command failed",
&name_tag,
)
),
tag: name_tag,
});
};
};
Ok(stream.to_input_stream())
} else {
Err(ShellError::labeled_error(
"Command not found",
"command not found",
name_tag,
))
}
}

View File

@ -5,12 +5,10 @@ use nu_errors::ShellError;
use nu_parser::InternalCommand; use nu_parser::InternalCommand;
use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value}; use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value};
use super::ClassifiedInputStream;
pub(crate) async fn run_internal_command( pub(crate) async fn run_internal_command(
command: InternalCommand, command: InternalCommand,
context: &mut Context, context: &mut Context,
input: ClassifiedInputStream, input: InputStream,
source: Text, source: Text,
) -> Result<InputStream, ShellError> { ) -> Result<InputStream, ShellError> {
if log_enabled!(log::Level::Trace) { if log_enabled!(log::Level::Trace) {
@ -19,8 +17,7 @@ pub(crate) async fn run_internal_command(
trace!(target: "nu::run::internal", "{}", command.args.debug(&source)); trace!(target: "nu::run::internal", "{}", command.args.debug(&source));
} }
let objects: InputStream = let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input);
trace_stream!(target: "nu::trace_stream::internal", "input" = input.objects);
let internal_command = context.expect_command(&command.name); let internal_command = context.expect_command(&command.name);

View File

@ -1,6 +1,3 @@
use crate::prelude::*;
use nu_protocol::UntaggedValue;
mod dynamic; mod dynamic;
pub(crate) mod external; pub(crate) mod external;
pub(crate) mod internal; pub(crate) mod internal;
@ -8,31 +5,3 @@ pub(crate) mod pipeline;
#[allow(unused_imports)] #[allow(unused_imports)]
pub(crate) use dynamic::Command as DynamicCommand; pub(crate) use dynamic::Command as DynamicCommand;
pub(crate) struct ClassifiedInputStream {
pub(crate) objects: InputStream,
pub(crate) stdin: Option<std::fs::File>,
}
impl ClassifiedInputStream {
pub(crate) fn new() -> ClassifiedInputStream {
ClassifiedInputStream {
objects: vec![UntaggedValue::nothing().into_value(Tag::unknown())].into(),
stdin: None,
}
}
pub(crate) fn from_input_stream(stream: impl Into<InputStream>) -> ClassifiedInputStream {
ClassifiedInputStream {
objects: stream.into(),
stdin: None,
}
}
pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream {
ClassifiedInputStream {
objects: VecDeque::new().into(),
stdin: Some(stdout),
}
}
}

View File

@ -1,8 +1,7 @@
use crate::commands::classified::external::{run_external_command, StreamNext}; use crate::commands::classified::external::run_external_command;
use crate::commands::classified::internal::run_internal_command; use crate::commands::classified::internal::run_internal_command;
use crate::commands::classified::ClassifiedInputStream;
use crate::context::Context; use crate::context::Context;
use crate::stream::OutputStream; use crate::stream::{InputStream, OutputStream};
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_parser::{ClassifiedCommand, ClassifiedPipeline}; use nu_parser::{ClassifiedCommand, ClassifiedPipeline};
use nu_protocol::{ReturnSuccess, UntaggedValue, Value}; use nu_protocol::{ReturnSuccess, UntaggedValue, Value};
@ -12,7 +11,7 @@ use std::sync::atomic::Ordering;
pub(crate) async fn run_pipeline( pub(crate) async fn run_pipeline(
pipeline: ClassifiedPipeline, pipeline: ClassifiedPipeline,
ctx: &mut Context, ctx: &mut Context,
mut input: ClassifiedInputStream, mut input: InputStream,
line: &str, line: &str,
) -> Result<(), ShellError> { ) -> Result<(), ShellError> {
let mut iter = pipeline.commands.list.into_iter().peekable(); let mut iter = pipeline.commands.list.into_iter().peekable();
@ -31,20 +30,15 @@ pub(crate) async fn run_pipeline(
} }
(Some(ClassifiedCommand::Internal(left)), _) => { (Some(ClassifiedCommand::Internal(left)), _) => {
let stream = run_internal_command(left, ctx, input, Text::from(line)).await?; run_internal_command(left, ctx, input, Text::from(line)).await?
ClassifiedInputStream::from_input_stream(stream)
}
(Some(ClassifiedCommand::External(left)), Some(ClassifiedCommand::External(_))) => {
run_external_command(left, ctx, input, StreamNext::External).await?
}
(Some(ClassifiedCommand::External(left)), Some(_)) => {
run_external_command(left, ctx, input, StreamNext::Internal).await?
} }
(Some(ClassifiedCommand::External(left)), None) => { (Some(ClassifiedCommand::External(left)), None) => {
run_external_command(left, ctx, input, StreamNext::Last).await? run_external_command(left, ctx, input, true).await?
}
(Some(ClassifiedCommand::External(left)), _) => {
run_external_command(left, ctx, input, false).await?
} }
(None, _) => break, (None, _) => break,
@ -52,7 +46,7 @@ pub(crate) async fn run_pipeline(
} }
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
let mut output_stream: OutputStream = input.objects.into(); let mut output_stream: OutputStream = input.into();
loop { loop {
match output_stream.try_next().await { match output_stream.try_next().await {
Ok(Some(ReturnSuccess::Value(Value { Ok(Some(ReturnSuccess::Value(Value {

View File

@ -1,11 +1,15 @@
use crate::prelude::*; use crate::prelude::*;
use nu_protocol::{ReturnSuccess, ReturnValue, Value}; use nu_protocol::{ReturnSuccess, ReturnValue, UntaggedValue, Value};
pub struct InputStream { pub struct InputStream {
pub(crate) values: BoxStream<'static, Value>, pub(crate) values: BoxStream<'static, Value>,
} }
impl InputStream { impl InputStream {
pub fn empty() -> InputStream {
vec![UntaggedValue::nothing().into_value(Tag::unknown())].into()
}
pub fn into_vec(self) -> impl Future<Output = Vec<Value>> { pub fn into_vec(self) -> impl Future<Output = Vec<Value>> {
self.values.collect() self.values.collect()
} }

View File

@ -1,7 +1,7 @@
mod pipeline { mod pipeline {
use nu_test_support::fs::Stub::EmptyFile; use nu_test_support::fs::Stub::EmptyFile;
use nu_test_support::playground::Playground; use nu_test_support::playground::Playground;
use nu_test_support::{nu, pipeline}; use nu_test_support::{nu, nu_error, pipeline};
#[test] #[test]
fn can_process_row_as_it_argument_to_an_external_command_given_the_it_data_is_a_string() { fn can_process_row_as_it_argument_to_an_external_command_given_the_it_data_is_a_string() {
@ -72,6 +72,36 @@ mod pipeline {
}) })
} }
#[test]
fn can_process_stdout_of_external_piped_to_stdin_of_external() {
let actual = nu!(
cwd: "tests/fixtures",
"^echo 1 | ^cat"
);
assert!(actual.contains("1"));
}
#[test]
fn can_process_row_from_internal_piped_to_stdin_of_external() {
let actual = nu!(
cwd: "tests/fixtures",
"echo \"1\" | ^cat"
);
assert!(actual.contains("1"));
}
#[test]
fn shows_error_for_external_command_that_fails() {
let actual = nu_error!(
cwd: "tests/fixtures",
"echo \"1\" | ^false"
);
assert!(actual.contains("External command failed"));
}
mod expands_tilde { mod expands_tilde {
use super::nu; use super::nu;