use crate::prelude::*; use futures::stream::{iter, once}; use nu_errors::ShellError; use nu_protocol::{Primitive, Type, UntaggedValue, Value}; use nu_source::{Tagged, TaggedItem}; pub struct InputStream { values: BoxStream<'static, Value>, // Whether or not an empty stream was explicitly requeted via InputStream::empty empty: bool, } impl InputStream { pub fn empty() -> InputStream { InputStream { values: once(async { UntaggedValue::nothing().into_untagged_value() }).boxed(), empty: true, } } pub fn one(item: impl Into) -> InputStream { let mut v: VecDeque = VecDeque::new(); v.push_back(item.into()); v.into() } pub fn into_vec(self) -> impl Future> { self.values.collect() } pub fn is_empty(&self) -> bool { self.empty } pub fn drain_vec(&mut self) -> impl Future> { let mut values: BoxStream<'static, Value> = iter(VecDeque::new()).boxed(); std::mem::swap(&mut values, &mut self.values); values.collect() } pub fn from_stream(input: impl Stream + Send + 'static) -> InputStream { InputStream { values: input.boxed(), empty: false, } } pub async fn collect_string(mut self, tag: Tag) -> Result, ShellError> { let mut bytes = vec![]; let mut value_tag = tag.clone(); loop { match self.values.next().await { Some(Value { value: UntaggedValue::Primitive(Primitive::String(s)), tag: value_t, }) => { value_tag = value_t; bytes.extend_from_slice(&s.into_bytes()); } Some(Value { value: UntaggedValue::Primitive(Primitive::Line(s)), tag: value_t, }) => { value_tag = value_t; bytes.extend_from_slice(&s.into_bytes()); } Some(Value { value: UntaggedValue::Primitive(Primitive::Binary(b)), tag: value_t, }) => { value_tag = value_t; bytes.extend_from_slice(&b); } Some(Value { value: UntaggedValue::Primitive(Primitive::Nothing), tag: value_t, }) => { value_tag = value_t; } Some(Value { tag: value_tag, value, }) => { return Err(ShellError::labeled_error_with_secondary( "Expected a string from pipeline", "requires string input", tag, format!( "{} originates from here", Type::from_value(&value).plain_string(100000) ), value_tag, )) } None => break, } } match String::from_utf8(bytes) { Ok(s) => Ok(s.tagged(value_tag.clone())), Err(_) => Err(ShellError::labeled_error_with_secondary( "Expected a string from pipeline", "requires string input", tag, "value originates from here", value_tag, )), } } pub async fn collect_binary(mut self, tag: Tag) -> Result>, ShellError> { let mut bytes = vec![]; let mut value_tag = tag.clone(); loop { match self.values.next().await { Some(Value { value: UntaggedValue::Primitive(Primitive::Binary(b)), tag: value_t, }) => { value_tag = value_t; bytes.extend_from_slice(&b); } Some(Value { tag: value_tag, value: _, }) => { return Err(ShellError::labeled_error_with_secondary( "Expected binary from pipeline", "requires binary input", tag, "value originates from here", value_tag, )); } None => break, } } Ok(bytes.tagged(value_tag)) } } impl Stream for InputStream { type Item = Value; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> core::task::Poll> { Stream::poll_next(std::pin::Pin::new(&mut self.values), cx) } } impl From> for InputStream { fn from(input: BoxStream<'static, Value>) -> InputStream { InputStream { values: input, empty: false, } } } impl From> for InputStream { fn from(input: VecDeque) -> InputStream { InputStream { values: futures::stream::iter(input).boxed(), empty: false, } } } impl From> for InputStream { fn from(input: Vec) -> InputStream { InputStream { values: futures::stream::iter(input).boxed(), empty: false, } } }