use crate::*; use std::{ fmt::Debug, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, }; /// A single buffer of binary data streamed over multiple parts. Optionally contains ctrl-c that can be used /// to break the stream. pub struct ByteStream { pub stream: Box> + Send + 'static>, pub ctrlc: Option>, } impl ByteStream { pub fn into_vec(self) -> Vec { self.flatten().collect::>() } } impl Debug for ByteStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ByteStream").finish() } } impl Iterator for ByteStream { type Item = Vec; fn next(&mut self) -> Option { if let Some(ctrlc) = &self.ctrlc { if ctrlc.load(Ordering::SeqCst) { None } else { self.stream.next() } } else { self.stream.next() } } } /// A single string streamed over multiple parts. Optionally contains ctrl-c that can be used /// to break the stream. pub struct StringStream { pub stream: Box> + Send + 'static>, pub ctrlc: Option>, } impl StringStream { pub fn into_string(self, separator: &str) -> Result { let mut output = String::new(); let mut first = true; for s in self.stream { output.push_str(&s?); if !first { output.push_str(separator); } else { first = false; } } Ok(output) } pub fn from_stream( input: impl Iterator> + Send + 'static, ctrlc: Option>, ) -> StringStream { StringStream { stream: Box::new(input), ctrlc, } } } impl Debug for StringStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("StringStream").finish() } } impl Iterator for StringStream { type Item = Result; fn next(&mut self) -> Option { if let Some(ctrlc) = &self.ctrlc { if ctrlc.load(Ordering::SeqCst) { None } else { self.stream.next() } } else { self.stream.next() } } } /// A potentially infinite stream of values, optinally with a mean to send a Ctrl-C signal to stop /// the stream from continuing. /// /// In practice, a "stream" here means anything which can be iterated and produce Values as it iterates. /// Like other iterators in Rust, observing values from this stream will drain the items as you view them /// and the stream cannot be replayed. pub struct ValueStream { pub stream: Box + Send + 'static>, pub ctrlc: Option>, } impl ValueStream { pub fn into_string(self, separator: &str, config: &Config) -> String { self.map(|x: Value| x.into_string(", ", config)) .collect::>() .join(separator) } pub fn from_stream( input: impl Iterator + Send + 'static, ctrlc: Option>, ) -> ValueStream { ValueStream { stream: Box::new(input), ctrlc, } } } impl Debug for ValueStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ValueStream").finish() } } impl Iterator for ValueStream { type Item = Value; fn next(&mut self) -> Option { if let Some(ctrlc) = &self.ctrlc { if ctrlc.load(Ordering::SeqCst) { None } else { self.stream.next() } } else { self.stream.next() } } }