Use threads to avoid blocking reads/writes in externals. (#1440)

In particular, one thing that we can't (properly) do before this commit
is consuming an infinite input stream. For example:

```
yes | grep y | head -n10
```

will give 10 "y"s in most shells, but blocks indefinitely in nu. This PR
resolves that by doing blocking I/O in threads, and reducing the `await`
calls we currently have in our pipeline code.
This commit is contained in:
Jason Gedge 2020-03-01 12:19:09 -05:00 committed by GitHub
parent ca615d9389
commit 7304d06c0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 273 additions and 71 deletions

View File

@ -201,6 +201,11 @@ name = "nonu"
path = "crates/nu-test-support/src/bins/nonu.rs" path = "crates/nu-test-support/src/bins/nonu.rs"
required-features = ["test-bins"] required-features = ["test-bins"]
[[bin]]
name = "iecho"
path = "crates/nu-test-support/src/bins/iecho.rs"
required-features = ["test-bins"]
# Core plugins that ship with `cargo install nu` by default # Core plugins that ship with `cargo install nu` by default
# Currently, Cargo limits us to installing only one binary # Currently, Cargo limits us to installing only one binary
# unless we use [[bin]], so we use this as a workaround # unless we use [[bin]], so we use this as a workaround

View File

@ -1,4 +1,4 @@
use std::io::{self, BufRead}; use std::io::{self, BufRead, Write};
fn main() { fn main() {
if did_chop_arguments() { if did_chop_arguments() {
@ -8,9 +8,12 @@ fn main() {
// if no arguments given, chop from standard input and exit. // if no arguments given, chop from standard input and exit.
let stdin = io::stdin(); let stdin = io::stdin();
let mut stdout = io::stdout();
for line in stdin.lock().lines() { for line in stdin.lock().lines() {
if let Ok(given) = line { if let Ok(given) = line {
println!("{}", chop(&given)); if let Err(_e) = writeln!(stdout, "{}", chop(&given)) {
break;
}
} }
} }

View File

@ -0,0 +1,13 @@
use std::io::{self, Write};
fn main() {
let args: Vec<String> = std::env::args().collect();
// println! panics if stdout gets closed, whereas writeln gives us an error
let mut stdout = io::stdout();
let _ = args
.iter()
.skip(1)
.cycle()
.try_for_each(|v| writeln!(stdout, "{}", v));
}

View File

@ -1,4 +1,6 @@
use crate::futures::ThreadedReceiver;
use crate::prelude::*; use crate::prelude::*;
use futures::executor::block_on_stream;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures_codec::{FramedRead, LinesCodec}; use futures_codec::{FramedRead, LinesCodec};
use log::trace; use log::trace;
@ -11,6 +13,7 @@ use nu_value_ext::as_column_path;
use std::io::Write; use std::io::Write;
use std::ops::Deref; use std::ops::Deref;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use std::sync::mpsc;
pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result<String, ShellError> { pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result<String, ShellError> {
match &from.value { match &from.value {
@ -27,7 +30,7 @@ pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result<Str
} }
pub fn nu_value_to_string_for_stdin( pub fn nu_value_to_string_for_stdin(
command: &ExternalCommand, name_tag: &Tag,
from: &Value, from: &Value,
) -> Result<Option<String>, ShellError> { ) -> Result<Option<String>, ShellError> {
match &from.value { match &from.value {
@ -40,12 +43,12 @@ pub fn nu_value_to_string_for_stdin(
unsupported.type_name() unsupported.type_name()
), ),
"expected a string", "expected a string",
&command.name_tag, name_tag,
)), )),
} }
} }
pub(crate) async fn run_external_command( pub(crate) fn run_external_command(
command: ExternalCommand, command: ExternalCommand,
context: &mut Context, context: &mut Context,
input: Option<InputStream>, input: Option<InputStream>,
@ -62,9 +65,9 @@ pub(crate) async fn run_external_command(
} }
if command.has_it_argument() || command.has_nu_argument() { if command.has_it_argument() || command.has_nu_argument() {
run_with_iterator_arg(command, context, input, is_last).await run_with_iterator_arg(command, context, input, is_last)
} else { } else {
run_with_stdin(command, context, input, is_last).await run_with_stdin(command, context, input, is_last)
} }
} }
@ -114,7 +117,7 @@ fn to_column_path(
) )
} }
async fn run_with_iterator_arg( fn run_with_iterator_arg(
command: ExternalCommand, command: ExternalCommand,
context: &mut Context, context: &mut Context,
input: Option<InputStream>, input: Option<InputStream>,
@ -336,7 +339,7 @@ async fn run_with_iterator_arg(
Ok(Some(stream.to_input_stream())) Ok(Some(stream.to_input_stream()))
} }
async fn run_with_stdin( fn run_with_stdin(
command: ExternalCommand, command: ExternalCommand,
context: &mut Context, context: &mut Context,
input: Option<InputStream>, input: Option<InputStream>,
@ -426,7 +429,6 @@ fn spawn(
is_last: bool, is_last: bool,
) -> Result<Option<InputStream>, ShellError> { ) -> Result<Option<InputStream>, ShellError> {
let command = command.clone(); let command = command.clone();
let name_tag = command.name_tag.clone();
let mut process = { let mut process = {
#[cfg(windows)] #[cfg(windows)]
@ -467,76 +469,94 @@ fn spawn(
trace!(target: "nu::run::external", "built command {:?}", process); trace!(target: "nu::run::external", "built command {:?}", process);
// TODO Switch to async_std::process once it's stabilized
if let Ok(mut child) = process.spawn() { if let Ok(mut child) = process.spawn() {
let stream = async_stream! { let (tx, rx) = mpsc::sync_channel(0);
if let Some(mut input) = input {
let mut stdin_write = child.stdin let mut stdin = child.stdin.take();
let stdin_write_tx = tx.clone();
let stdout_read_tx = tx;
let stdin_name_tag = command.name_tag.clone();
let stdout_name_tag = command.name_tag;
std::thread::spawn(move || {
if let Some(input) = input {
let mut stdin_write = stdin
.take() .take()
.expect("Internal error: could not get stdin pipe for external command"); .expect("Internal error: could not get stdin pipe for external command");
while let Some(value) = input.next().await { for value in block_on_stream(input) {
let input_string = match nu_value_to_string_for_stdin(&command, &value) { let input_string = match nu_value_to_string_for_stdin(&stdin_name_tag, &value) {
Ok(None) => continue, Ok(None) => continue,
Ok(Some(v)) => v, Ok(Some(v)) => v,
Err(e) => { Err(e) => {
yield Ok(Value { let _ = stdin_write_tx.send(Ok(Value {
value: UntaggedValue::Error(e), value: UntaggedValue::Error(e),
tag: name_tag tag: stdin_name_tag,
}); }));
return; return Err(());
} }
}; };
if let Err(e) = stdin_write.write(input_string.as_bytes()) { if let Err(e) = stdin_write.write(input_string.as_bytes()) {
let message = format!("Unable to write to stdin (error = {})", e); let message = format!("Unable to write to stdin (error = {})", e);
yield Ok(Value { let _ = stdin_write_tx.send(Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error( value: UntaggedValue::Error(ShellError::labeled_error(
message, message,
"application may have closed before completing pipeline", "application may have closed before completing pipeline",
&name_tag)), &stdin_name_tag,
tag: name_tag )),
}); tag: stdin_name_tag,
return; }));
return Err(());
} }
} }
} }
Ok(())
});
std::thread::spawn(move || {
if !is_last { if !is_last {
let stdout = if let Some(stdout) = child.stdout.take() { let stdout = if let Some(stdout) = child.stdout.take() {
stdout stdout
} else { } else {
yield Ok(Value { let _ = stdout_read_tx.send(Ok(Value {
value: UntaggedValue::Error(ShellError::labeled_error( value: UntaggedValue::Error(ShellError::labeled_error(
"Can't redirect the stdout for external command", "Can't redirect the stdout for external command",
"can't redirect stdout", "can't redirect stdout",
&name_tag)), &stdout_name_tag,
tag: name_tag )),
}); tag: stdout_name_tag,
return; }));
return Err(());
}; };
let file = futures::io::AllowStdIo::new(StdoutWithNewline::new(stdout)); let file = futures::io::AllowStdIo::new(StdoutWithNewline::new(stdout));
let mut stream = FramedRead::new(file, LinesCodec); let stream = FramedRead::new(file, LinesCodec);
while let Some(line) = stream.next().await { for line in block_on_stream(stream) {
if let Ok(line) = line { if let Ok(line) = line {
yield Ok(Value { let result = stdout_read_tx.send(Ok(Value {
value: UntaggedValue::Primitive(Primitive::Line(line)), value: UntaggedValue::Primitive(Primitive::Line(line)),
tag: name_tag.clone(), tag: stdout_name_tag.clone(),
}); }));
if result.is_err() {
break;
}
} else { } else {
yield Ok(Value { let _ = stdout_read_tx.send(Ok(Value {
value: UntaggedValue::Error( value: UntaggedValue::Error(ShellError::labeled_error(
ShellError::labeled_error( "Unable to read lines from stdout. This usually happens when the output does not end with a newline.",
"Unable to read lines from stdout. This usually happens when the output does not end with a newline.", "unable to read from stdout",
"unable to read from stdout", &stdout_name_tag,
&name_tag, )),
) tag: stdout_name_tag.clone(),
), }));
tag: name_tag.clone(), break;
});
return;
} }
} }
} }
@ -547,21 +567,22 @@ fn spawn(
let cfg = crate::data::config::config(Tag::unknown()); let cfg = crate::data::config::config(Tag::unknown());
if let Ok(cfg) = cfg { if let Ok(cfg) = cfg {
if cfg.contains_key("nonzero_exit_errors") { if cfg.contains_key("nonzero_exit_errors") {
yield Ok(Value { let _ = stdout_read_tx.send(Ok(Value {
value: UntaggedValue::Error( value: UntaggedValue::Error(ShellError::labeled_error(
ShellError::labeled_error( "External command failed",
"External command failed", "command failed",
"command failed", &stdout_name_tag,
&name_tag, )),
) tag: stdout_name_tag,
), }));
tag: name_tag,
});
} }
} }
} }
};
Ok(())
});
let stream = ThreadedReceiver::new(rx);
Ok(Some(stream.to_input_stream())) Ok(Some(stream.to_input_stream()))
} else { } else {
Err(ShellError::labeled_error( Err(ShellError::labeled_error(
@ -670,9 +691,7 @@ mod tests {
let mut ctx = Context::basic().expect("There was a problem creating a basic context."); let mut ctx = Context::basic().expect("There was a problem creating a basic context.");
assert!(run_external_command(cmd, &mut ctx, None, false) assert!(run_external_command(cmd, &mut ctx, None, false).is_err());
.await
.is_err());
Ok(()) Ok(())
} }

View File

@ -5,7 +5,7 @@ use nu_errors::ShellError;
use nu_parser::InternalCommand; use nu_parser::InternalCommand;
use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value}; use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value};
pub(crate) async fn run_internal_command( pub(crate) fn run_internal_command(
command: InternalCommand, command: InternalCommand,
context: &mut Context, context: &mut Context,
input: Option<InputStream>, input: Option<InputStream>,

View File

@ -31,15 +31,15 @@ pub(crate) async fn run_pipeline(
(_, Some(ClassifiedCommand::Error(err))) => return Err(err.clone().into()), (_, Some(ClassifiedCommand::Error(err))) => return Err(err.clone().into()),
(Some(ClassifiedCommand::Internal(left)), _) => { (Some(ClassifiedCommand::Internal(left)), _) => {
run_internal_command(left, ctx, input, Text::from(line)).await? run_internal_command(left, ctx, input, Text::from(line))?
} }
(Some(ClassifiedCommand::External(left)), None) => { (Some(ClassifiedCommand::External(left)), None) => {
run_external_command(left, ctx, input, true).await? run_external_command(left, ctx, input, true)?
} }
(Some(ClassifiedCommand::External(left)), _) => { (Some(ClassifiedCommand::External(left)), _) => {
run_external_command(left, ctx, input, false).await? run_external_command(left, ctx, input, false)?
} }
(None, _) => break, (None, _) => break,

149
src/futures.rs Normal file
View File

@ -0,0 +1,149 @@
use futures::stream::Stream;
use std::pin::Pin;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{self, Poll, Waker};
use std::thread;
#[allow(clippy::option_option)]
struct SharedState<T: Send + 'static> {
result: Option<Option<T>>,
kill: bool,
waker: Option<Waker>,
}
pub struct ThreadedReceiver<T: Send + 'static> {
shared_state: Arc<Mutex<SharedState<T>>>,
}
impl<T: Send + 'static> ThreadedReceiver<T> {
pub fn new(recv: mpsc::Receiver<T>) -> ThreadedReceiver<T> {
let shared_state = Arc::new(Mutex::new(SharedState {
result: None,
kill: false,
waker: None,
}));
// Clone everything to avoid lifetimes
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
loop {
let result = recv.recv();
{
let mut shared_state = thread_shared_state
.lock()
.expect("ThreadedFuture shared state shouldn't be poisoned");
if let Ok(result) = result {
shared_state.result = Some(Some(result));
} else {
break;
}
}
// Don't attempt to recv anything else until consumed
loop {
let mut shared_state = thread_shared_state
.lock()
.expect("ThreadedFuture shared state shouldn't be poisoned");
if shared_state.kill {
return;
}
if shared_state.result.is_some() {
if let Some(waker) = shared_state.waker.take() {
waker.wake();
}
} else {
break;
}
}
}
// Let the Stream implementation know that we're done
let mut shared_state = thread_shared_state
.lock()
.expect("ThreadedFuture shared state shouldn't be poisoned");
shared_state.result = Some(None);
if let Some(waker) = shared_state.waker.take() {
waker.wake();
}
});
ThreadedReceiver { shared_state }
}
}
impl<T: Send + 'static> Stream for ThreadedReceiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let mut shared_state = self
.shared_state
.lock()
.expect("ThreadedFuture shared state shouldn't be poisoned");
if let Some(result) = shared_state.result.take() {
Poll::Ready(result)
} else {
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl<T: Send + 'static> Drop for ThreadedReceiver<T> {
fn drop(&mut self) {
// Setting the kill flag to true will cause the thread spawned in `new` to exit, which
// will cause the `Receiver` argument to get dropped. This can allow senders to
// potentially clean up.
match self.shared_state.lock() {
Ok(mut state) => state.kill = true,
Err(mut poisoned_err) => poisoned_err.get_mut().kill = true,
}
}
}
#[cfg(test)]
mod tests {
mod threaded_receiver {
use super::super::ThreadedReceiver;
use futures::executor::block_on_stream;
use std::sync::mpsc;
#[test]
fn returns_expected_result() {
let (tx, rx) = mpsc::sync_channel(0);
std::thread::spawn(move || {
let _ = tx.send(1);
let _ = tx.send(2);
let _ = tx.send(3);
});
let stream = ThreadedReceiver::new(rx);
let mut result = block_on_stream(stream);
assert_eq!(Some(1), result.next());
assert_eq!(Some(2), result.next());
assert_eq!(Some(3), result.next());
assert_eq!(None, result.next());
}
#[test]
fn drops_receiver_when_stream_dropped() {
let (tx, rx) = mpsc::sync_channel(0);
let th = std::thread::spawn(move || {
tx.send(1).and_then(|_| tx.send(2)).and_then(|_| tx.send(3))
});
{
let stream = ThreadedReceiver::new(rx);
let mut result = block_on_stream(stream);
assert_eq!(Some(1), result.next());
}
let result = th.join();
assert_eq!(true, result.unwrap().is_err());
}
}
}

View File

@ -15,6 +15,7 @@ mod deserializer;
mod env; mod env;
mod evaluate; mod evaluate;
mod format; mod format;
mod futures;
mod git; mod git;
mod shell; mod shell;
mod stream; mod stream;

View File

@ -1,15 +1,12 @@
use nu_test_support::fs::Stub::EmptyFile;
use nu_test_support::nu; use nu_test_support::nu;
use nu_test_support::playground::Playground; use nu_test_support::playground::Playground;
#[test] #[test]
fn adds_a_file() { fn creates_a_file_when_it_doesnt_exist() {
Playground::setup("add_test_1", |dirs, sandbox| { Playground::setup("create_test_1", |dirs, _sandbox| {
sandbox.with_files(vec![EmptyFile("i_will_be_created.txt")]);
nu!( nu!(
cwd: dirs.root(), cwd: dirs.test(),
"touch touch_test/i_will_be_created.txt" "touch i_will_be_created.txt"
); );
let path = dirs.test().join("i_will_be_created.txt"); let path = dirs.test().join("i_will_be_created.txt");

View File

@ -102,7 +102,7 @@ mod it_evaluation {
} }
mod stdin_evaluation { mod stdin_evaluation {
use super::nu_error; use super::{nu, nu_error};
use nu_test_support::pipeline; use nu_test_support::pipeline;
#[test] #[test]
@ -117,6 +117,21 @@ mod stdin_evaluation {
assert_eq!(stderr, ""); assert_eq!(stderr, "");
} }
#[test]
fn does_not_block_indefinitely() {
let stdout = nu!(
cwd: ".",
pipeline(r#"
iecho yes
| chop
| chop
| first 1
"#
));
assert_eq!(stdout, "y");
}
} }
mod external_words { mod external_words {