diff --git a/Cargo.toml b/Cargo.toml index cc148db0a..bb4f1e905 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,6 +201,11 @@ name = "nonu" path = "crates/nu-test-support/src/bins/nonu.rs" required-features = ["test-bins"] +[[bin]] +name = "iecho" +path = "crates/nu-test-support/src/bins/iecho.rs" +required-features = ["test-bins"] + # Core plugins that ship with `cargo install nu` by default # Currently, Cargo limits us to installing only one binary # unless we use [[bin]], so we use this as a workaround diff --git a/crates/nu-test-support/src/bins/chop.rs b/crates/nu-test-support/src/bins/chop.rs index 49f2e09a8..e1e63f29e 100644 --- a/crates/nu-test-support/src/bins/chop.rs +++ b/crates/nu-test-support/src/bins/chop.rs @@ -1,4 +1,4 @@ -use std::io::{self, BufRead}; +use std::io::{self, BufRead, Write}; fn main() { if did_chop_arguments() { @@ -8,9 +8,12 @@ fn main() { // if no arguments given, chop from standard input and exit. let stdin = io::stdin(); + let mut stdout = io::stdout(); for line in stdin.lock().lines() { if let Ok(given) = line { - println!("{}", chop(&given)); + if let Err(_e) = writeln!(stdout, "{}", chop(&given)) { + break; + } } } diff --git a/crates/nu-test-support/src/bins/iecho.rs b/crates/nu-test-support/src/bins/iecho.rs new file mode 100644 index 000000000..711d498c5 --- /dev/null +++ b/crates/nu-test-support/src/bins/iecho.rs @@ -0,0 +1,13 @@ +use std::io::{self, Write}; + +fn main() { + let args: Vec = std::env::args().collect(); + + // println! panics if stdout gets closed, whereas writeln gives us an error + let mut stdout = io::stdout(); + let _ = args + .iter() + .skip(1) + .cycle() + .try_for_each(|v| writeln!(stdout, "{}", v)); +} diff --git a/src/commands/classified/external.rs b/src/commands/classified/external.rs index a798117e9..de306c1c3 100644 --- a/src/commands/classified/external.rs +++ b/src/commands/classified/external.rs @@ -1,4 +1,6 @@ +use crate::futures::ThreadedReceiver; use crate::prelude::*; +use futures::executor::block_on_stream; use futures::stream::StreamExt; use futures_codec::{FramedRead, LinesCodec}; use log::trace; @@ -11,6 +13,7 @@ use nu_value_ext::as_column_path; use std::io::Write; use std::ops::Deref; use std::process::{Command, Stdio}; +use std::sync::mpsc; pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result { match &from.value { @@ -27,7 +30,7 @@ pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result Result, ShellError> { match &from.value { @@ -40,12 +43,12 @@ pub fn nu_value_to_string_for_stdin( unsupported.type_name() ), "expected a string", - &command.name_tag, + name_tag, )), } } -pub(crate) async fn run_external_command( +pub(crate) fn run_external_command( command: ExternalCommand, context: &mut Context, input: Option, @@ -62,9 +65,9 @@ pub(crate) async fn run_external_command( } if command.has_it_argument() || command.has_nu_argument() { - run_with_iterator_arg(command, context, input, is_last).await + run_with_iterator_arg(command, context, input, is_last) } else { - run_with_stdin(command, context, input, is_last).await + run_with_stdin(command, context, input, is_last) } } @@ -114,7 +117,7 @@ fn to_column_path( ) } -async fn run_with_iterator_arg( +fn run_with_iterator_arg( command: ExternalCommand, context: &mut Context, input: Option, @@ -336,7 +339,7 @@ async fn run_with_iterator_arg( Ok(Some(stream.to_input_stream())) } -async fn run_with_stdin( +fn run_with_stdin( command: ExternalCommand, context: &mut Context, input: Option, @@ -426,7 +429,6 @@ fn spawn( is_last: bool, ) -> Result, ShellError> { let command = command.clone(); - let name_tag = command.name_tag.clone(); let mut process = { #[cfg(windows)] @@ -467,76 +469,94 @@ fn spawn( trace!(target: "nu::run::external", "built command {:?}", process); + // TODO Switch to async_std::process once it's stabilized if let Ok(mut child) = process.spawn() { - let stream = async_stream! { - if let Some(mut input) = input { - let mut stdin_write = child.stdin + let (tx, rx) = mpsc::sync_channel(0); + + let mut stdin = child.stdin.take(); + + let stdin_write_tx = tx.clone(); + let stdout_read_tx = tx; + let stdin_name_tag = command.name_tag.clone(); + let stdout_name_tag = command.name_tag; + + std::thread::spawn(move || { + if let Some(input) = input { + let mut stdin_write = stdin .take() .expect("Internal error: could not get stdin pipe for external command"); - while let Some(value) = input.next().await { - let input_string = match nu_value_to_string_for_stdin(&command, &value) { + for value in block_on_stream(input) { + let input_string = match nu_value_to_string_for_stdin(&stdin_name_tag, &value) { Ok(None) => continue, Ok(Some(v)) => v, Err(e) => { - yield Ok(Value { + let _ = stdin_write_tx.send(Ok(Value { value: UntaggedValue::Error(e), - tag: name_tag - }); - return; + tag: stdin_name_tag, + })); + return Err(()); } }; if let Err(e) = stdin_write.write(input_string.as_bytes()) { let message = format!("Unable to write to stdin (error = {})", e); - yield Ok(Value { + let _ = stdin_write_tx.send(Ok(Value { value: UntaggedValue::Error(ShellError::labeled_error( message, "application may have closed before completing pipeline", - &name_tag)), - tag: name_tag - }); - return; + &stdin_name_tag, + )), + tag: stdin_name_tag, + })); + return Err(()); } } } + Ok(()) + }); + + std::thread::spawn(move || { if !is_last { let stdout = if let Some(stdout) = child.stdout.take() { stdout } else { - yield Ok(Value { + let _ = stdout_read_tx.send(Ok(Value { value: UntaggedValue::Error(ShellError::labeled_error( "Can't redirect the stdout for external command", "can't redirect stdout", - &name_tag)), - tag: name_tag - }); - return; + &stdout_name_tag, + )), + tag: stdout_name_tag, + })); + return Err(()); }; let file = futures::io::AllowStdIo::new(StdoutWithNewline::new(stdout)); - let mut stream = FramedRead::new(file, LinesCodec); + let stream = FramedRead::new(file, LinesCodec); - while let Some(line) = stream.next().await { + for line in block_on_stream(stream) { if let Ok(line) = line { - yield Ok(Value { + let result = stdout_read_tx.send(Ok(Value { value: UntaggedValue::Primitive(Primitive::Line(line)), - tag: name_tag.clone(), - }); + tag: stdout_name_tag.clone(), + })); + + if result.is_err() { + break; + } } else { - yield Ok(Value { - value: UntaggedValue::Error( - ShellError::labeled_error( - "Unable to read lines from stdout. This usually happens when the output does not end with a newline.", - "unable to read from stdout", - &name_tag, - ) - ), - tag: name_tag.clone(), - }); - return; + let _ = stdout_read_tx.send(Ok(Value { + value: UntaggedValue::Error(ShellError::labeled_error( + "Unable to read lines from stdout. This usually happens when the output does not end with a newline.", + "unable to read from stdout", + &stdout_name_tag, + )), + tag: stdout_name_tag.clone(), + })); + break; } } } @@ -547,21 +567,22 @@ fn spawn( let cfg = crate::data::config::config(Tag::unknown()); if let Ok(cfg) = cfg { if cfg.contains_key("nonzero_exit_errors") { - yield Ok(Value { - value: UntaggedValue::Error( - ShellError::labeled_error( - "External command failed", - "command failed", - &name_tag, - ) - ), - tag: name_tag, - }); + let _ = stdout_read_tx.send(Ok(Value { + value: UntaggedValue::Error(ShellError::labeled_error( + "External command failed", + "command failed", + &stdout_name_tag, + )), + tag: stdout_name_tag, + })); } } } - }; + Ok(()) + }); + + let stream = ThreadedReceiver::new(rx); Ok(Some(stream.to_input_stream())) } else { Err(ShellError::labeled_error( @@ -670,9 +691,7 @@ mod tests { let mut ctx = Context::basic().expect("There was a problem creating a basic context."); - assert!(run_external_command(cmd, &mut ctx, None, false) - .await - .is_err()); + assert!(run_external_command(cmd, &mut ctx, None, false).is_err()); Ok(()) } diff --git a/src/commands/classified/internal.rs b/src/commands/classified/internal.rs index b9b6509d6..9fa1835df 100644 --- a/src/commands/classified/internal.rs +++ b/src/commands/classified/internal.rs @@ -5,7 +5,7 @@ use nu_errors::ShellError; use nu_parser::InternalCommand; use nu_protocol::{CommandAction, Primitive, ReturnSuccess, UntaggedValue, Value}; -pub(crate) async fn run_internal_command( +pub(crate) fn run_internal_command( command: InternalCommand, context: &mut Context, input: Option, diff --git a/src/commands/classified/pipeline.rs b/src/commands/classified/pipeline.rs index 0bce8e744..d53067195 100644 --- a/src/commands/classified/pipeline.rs +++ b/src/commands/classified/pipeline.rs @@ -31,15 +31,15 @@ pub(crate) async fn run_pipeline( (_, Some(ClassifiedCommand::Error(err))) => return Err(err.clone().into()), (Some(ClassifiedCommand::Internal(left)), _) => { - run_internal_command(left, ctx, input, Text::from(line)).await? + run_internal_command(left, ctx, input, Text::from(line))? } (Some(ClassifiedCommand::External(left)), None) => { - run_external_command(left, ctx, input, true).await? + run_external_command(left, ctx, input, true)? } (Some(ClassifiedCommand::External(left)), _) => { - run_external_command(left, ctx, input, false).await? + run_external_command(left, ctx, input, false)? } (None, _) => break, diff --git a/src/futures.rs b/src/futures.rs new file mode 100644 index 000000000..8bfc7203c --- /dev/null +++ b/src/futures.rs @@ -0,0 +1,149 @@ +use futures::stream::Stream; +use std::pin::Pin; +use std::sync::{mpsc, Arc, Mutex}; +use std::task::{self, Poll, Waker}; +use std::thread; + +#[allow(clippy::option_option)] +struct SharedState { + result: Option>, + kill: bool, + waker: Option, +} + +pub struct ThreadedReceiver { + shared_state: Arc>>, +} + +impl ThreadedReceiver { + pub fn new(recv: mpsc::Receiver) -> ThreadedReceiver { + let shared_state = Arc::new(Mutex::new(SharedState { + result: None, + kill: false, + waker: None, + })); + + // Clone everything to avoid lifetimes + let thread_shared_state = shared_state.clone(); + thread::spawn(move || { + loop { + let result = recv.recv(); + + { + let mut shared_state = thread_shared_state + .lock() + .expect("ThreadedFuture shared state shouldn't be poisoned"); + + if let Ok(result) = result { + shared_state.result = Some(Some(result)); + } else { + break; + } + } + + // Don't attempt to recv anything else until consumed + loop { + let mut shared_state = thread_shared_state + .lock() + .expect("ThreadedFuture shared state shouldn't be poisoned"); + + if shared_state.kill { + return; + } + + if shared_state.result.is_some() { + if let Some(waker) = shared_state.waker.take() { + waker.wake(); + } + } else { + break; + } + } + } + + // Let the Stream implementation know that we're done + let mut shared_state = thread_shared_state + .lock() + .expect("ThreadedFuture shared state shouldn't be poisoned"); + + shared_state.result = Some(None); + if let Some(waker) = shared_state.waker.take() { + waker.wake(); + } + }); + + ThreadedReceiver { shared_state } + } +} + +impl Stream for ThreadedReceiver { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let mut shared_state = self + .shared_state + .lock() + .expect("ThreadedFuture shared state shouldn't be poisoned"); + + if let Some(result) = shared_state.result.take() { + Poll::Ready(result) + } else { + shared_state.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +impl Drop for ThreadedReceiver { + fn drop(&mut self) { + // Setting the kill flag to true will cause the thread spawned in `new` to exit, which + // will cause the `Receiver` argument to get dropped. This can allow senders to + // potentially clean up. + match self.shared_state.lock() { + Ok(mut state) => state.kill = true, + Err(mut poisoned_err) => poisoned_err.get_mut().kill = true, + } + } +} + +#[cfg(test)] +mod tests { + mod threaded_receiver { + use super::super::ThreadedReceiver; + use futures::executor::block_on_stream; + use std::sync::mpsc; + + #[test] + fn returns_expected_result() { + let (tx, rx) = mpsc::sync_channel(0); + std::thread::spawn(move || { + let _ = tx.send(1); + let _ = tx.send(2); + let _ = tx.send(3); + }); + + let stream = ThreadedReceiver::new(rx); + let mut result = block_on_stream(stream); + assert_eq!(Some(1), result.next()); + assert_eq!(Some(2), result.next()); + assert_eq!(Some(3), result.next()); + assert_eq!(None, result.next()); + } + + #[test] + fn drops_receiver_when_stream_dropped() { + let (tx, rx) = mpsc::sync_channel(0); + let th = std::thread::spawn(move || { + tx.send(1).and_then(|_| tx.send(2)).and_then(|_| tx.send(3)) + }); + + { + let stream = ThreadedReceiver::new(rx); + let mut result = block_on_stream(stream); + assert_eq!(Some(1), result.next()); + } + let result = th.join(); + assert_eq!(true, result.unwrap().is_err()); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 1d376bc3d..7f76bdc1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ mod deserializer; mod env; mod evaluate; mod format; +mod futures; mod git; mod shell; mod stream; diff --git a/tests/commands/touch.rs b/tests/commands/touch.rs index 86a40dff8..c527a2143 100644 --- a/tests/commands/touch.rs +++ b/tests/commands/touch.rs @@ -1,15 +1,12 @@ -use nu_test_support::fs::Stub::EmptyFile; use nu_test_support::nu; use nu_test_support::playground::Playground; #[test] -fn adds_a_file() { - Playground::setup("add_test_1", |dirs, sandbox| { - sandbox.with_files(vec![EmptyFile("i_will_be_created.txt")]); - +fn creates_a_file_when_it_doesnt_exist() { + Playground::setup("create_test_1", |dirs, _sandbox| { nu!( - cwd: dirs.root(), - "touch touch_test/i_will_be_created.txt" + cwd: dirs.test(), + "touch i_will_be_created.txt" ); let path = dirs.test().join("i_will_be_created.txt"); diff --git a/tests/shell/pipeline/commands/external.rs b/tests/shell/pipeline/commands/external.rs index bb0230645..5b1dc8cf7 100644 --- a/tests/shell/pipeline/commands/external.rs +++ b/tests/shell/pipeline/commands/external.rs @@ -102,7 +102,7 @@ mod it_evaluation { } mod stdin_evaluation { - use super::nu_error; + use super::{nu, nu_error}; use nu_test_support::pipeline; #[test] @@ -117,6 +117,21 @@ mod stdin_evaluation { assert_eq!(stderr, ""); } + + #[test] + fn does_not_block_indefinitely() { + let stdout = nu!( + cwd: ".", + pipeline(r#" + iecho yes + | chop + | chop + | first 1 + "# + )); + + assert_eq!(stdout, "y"); + } } mod external_words {