nushell/crates/nu-stream/src/input.rs

189 lines
5.5 KiB
Rust
Raw Normal View History

use crate::prelude::*;
use nu_errors::ShellError;
use nu_protocol::{Primitive, Type, UntaggedValue, Value};
use nu_source::{HasFallibleSpan, PrettyDebug, Tag, Tagged, TaggedItem};
pub struct InputStream {
values: Box<dyn Iterator<Item = Value> + Send + Sync>,
2020-10-29 04:14:08 +01:00
// Whether or not an empty stream was explicitly requested via InputStream::empty
empty: bool,
}
impl Iterator for InputStream {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
self.values.next()
}
}
impl InputStream {
pub fn empty() -> InputStream {
InputStream {
values: Box::new(std::iter::empty()),
empty: true,
}
}
pub fn one(item: impl Into<Value>) -> InputStream {
InputStream {
values: Box::new(std::iter::once(item.into())),
empty: false,
}
}
pub fn into_vec(self) -> Vec<Value> {
self.values.collect()
}
pub fn is_empty(&self) -> bool {
self.empty
}
pub fn drain_vec(&mut self) -> Vec<Value> {
let mut output = vec![];
for x in &mut self.values {
output.push(x);
}
output
2019-08-03 04:17:28 +02:00
}
pub fn from_stream(input: impl Iterator<Item = Value> + Send + Sync + 'static) -> InputStream {
InputStream {
values: Box::new(input),
empty: false,
}
}
pub fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
let mut bytes = vec![];
let mut value_tag = tag.clone();
loop {
match self.values.next() {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
tag: value_t,
}) => {
value_tag = value_t;
bytes.extend_from_slice(&s.into_bytes());
}
Some(Value {
value: UntaggedValue::Primitive(Primitive::Binary(b)),
tag: value_t,
}) => {
value_tag = value_t;
bytes.extend_from_slice(&b);
}
2020-06-28 23:06:05 +02:00
Some(Value {
value: UntaggedValue::Primitive(Primitive::Nothing),
tag: value_t,
}) => {
value_tag = value_t;
}
Some(Value {
tag: value_tag,
value,
}) => {
return Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
tag,
format!(
"{} originates from here",
Type::from_value(&value).plain_string(100000)
),
value_tag,
))
}
None => break,
}
}
match String::from_utf8(bytes) {
Ok(s) => Ok(s.tagged(value_tag)),
Err(_) => Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
tag,
"value originates from here",
value_tag,
)),
}
}
pub fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
let mut bytes = vec![];
let mut value_tag = tag.clone();
loop {
match self.values.next() {
Some(Value {
value: UntaggedValue::Primitive(Primitive::Binary(b)),
tag: value_t,
}) => {
value_tag = value_t;
bytes.extend_from_slice(&b);
}
Some(Value {
tag: value_tag,
value: _,
}) => {
return Err(ShellError::labeled_error_with_secondary(
"Expected binary from pipeline",
"requires binary input",
tag,
"value originates from here",
value_tag,
));
}
None => break,
}
}
Ok(bytes.tagged(value_tag))
}
}
impl From<VecDeque<Value>> for InputStream {
fn from(input: VecDeque<Value>) -> InputStream {
InputStream {
values: Box::new(input.into_iter()),
empty: false,
}
}
}
impl From<Vec<Value>> for InputStream {
fn from(input: Vec<Value>) -> InputStream {
InputStream {
values: Box::new(input.into_iter()),
empty: false,
}
}
}
pub trait IntoInputStream {
fn into_input_stream(self) -> InputStream;
}
impl<T, U> IntoInputStream for T
where
T: Iterator<Item = U> + Send + Sync + 'static,
U: Into<Result<nu_protocol::Value, nu_errors::ShellError>>,
{
fn into_input_stream(self) -> InputStream {
InputStream {
empty: false,
values: Box::new(self.map(|item| match item.into() {
Ok(result) => result,
Err(err) => match HasFallibleSpan::maybe_span(&err) {
Some(span) => nu_protocol::UntaggedValue::Error(err).into_value(span),
None => nu_protocol::UntaggedValue::Error(err).into_untagged_value(),
},
})),
}
}
}