From 2a54ee0c540869c3fe50805025976362e0d0325b Mon Sep 17 00:00:00 2001 From: Jason Gedge Date: Sun, 29 Mar 2020 10:55:54 -0400 Subject: [PATCH] Introduce `InterruptibleStream` type. An interruptible stream can query an `AtomicBool. If that bool is true, the stream will no longer produce any values. Also introducing the `Interruptible` trait, which extends any `Stream` with the `interruptible` function, to simplify the construction and allow chaining. --- crates/nu-cli/src/prelude.rs | 16 ++++++++++- crates/nu-cli/src/stream/interruptible.rs | 35 +++++++++++++++++++++++ crates/nu-cli/src/stream/mod.rs | 2 ++ 3 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 crates/nu-cli/src/stream/interruptible.rs diff --git a/crates/nu-cli/src/prelude.rs b/crates/nu-cli/src/prelude.rs index 93d59aafe..ccc0a30eb 100644 --- a/crates/nu-cli/src/prelude.rs +++ b/crates/nu-cli/src/prelude.rs @@ -86,7 +86,7 @@ pub(crate) use crate::shell::filesystem_shell::FilesystemShell; pub(crate) use crate::shell::help_shell::HelpShell; pub(crate) use crate::shell::shell_manager::ShellManager; pub(crate) use crate::shell::value_shell::ValueShell; -pub(crate) use crate::stream::{InputStream, OutputStream}; +pub(crate) use crate::stream::{InputStream, InterruptibleStream, OutputStream}; pub(crate) use async_stream::stream as async_stream; pub(crate) use bigdecimal::BigDecimal; pub(crate) use futures::stream::BoxStream; @@ -102,6 +102,7 @@ pub(crate) use num_traits::cast::ToPrimitive; pub(crate) use serde::Deserialize; pub(crate) use std::collections::VecDeque; pub(crate) use std::future::Future; +pub(crate) use std::sync::atomic::AtomicBool; pub(crate) use std::sync::Arc; pub(crate) use itertools::Itertools; @@ -160,3 +161,16 @@ where } } } + +pub trait Interruptible { + fn interruptible(self, ctrl_c: Arc) -> InterruptibleStream; +} + +impl Interruptible for S +where + S: Stream + Send + 'static, +{ + fn interruptible(self, ctrl_c: Arc) -> InterruptibleStream { + InterruptibleStream::new(self, ctrl_c) + } +} diff --git a/crates/nu-cli/src/stream/interruptible.rs b/crates/nu-cli/src/stream/interruptible.rs new file mode 100644 index 000000000..d93822529 --- /dev/null +++ b/crates/nu-cli/src/stream/interruptible.rs @@ -0,0 +1,35 @@ +use crate::prelude::*; +use futures::task::Poll; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub struct InterruptibleStream { + inner: BoxStream<'static, V>, + ctrl_c: Arc, +} + +impl InterruptibleStream { + pub fn new(inner: S, ctrl_c: Arc) -> InterruptibleStream + where + S: Stream + Send + 'static, + { + InterruptibleStream { + inner: inner.boxed(), + ctrl_c, + } + } +} + +impl Stream for InterruptibleStream { + type Item = V; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> core::task::Poll> { + if self.ctrl_c.load(Ordering::SeqCst) { + Poll::Ready(None) + } else { + Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) + } + } +} diff --git a/crates/nu-cli/src/stream/mod.rs b/crates/nu-cli/src/stream/mod.rs index 3daa62395..ee08c663b 100644 --- a/crates/nu-cli/src/stream/mod.rs +++ b/crates/nu-cli/src/stream/mod.rs @@ -1,5 +1,7 @@ mod input; +mod interruptible; mod output; pub use input::*; +pub use interruptible::*; pub use output::*;