// nushell/crates/nu-cli/src/stream.rs

use crate::prelude::*;
// futures 0.3.0 removed the `Stream` implementation for `VecDeque`, so
// collections are converted to streams with `futures::stream::iter` (#1344).
use futures::stream::iter;
use nu_errors::ShellError;
use nu_protocol::{Primitive, ReturnSuccess, ReturnValue, UntaggedValue, Value};
use nu_source::{Tagged, TaggedItem};

/// A boxed stream of `Value`s flowing into a command.
pub struct InputStream {
    pub(crate) values: BoxStream<'static, Value>,
}

impl InputStream {
    /// Despite the name, yields a single `nothing` value rather than no values.
    pub fn empty() -> InputStream {
        vec![UntaggedValue::nothing().into_value(Tag::unknown())].into()
    }

    /// Consumes the stream and collects every value into a `Vec`.
    pub fn into_vec(self) -> impl Future<Output = Vec<Value>> {
        self.values.collect()
    }

    /// Collects the remaining values, leaving an empty stream in `self`.
    pub fn drain_vec(&mut self) -> impl Future<Output = Vec<Value>> {
        let mut values: BoxStream<'static, Value> = iter(VecDeque::new()).boxed();
        std::mem::swap(&mut values, &mut self.values);
        values.collect()
    }

    /// Boxes any `Send + 'static` stream of values as an `InputStream`.
    pub fn from_stream(input: impl Stream<Item = Value> + Send + 'static) -> InputStream {
        InputStream {
            values: input.boxed(),
        }
    }

    /// Concatenates every string, line, and binary value in the stream into one
    /// UTF-8 string. Any other value, or invalid UTF-8, is an error. The result
    /// carries the tag of the last value seen, or `tag` if the stream was empty.
    pub async fn collect_string(mut self, tag: Tag) -> Result<Tagged<String>, ShellError> {
        let mut bytes = vec![];
        let mut value_tag = tag.clone();

        loop {
            match self.values.next().await {
                Some(Value {
                    value: UntaggedValue::Primitive(Primitive::String(s)),
                    tag: value_t,
                }) => {
                    value_tag = value_t;
                    bytes.extend_from_slice(&s.into_bytes());
                }
                Some(Value {
                    value: UntaggedValue::Primitive(Primitive::Line(s)),
                    tag: value_t,
                }) => {
                    value_tag = value_t;
                    bytes.extend_from_slice(&s.into_bytes());
                }
                Some(Value {
                    value: UntaggedValue::Primitive(Primitive::Binary(b)),
                    tag: value_t,
                }) => {
                    value_tag = value_t;
                    bytes.extend_from_slice(&b);
                }
                Some(Value { tag: value_tag, .. }) => {
                    return Err(ShellError::labeled_error_with_secondary(
                        "Expected a string from pipeline",
                        "requires string input",
                        tag,
                        "value originates from here",
                        value_tag,
                    ))
                }
                None => break,
            }
        }

        match String::from_utf8(bytes) {
            Ok(s) => Ok(s.tagged(value_tag.clone())),
            Err(_) => Err(ShellError::labeled_error_with_secondary(
                "Expected a string from pipeline",
                "requires string input",
                tag,
                "value originates from here",
                value_tag,
            )),
        }
    }

    /// Concatenates every binary value in the stream into one byte vector.
    /// Any non-binary value is an error. The result carries the tag of the
    /// last value seen, or `tag` if the stream was empty.
    pub async fn collect_binary(mut self, tag: Tag) -> Result<Tagged<Vec<u8>>, ShellError> {
        let mut bytes = vec![];
        let mut value_tag = tag.clone();

        loop {
            match self.values.next().await {
                Some(Value {
                    value: UntaggedValue::Primitive(Primitive::Binary(b)),
                    tag: value_t,
                }) => {
                    value_tag = value_t;
                    bytes.extend_from_slice(&b);
                }
                Some(Value {
                    tag: value_tag,
                    value: v,
                }) => {
                    println!("{:?}", v);
                    return Err(ShellError::labeled_error_with_secondary(
                        "Expected binary from pipeline",
                        "requires binary input",
                        tag,
                        "value originates from here",
                        value_tag,
                    ));
                }
                None => break,
            }
        }

        Ok(bytes.tagged(value_tag))
    }
}

impl Stream for InputStream {
    type Item = Value;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> core::task::Poll<Option<Self::Item>> {
        Stream::poll_next(std::pin::Pin::new(&mut self.values), cx)
    }
}

impl From<BoxStream<'static, Value>> for InputStream {
    fn from(input: BoxStream<'static, Value>) -> InputStream {
        InputStream { values: input }
    }
}

impl From<VecDeque<Value>> for InputStream {
    fn from(input: VecDeque<Value>) -> InputStream {
        InputStream {
            values: futures::stream::iter(input).boxed(),
        }
    }
}

impl From<Vec<Value>> for InputStream {
    fn from(input: Vec<Value>) -> InputStream {
        InputStream {
            values: futures::stream::iter(input).boxed(),
        }
    }
}

/// A boxed stream of `ReturnValue`s produced by a command.
pub struct OutputStream {
    pub(crate) values: BoxStream<'static, ReturnValue>,
}

impl OutputStream {
    /// Boxes any `Send + 'static` stream of return values as an `OutputStream`.
    pub fn new(values: impl Stream<Item = ReturnValue> + Send + 'static) -> OutputStream {
        OutputStream {
            values: values.boxed(),
        }
    }

    /// Creates a stream that yields no values.
    pub fn empty() -> OutputStream {
        let v: VecDeque<ReturnValue> = VecDeque::new();
        v.into()
    }

    /// Creates a stream that yields exactly one item.
    pub fn one(item: impl Into<ReturnValue>) -> OutputStream {
        let mut v: VecDeque<ReturnValue> = VecDeque::new();
        v.push_back(item.into());
        v.into()
    }

    /// Wraps each `Value` of `input` with `ReturnSuccess::value`.
    pub fn from_input(input: impl Stream<Item = Value> + Send + 'static) -> OutputStream {
        OutputStream {
            values: input.map(ReturnSuccess::value).boxed(),
        }
    }

    /// Collects the remaining return values, leaving an empty stream in `self`.
    pub fn drain_vec(&mut self) -> impl Future<Output = Vec<ReturnValue>> {
        let mut values: BoxStream<'static, ReturnValue> = iter(VecDeque::new()).boxed();
        std::mem::swap(&mut values, &mut self.values);
        values.collect()
    }
}

impl Stream for OutputStream {
    type Item = ReturnValue;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> core::task::Poll<Option<Self::Item>> {
        Stream::poll_next(std::pin::Pin::new(&mut self.values), cx)
    }
}

impl From<InputStream> for OutputStream {
    fn from(input: InputStream) -> OutputStream {
        OutputStream {
            values: input.values.map(ReturnSuccess::value).boxed(),
        }
    }
}

impl From<BoxStream<'static, Value>> for OutputStream {
    fn from(input: BoxStream<'static, Value>) -> OutputStream {
        OutputStream {
            values: input.map(ReturnSuccess::value).boxed(),
        }
    }
}

impl From<BoxStream<'static, ReturnValue>> for OutputStream {
    fn from(input: BoxStream<'static, ReturnValue>) -> OutputStream {
        OutputStream { values: input }
    }
}

impl From<VecDeque<ReturnValue>> for OutputStream {
    fn from(input: VecDeque<ReturnValue>) -> OutputStream {
        OutputStream {
            values: futures::stream::iter(input).boxed(),
        }
    }
}

impl From<VecDeque<Value>> for OutputStream {
    fn from(input: VecDeque<Value>) -> OutputStream {
        let stream = input.into_iter().map(ReturnSuccess::value);
        OutputStream {
            values: futures::stream::iter(stream).boxed(),
        }
    }
}

impl From<Vec<ReturnValue>> for OutputStream {
    fn from(input: Vec<ReturnValue>) -> OutputStream {
        OutputStream {
            values: futures::stream::iter(input).boxed(),
        }
    }
}

impl From<Vec<Value>> for OutputStream {
    fn from(input: Vec<Value>) -> OutputStream {
        let stream = input.into_iter().map(ReturnSuccess::value);
        OutputStream {
            values: futures::stream::iter(stream).boxed(),
        }
    }
}