Switch to "engine-p" (#3270)

* WIP

* WIP

* first builds

* Tests pass
This commit is contained in:
Jonathan Turner
2021-04-07 04:19:43 +12:00
committed by GitHub
parent ad1c4f5e39
commit 073e5727c6
262 changed files with 2269 additions and 2660 deletions

View File

@ -1,20 +1,27 @@
use crate::prelude::*;
use futures::stream::iter;
use nu_errors::ShellError;
use nu_protocol::{Primitive, Type, UntaggedValue, Value};
use nu_source::{HasFallibleSpan, PrettyDebug, Tag, Tagged, TaggedItem};
pub struct InputStream {
values: BoxStream<'static, Value>,
values: Box<dyn Iterator<Item = Value> + Send + Sync>,
// Whether or not an empty stream was explicitly requested via InputStream::empty
empty: bool,
}
impl Iterator for InputStream {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
self.values.next()
}
}
impl InputStream {
pub fn empty() -> InputStream {
InputStream {
values: futures::stream::empty().boxed(),
values: Box::new(std::iter::empty()),
empty: true,
}
}
@ -25,7 +32,7 @@ impl InputStream {
v.into()
}
pub fn into_vec(self) -> impl Future<Output = Vec<Value>> {
pub fn into_vec(self) -> Vec<Value> {
self.values.collect()
}
@ -33,26 +40,27 @@ impl InputStream {
self.empty
}
pub fn drain_vec(&mut self) -> impl Future<Output = Vec<Value>> {
let mut values: BoxStream<'static, Value> = iter(VecDeque::new()).boxed();
std::mem::swap(&mut values, &mut self.values);
values.collect()
pub fn drain_vec(&mut self) -> Vec<Value> {
let mut output = vec![];
while let Some(x) = self.values.next() {
output.push(x);
}
output
}
pub fn from_stream(input: impl Stream<Item = Value> + Send + 'static) -> InputStream {
pub fn from_stream(input: impl Iterator<Item = Value> + Send + Sync + 'static) -> InputStream {
InputStream {
values: input.boxed(),
values: Box::new(input),
empty: false,
}
}
pub async fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
pub fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
let mut bytes = vec![];
let mut value_tag = tag.clone();
loop {
match self.values.next().await {
match self.values.next() {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(s)),
tag: value_t,
@ -93,7 +101,7 @@ impl InputStream {
}
match String::from_utf8(bytes) {
Ok(s) => Ok(s.tagged(value_tag.clone())),
Ok(s) => Ok(s.tagged(value_tag)),
Err(_) => Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
@ -104,12 +112,12 @@ impl InputStream {
}
}
pub async fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
pub fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
let mut bytes = vec![];
let mut value_tag = tag.clone();
loop {
match self.values.next().await {
match self.values.next() {
Some(Value {
value: UntaggedValue::Primitive(Primitive::Binary(b)),
tag: value_t,
@ -137,30 +145,10 @@ impl InputStream {
}
}
impl Stream for InputStream {
type Item = Value;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> core::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.values), cx)
}
}
impl From<BoxStream<'static, Value>> for InputStream {
fn from(input: BoxStream<'static, Value>) -> InputStream {
InputStream {
values: input,
empty: false,
}
}
}
impl From<VecDeque<Value>> for InputStream {
fn from(input: VecDeque<Value>) -> InputStream {
InputStream {
values: futures::stream::iter(input).boxed(),
values: Box::new(input.into_iter()),
empty: false,
}
}
@ -169,7 +157,7 @@ impl From<VecDeque<Value>> for InputStream {
impl From<Vec<Value>> for InputStream {
fn from(input: Vec<Value>) -> InputStream {
InputStream {
values: futures::stream::iter(input).boxed(),
values: Box::new(input.into_iter()),
empty: false,
}
}
@ -182,16 +170,19 @@ pub trait ToInputStream {
impl<T, U> ToInputStream for T
where
T: Stream<Item = U> + Send + 'static,
T: Iterator<Item = U> + Send + Sync + 'static,
U: Into<Result<nu_protocol::Value, nu_errors::ShellError>>,
{
fn to_input_stream(self) -> InputStream {
InputStream::from_stream(self.map(|item| match item.into() {
Ok(result) => result,
Err(err) => match HasFallibleSpan::maybe_span(&err) {
Some(span) => nu_protocol::UntaggedValue::Error(err).into_value(span),
None => nu_protocol::UntaggedValue::Error(err).into_untagged_value(),
},
}))
InputStream {
empty: false,
values: Box::new(self.map(|item| match item.into() {
Ok(result) => result,
Err(err) => match HasFallibleSpan::maybe_span(&err) {
Some(span) => nu_protocol::UntaggedValue::Error(err).into_value(span),
None => nu_protocol::UntaggedValue::Error(err).into_untagged_value(),
},
})),
}
}
}