Have internal/external/pipelines taken an optional InputStream. (#1182)

Primarily, this fixes an issue where we always open a stdin pipe for
external commands, which can break various interactive commands (e.g.,
editors).
This commit is contained in:
Jason Gedge 2020-01-10 01:31:44 -05:00 committed by Jonathan Turner
parent 5692a08e7f
commit 347f91ab53
4 changed files with 76 additions and 67 deletions

View File

@ -607,8 +607,7 @@ async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context
return LineResult::Error(line.to_string(), err); return LineResult::Error(line.to_string(), err);
} }
let input = InputStream::empty(); match run_pipeline(pipeline, ctx, None, line).await {
match run_pipeline(pipeline, ctx, input, line).await {
Ok(_) => LineResult::Success(line.to_string()), Ok(_) => LineResult::Success(line.to_string()),
Err(err) => LineResult::Error(line.to_string(), err), Err(err) => LineResult::Error(line.to_string(), err),
} }

View File

@ -51,9 +51,9 @@ impl Decoder for LinesCodec {
pub(crate) async fn run_external_command( pub(crate) async fn run_external_command(
command: ExternalCommand, command: ExternalCommand,
context: &mut Context, context: &mut Context,
input: InputStream, input: Option<InputStream>,
is_last: bool, is_last: bool,
) -> Result<InputStream, ShellError> { ) -> Result<Option<InputStream>, ShellError> {
trace!(target: "nu::run::external", "-> {}", command.name); trace!(target: "nu::run::external", "-> {}", command.name);
let has_it_arg = command.args.iter().any(|arg| arg.contains("$it")); let has_it_arg = command.args.iter().any(|arg| arg.contains("$it"));
@ -67,13 +67,13 @@ pub(crate) async fn run_external_command(
async fn run_with_iterator_arg( async fn run_with_iterator_arg(
command: ExternalCommand, command: ExternalCommand,
context: &mut Context, context: &mut Context,
input: InputStream, input: Option<InputStream>,
is_last: bool, is_last: bool,
) -> Result<InputStream, ShellError> { ) -> Result<Option<InputStream>, ShellError> {
let name = command.name; let name = command.name;
let args = command.args; let args = command.args;
let name_tag = command.name_tag; let name_tag = command.name_tag;
let inputs = input.into_vec().await; let inputs = input.unwrap_or_else(InputStream::empty).into_vec().await;
trace!(target: "nu::run::external", "inputs = {:?}", inputs); trace!(target: "nu::run::external", "inputs = {:?}", inputs);
@ -137,7 +137,7 @@ async fn run_with_iterator_arg(
if let Ok(mut popen) = popen { if let Ok(mut popen) = popen {
if is_last { if is_last {
let _ = popen.wait(); let _ = popen.wait();
Ok(InputStream::empty()) Ok(None)
} else { } else {
let stdout = popen.stdout.take().ok_or_else(|| { let stdout = popen.stdout.take().ok_or_else(|| {
ShellError::untagged_runtime_error("Can't redirect the stdout for external command") ShellError::untagged_runtime_error("Can't redirect the stdout for external command")
@ -148,7 +148,7 @@ async fn run_with_iterator_arg(
line.expect("Internal error: could not read lines of text from stdin") line.expect("Internal error: could not read lines of text from stdin")
.into_value(&name_tag) .into_value(&name_tag)
}); });
Ok(stream.boxed().into()) Ok(Some(stream.boxed().into()))
} }
} else { } else {
Err(ShellError::labeled_error( Err(ShellError::labeled_error(
@ -162,9 +162,9 @@ async fn run_with_iterator_arg(
async fn run_with_stdin( async fn run_with_stdin(
command: ExternalCommand, command: ExternalCommand,
context: &mut Context, context: &mut Context,
mut input: InputStream, input: Option<InputStream>,
is_last: bool, is_last: bool,
) -> Result<InputStream, ShellError> { ) -> Result<Option<InputStream>, ShellError> {
let name_tag = command.name_tag; let name_tag = command.name_tag;
let home_dir = dirs::home_dir(); let home_dir = dirs::home_dir();
@ -192,61 +192,65 @@ async fn run_with_stdin(
trace!(target: "nu::run::external", "set up stdout pipe"); trace!(target: "nu::run::external", "set up stdout pipe");
} }
process = process.stdin(subprocess::Redirection::Pipe); if input.is_some() {
trace!(target: "nu::run::external", "set up stdin pipe"); process = process.stdin(subprocess::Redirection::Pipe);
trace!(target: "nu::run::external", "set up stdin pipe");
}
trace!(target: "nu::run::external", "built process {:?}", process); trace!(target: "nu::run::external", "built process {:?}", process);
let popen = process.detached().popen(); let popen = process.detached().popen();
if let Ok(mut popen) = popen { if let Ok(mut popen) = popen {
let mut stdin_write = popen
.stdin
.take()
.expect("Internal error: could not get stdin pipe for external command");
let stream = async_stream! { let stream = async_stream! {
while let Some(item) = input.next().await { if let Some(mut input) = input {
match item.value { let mut stdin_write = popen
UntaggedValue::Primitive(Primitive::Nothing) => { .stdin
// If first in a pipeline, will receive Nothing. This is not an error. .take()
}, .expect("Internal error: could not get stdin pipe for external command");
UntaggedValue::Primitive(Primitive::String(s)) | while let Some(item) = input.next().await {
UntaggedValue::Primitive(Primitive::Line(s)) => match item.value {
{ UntaggedValue::Primitive(Primitive::Nothing) => {
if let Err(e) = stdin_write.write(s.as_bytes()) { // If first in a pipeline, will receive Nothing. This is not an error.
let message = format!("Unable to write to stdin (error = {})", e); },
UntaggedValue::Primitive(Primitive::String(s)) |
UntaggedValue::Primitive(Primitive::Line(s)) =>
{
if let Err(e) = stdin_write.write(s.as_bytes()) {
let message = format!("Unable to write to stdin (error = {})", e);
yield Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error(
message,
"unable to write to stdin",
&name_tag,
)),
tag: name_tag,
});
return;
}
},
// TODO serialize other primitives? https://github.com/nushell/nushell/issues/778
v => {
let message = format!("Received unexpected type from pipeline ({})", v.type_name());
yield Ok(Value { yield Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error( value: UntaggedValue::Error(ShellError::labeled_error(
message, message,
"unable to write to stdin", "expected a string",
&name_tag, &name_tag,
)), )),
tag: name_tag, tag: name_tag,
}); });
return; return;
} },
}, }
// TODO serialize other primitives? https://github.com/nushell/nushell/issues/778
v => {
let message = format!("Received unexpected type from pipeline ({})", v.type_name());
yield Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error(
message,
"expected a string",
&name_tag,
)),
tag: name_tag,
});
return;
},
} }
}
// Close stdin, which informs the external process that there's no more input // Close stdin, which informs the external process that there's no more input
drop(stdin_write); drop(stdin_write);
}
if !is_last { if !is_last {
let stdout = if let Some(stdout) = popen.stdout.take() { let stdout = if let Some(stdout) = popen.stdout.take() {
@ -302,7 +306,7 @@ async fn run_with_stdin(
}; };
}; };
Ok(stream.to_input_stream()) Ok(Some(stream.to_input_stream()))
} else { } else {
Err(ShellError::labeled_error( Err(ShellError::labeled_error(
"Command not found", "Command not found",

View File

@ -8,16 +8,20 @@ use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value}
pub(crate) async fn run_internal_command( pub(crate) async fn run_internal_command(
command: InternalCommand, command: InternalCommand,
context: &mut Context, context: &mut Context,
input: InputStream, input: Option<InputStream>,
source: Text, source: Text,
) -> Result<InputStream, ShellError> { ) -> Result<Option<InputStream>, ShellError> {
if log_enabled!(log::Level::Trace) { if log_enabled!(log::Level::Trace) {
trace!(target: "nu::run::internal", "->"); trace!(target: "nu::run::internal", "->");
trace!(target: "nu::run::internal", "{}", command.name); trace!(target: "nu::run::internal", "{}", command.name);
trace!(target: "nu::run::internal", "{}", command.args.debug(&source)); trace!(target: "nu::run::internal", "{}", command.args.debug(&source));
} }
let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input); let objects: InputStream = if let Some(input) = input {
trace_stream!(target: "nu::trace_stream::internal", "input" = input)
} else {
InputStream::empty()
};
let internal_command = context.expect_command(&command.name); let internal_command = context.expect_command(&command.name);
@ -167,5 +171,5 @@ pub(crate) async fn run_internal_command(
} }
}; };
Ok(stream.to_input_stream()) Ok(Some(stream.to_input_stream()))
} }

View File

@ -11,7 +11,7 @@ use std::sync::atomic::Ordering;
pub(crate) async fn run_pipeline( pub(crate) async fn run_pipeline(
pipeline: ClassifiedPipeline, pipeline: ClassifiedPipeline,
ctx: &mut Context, ctx: &mut Context,
mut input: InputStream, mut input: Option<InputStream>,
line: &str, line: &str,
) -> Result<(), ShellError> { ) -> Result<(), ShellError> {
let mut iter = pipeline.commands.list.into_iter().peekable(); let mut iter = pipeline.commands.list.into_iter().peekable();
@ -46,21 +46,23 @@ pub(crate) async fn run_pipeline(
} }
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
let mut output_stream: OutputStream = input.into(); if let Some(input) = input {
loop { let mut output_stream: OutputStream = input.into();
match output_stream.try_next().await { loop {
Ok(Some(ReturnSuccess::Value(Value { match output_stream.try_next().await {
value: UntaggedValue::Error(e), Ok(Some(ReturnSuccess::Value(Value {
.. value: UntaggedValue::Error(e),
}))) => return Err(e), ..
Ok(Some(_item)) => { }))) => return Err(e),
if ctx.ctrl_c.load(Ordering::SeqCst) { Ok(Some(_item)) => {
if ctx.ctrl_c.load(Ordering::SeqCst) {
break;
}
}
_ => {
break; break;
} }
} }
_ => {
break;
}
} }
} }