use crate::prelude::*; use futures::task::Poll; use std::sync::atomic::{AtomicBool, Ordering}; pub struct InterruptibleStream { inner: BoxStream<'static, V>, interrupt_signal: Arc, } impl InterruptibleStream { pub fn new(inner: S, interrupt_signal: Arc) -> InterruptibleStream where S: Stream + Send + 'static, { InterruptibleStream { inner: inner.boxed(), interrupt_signal, } } } impl Stream for InterruptibleStream { type Item = V; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> core::task::Poll> { if self.interrupt_signal.load(Ordering::SeqCst) { Poll::Ready(None) } else { Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) } } } pub trait Interruptible { fn interruptible(self, ctrl_c: Arc) -> InterruptibleStream; } impl Interruptible for S where S: Stream + Send + 'static, { fn interruptible(self, ctrl_c: Arc) -> InterruptibleStream { InterruptibleStream::new(self, ctrl_c) } }