diff --git a/crates/nu-command/src/commands/classified/external.rs b/crates/nu-command/src/commands/classified/external.rs index 72c767d31..729964145 100644 --- a/crates/nu-command/src/commands/classified/external.rs +++ b/crates/nu-command/src/commands/classified/external.rs @@ -1,7 +1,7 @@ -use crate::futures::ThreadedReceiver; use crate::prelude::*; use nu_engine::{evaluate_baseline_expr, BufCodecReader}; use nu_engine::{MaybeTextCodec, StringOrBinary}; +use parking_lot::Mutex; use std::io::Write; use std::ops::Deref; @@ -431,7 +431,7 @@ fn spawn( Ok(()) }); - let stream = ThreadedReceiver::new(rx); + let stream = ChannelReceiver::new(rx); Ok(stream.to_input_stream()) } else { Err(ShellError::labeled_error( @@ -442,6 +442,30 @@ fn spawn( } } +struct ChannelReceiver { + rx: Arc>>>, +} + +impl ChannelReceiver { + pub fn new(rx: mpsc::Receiver>) -> Self { + Self { + rx: Arc::new(Mutex::new(rx)), + } + } +} + +impl Iterator for ChannelReceiver { + type Item = Result; + + fn next(&mut self) -> Option { + let rx = self.rx.lock(); + match rx.recv() { + Ok(v) => Some(v), + Err(_) => None, + } + } +} + fn expand_tilde(input: &SI, home_dir: HD) -> std::borrow::Cow where SI: AsRef, diff --git a/crates/nu-command/src/futures.rs b/crates/nu-command/src/futures.rs deleted file mode 100644 index aa1fce69b..000000000 --- a/crates/nu-command/src/futures.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::sync::{mpsc, Arc, Mutex}; -use std::task::Waker; -use std::thread; - -#[allow(clippy::option_option)] -struct SharedState { - result: Option>, - kill: bool, - waker: Option, -} - -pub struct ThreadedReceiver { - shared_state: Arc>>, -} - -impl ThreadedReceiver { - pub fn new(recv: mpsc::Receiver) -> ThreadedReceiver { - let shared_state = Arc::new(Mutex::new(SharedState { - result: None, - kill: false, - waker: None, - })); - - // Clone everything to avoid lifetimes - let thread_shared_state = shared_state.clone(); - thread::spawn(move || { - loop { - let result = recv.recv(); - - { - let mut shared_state = thread_shared_state - .lock() - .expect("ThreadedFuture shared state shouldn't be poisoned"); - - if let Ok(result) = result { - shared_state.result = Some(Some(result)); - } else { - break; - } - } - - // Don't attempt to recv anything else until consumed - loop { - let mut shared_state = thread_shared_state - .lock() - .expect("ThreadedFuture shared state shouldn't be poisoned"); - - if shared_state.kill { - return; - } - - if shared_state.result.is_some() { - if let Some(waker) = shared_state.waker.take() { - waker.wake(); - } - } else { - break; - } - } - } - - // Let the Stream implementation know that we're done - let mut shared_state = thread_shared_state - .lock() - .expect("ThreadedFuture shared state shouldn't be poisoned"); - - shared_state.result = Some(None); - if let Some(waker) = shared_state.waker.take() { - waker.wake(); - } - }); - - ThreadedReceiver { shared_state } - } -} - -// impl Stream for ThreadedReceiver { -// type Item = T; - -// fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { -// let mut shared_state = self -// .shared_state -// .lock() -// .expect("ThreadedFuture shared state shouldn't be poisoned"); - -// if let Some(result) = shared_state.result.take() { -// Poll::Ready(result) -// } else { -// shared_state.waker = Some(cx.waker().clone()); -// Poll::Pending -// } -// } -// } - -impl Iterator for ThreadedReceiver { - type Item = T; - - fn next(&mut self) -> Option { - loop { - let mut shared_state = self - .shared_state - .lock() - .expect("ThreadedFuture shared state shouldn't be poisoned"); - - if let Some(result) = shared_state.result.take() { - return result; - } - } - } -} - -impl Drop for ThreadedReceiver { - fn drop(&mut self) { - // Setting the kill flag to true will cause the thread spawned in `new` to exit, which - // will cause the `Receiver` argument to get dropped. This can allow senders to - // potentially clean up. - match self.shared_state.lock() { - Ok(mut state) => state.kill = true, - Err(mut poisoned_err) => poisoned_err.get_mut().kill = true, - } - } -} - -#[cfg(test)] -mod tests { - mod threaded_receiver { - use super::super::ThreadedReceiver; - use std::sync::mpsc; - - #[test] - fn returns_expected_result() { - let (tx, rx) = mpsc::sync_channel(0); - std::thread::spawn(move || { - let _ = tx.send(1); - let _ = tx.send(2); - let _ = tx.send(3); - }); - - let stream = ThreadedReceiver::new(rx); - let mut result = stream; - assert_eq!(Some(1), result.next()); - assert_eq!(Some(2), result.next()); - assert_eq!(Some(3), result.next()); - assert_eq!(None, result.next()); - } - - #[test] - fn drops_receiver_when_stream_dropped() { - let (tx, rx) = mpsc::sync_channel(0); - let th = std::thread::spawn(move || { - tx.send(1).and_then(|_| tx.send(2)).and_then(|_| tx.send(3)) - }); - - { - let stream = ThreadedReceiver::new(rx); - let mut result = stream; - assert_eq!(Some(1), result.next()); - } - let result = th.join(); - assert_eq!(true, result.unwrap().is_err()); - } - } -} diff --git a/crates/nu-command/src/lib.rs b/crates/nu-command/src/lib.rs index d7a5ee6ff..0b491777c 100644 --- a/crates/nu-command/src/lib.rs +++ b/crates/nu-command/src/lib.rs @@ -7,7 +7,6 @@ extern crate indexmap; #[macro_use] mod prelude; pub mod commands; -mod futures; pub mod utils; #[cfg(test)]