forked from extern/nushell
Introduce InterruptibleStream
type.
An interruptible stream can query an `AtomicBool. If that bool is true, the stream will no longer produce any values. Also introducing the `Interruptible` trait, which extends any `Stream` with the `interruptible` function, to simplify the construction and allow chaining.
This commit is contained in:
parent
cad2741e9e
commit
2a54ee0c54
@ -86,7 +86,7 @@ pub(crate) use crate::shell::filesystem_shell::FilesystemShell;
|
|||||||
pub(crate) use crate::shell::help_shell::HelpShell;
|
pub(crate) use crate::shell::help_shell::HelpShell;
|
||||||
pub(crate) use crate::shell::shell_manager::ShellManager;
|
pub(crate) use crate::shell::shell_manager::ShellManager;
|
||||||
pub(crate) use crate::shell::value_shell::ValueShell;
|
pub(crate) use crate::shell::value_shell::ValueShell;
|
||||||
pub(crate) use crate::stream::{InputStream, OutputStream};
|
pub(crate) use crate::stream::{InputStream, InterruptibleStream, OutputStream};
|
||||||
pub(crate) use async_stream::stream as async_stream;
|
pub(crate) use async_stream::stream as async_stream;
|
||||||
pub(crate) use bigdecimal::BigDecimal;
|
pub(crate) use bigdecimal::BigDecimal;
|
||||||
pub(crate) use futures::stream::BoxStream;
|
pub(crate) use futures::stream::BoxStream;
|
||||||
@ -102,6 +102,7 @@ pub(crate) use num_traits::cast::ToPrimitive;
|
|||||||
pub(crate) use serde::Deserialize;
|
pub(crate) use serde::Deserialize;
|
||||||
pub(crate) use std::collections::VecDeque;
|
pub(crate) use std::collections::VecDeque;
|
||||||
pub(crate) use std::future::Future;
|
pub(crate) use std::future::Future;
|
||||||
|
pub(crate) use std::sync::atomic::AtomicBool;
|
||||||
pub(crate) use std::sync::Arc;
|
pub(crate) use std::sync::Arc;
|
||||||
|
|
||||||
pub(crate) use itertools::Itertools;
|
pub(crate) use itertools::Itertools;
|
||||||
@ -160,3 +161,16 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub trait Interruptible<V> {
|
||||||
|
fn interruptible(self, ctrl_c: Arc<AtomicBool>) -> InterruptibleStream<V>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, V> Interruptible<V> for S
|
||||||
|
where
|
||||||
|
S: Stream<Item = V> + Send + 'static,
|
||||||
|
{
|
||||||
|
fn interruptible(self, ctrl_c: Arc<AtomicBool>) -> InterruptibleStream<V> {
|
||||||
|
InterruptibleStream::new(self, ctrl_c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
35
crates/nu-cli/src/stream/interruptible.rs
Normal file
35
crates/nu-cli/src/stream/interruptible.rs
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
use crate::prelude::*;
|
||||||
|
use futures::task::Poll;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
|
||||||
|
pub struct InterruptibleStream<V> {
|
||||||
|
inner: BoxStream<'static, V>,
|
||||||
|
ctrl_c: Arc<AtomicBool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<V> InterruptibleStream<V> {
|
||||||
|
pub fn new<S>(inner: S, ctrl_c: Arc<AtomicBool>) -> InterruptibleStream<V>
|
||||||
|
where
|
||||||
|
S: Stream<Item = V> + Send + 'static,
|
||||||
|
{
|
||||||
|
InterruptibleStream {
|
||||||
|
inner: inner.boxed(),
|
||||||
|
ctrl_c,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<V> Stream for InterruptibleStream<V> {
|
||||||
|
type Item = V;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
mut self: std::pin::Pin<&mut Self>,
|
||||||
|
cx: &mut std::task::Context<'_>,
|
||||||
|
) -> core::task::Poll<Option<Self::Item>> {
|
||||||
|
if self.ctrl_c.load(Ordering::SeqCst) {
|
||||||
|
Poll::Ready(None)
|
||||||
|
} else {
|
||||||
|
Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,5 +1,7 @@
|
|||||||
mod input;
|
mod input;
|
||||||
|
mod interruptible;
|
||||||
mod output;
|
mod output;
|
||||||
|
|
||||||
pub use input::*;
|
pub use input::*;
|
||||||
|
pub use interruptible::*;
|
||||||
pub use output::*;
|
pub use output::*;
|
||||||
|
Loading…
Reference in New Issue
Block a user