//! Module managing the streaming of individual [`Value`]s as a [`ListStream`] between pipeline //! elements //! //! For more general infos regarding our pipelining model refer to [`PipelineData`] use crate::{Config, PipelineData, ShellError, Signals, Span, Value}; use std::fmt::Debug; pub type ValueIterator = Box + Send + 'static>; /// A potentially infinite, interruptible stream of [`Value`]s. /// /// In practice, a "stream" here means anything which can be iterated and produces Values. /// Like other iterators in Rust, observing values from this stream will drain the items /// as you view them and the stream cannot be replayed. pub struct ListStream { stream: ValueIterator, span: Span, caller_spans: Vec, } impl ListStream { /// Create a new [`ListStream`] from a [`Value`] `Iterator`. pub fn new( iter: impl Iterator + Send + 'static, span: Span, signals: Signals, ) -> Self { Self { stream: Box::new(InterruptIter::new(iter, signals)), span, caller_spans: vec![], } } /// Returns the [`Span`] associated with this [`ListStream`]. pub fn span(&self) -> Span { self.span } /// Push a caller [`Span`] to the bytestream, it's useful to construct a backtrace. pub fn push_caller_span(&mut self, span: Span) { if span != self.span { self.caller_spans.push(span) } } /// Get all caller [`Span`], it's useful to construct a backtrace. pub fn get_caller_spans(&self) -> &Vec { &self.caller_spans } /// Changes the [`Span`] associated with this [`ListStream`]. pub fn with_span(mut self, span: Span) -> Self { self.span = span; self } /// Convert a [`ListStream`] into its inner [`Value`] `Iterator`. pub fn into_inner(self) -> ValueIterator { self.stream } /// Take a single value from the inner `Iterator`, modifying the stream. pub fn next_value(&mut self) -> Option { self.stream.next() } /// Converts each value in a [`ListStream`] into a string and then joins the strings together /// using the given separator. pub fn into_string(self, separator: &str, config: &Config) -> String { self.into_iter() .map(|val| val.to_expanded_string(", ", config)) .collect::>() .join(separator) } /// Collect the values of a [`ListStream`] into a list [`Value`]. pub fn into_value(self) -> Value { Value::list(self.stream.collect(), self.span) } /// Consume all values in the stream, returning an error if any of the values is a `Value::Error`. pub fn drain(self) -> Result<(), ShellError> { for next in self { if let Value::Error { error, .. } = next { return Err(*error); } } Ok(()) } /// Modify the inner iterator of a [`ListStream`] using a function. /// /// This can be used to call any number of standard iterator functions on the [`ListStream`]. /// E.g., `take`, `filter`, `step_by`, and more. /// /// ``` /// use nu_protocol::{ListStream, Signals, Span, Value}; /// /// let span = Span::unknown(); /// let stream = ListStream::new(std::iter::repeat(Value::int(0, span)), span, Signals::empty()); /// let new_stream = stream.modify(|iter| iter.take(100)); /// ``` pub fn modify(self, f: impl FnOnce(ValueIterator) -> I) -> Self where I: Iterator + Send + 'static, { Self { stream: Box::new(f(self.stream)), span: self.span, caller_spans: self.caller_spans, } } /// Create a new [`ListStream`] whose values are the results of applying the given function /// to each of the values in the original [`ListStream`]. pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self { self.modify(|iter| iter.map(mapping)) } } impl Debug for ListStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ListStream").finish() } } impl IntoIterator for ListStream { type Item = Value; type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { IntoIter { stream: self.into_inner(), } } } impl From for PipelineData { fn from(stream: ListStream) -> Self { Self::ListStream(stream, None) } } pub struct IntoIter { stream: ValueIterator, } impl Iterator for IntoIter { type Item = Value; fn next(&mut self) -> Option { self.stream.next() } } struct InterruptIter { iter: I, signals: Signals, } impl InterruptIter { fn new(iter: I, signals: Signals) -> Self { Self { iter, signals } } } impl Iterator for InterruptIter { type Item = ::Item; fn next(&mut self) -> Option { if self.signals.interrupted() { None } else { self.iter.next() } } fn size_hint(&self) -> (usize, Option) { self.iter.size_hint() } }