Futures v0.3 upgrade (#1344)

* Upgrade futures, async-stream, and futures_codec

These were the last three dependencies on futures-preview. `nu` itself
is now fully dependent on `futures@0.3`, as opposed to `futures-preview`
alpha.

Because the update to `futures` from `0.3.0-alpha.19` to `0.3.0` removed
the `Stream` implementation of `VecDeque` ([changelog][changelog]), most
commands that convert a `VecDeque` to an `OutputStream` broke and had to
be fixed.

The current solution is to now convert `VecDeque`s to a `Stream` via
`futures::stream::iter`. However, it may be useful for `futures` to
create an `IntoStream` trait, implemented on the `std::collections` (or
really any `IntoIterator`). If something like this happends, it may be
worthwhile to update the trait implementations on `OutputStream` and
refactor these commands again.

While upgrading `futures_codec`, we remove a custom implementation of
`LinesCodec`, as one has been added to the library. There's also a small
refactor to make the stream output more idiomatic.

[changelog]: https://github.com/rust-lang/futures-rs/blob/master/CHANGELOG.md#030---2019-11-5

* Upgrade sys & ps plugin dependencies

They were previously dependent on `futures-preview`, and `nu_plugin_ps`
was dependent on an old version of `futures-timer`.

* Remove dependency on futures-timer from nu

* Update Cargo.lock

* Fix formatting

* Revert fmt regressions

CI is still on 1.40.0, but the latest rustfmt v1.41.0 has changes to the
`val @ pattern` syntax, causing the linting job to fail.

* Fix clippy warnings
This commit is contained in:
Alex van de Sandt
2020-02-05 22:46:48 -05:00
committed by GitHub
parent ba1b67c072
commit e3be849c2a
32 changed files with 163 additions and 187 deletions

View File

@ -43,6 +43,7 @@ fn append(
) -> Result<OutputStream, ShellError> {
let mut after: VecDeque<Value> = VecDeque::new();
after.push_back(row);
let after = futures::stream::iter(after);
Ok(OutputStream::from_input(input.values.chain(after)))
}

View File

@ -1,53 +1,14 @@
use crate::prelude::*;
use bytes::{BufMut, BytesMut};
use futures::stream::StreamExt;
use futures_codec::{Decoder, Encoder, FramedRead};
use futures_codec::{FramedRead, LinesCodec};
use log::trace;
use nu_errors::ShellError;
use nu_parser::ExternalCommand;
use nu_protocol::{Primitive, ShellTypeName, UntaggedValue, Value};
use std::io::{Error, ErrorKind, Write};
use std::io::Write;
use std::ops::Deref;
use std::process::{Command, Stdio};
/// A simple `Codec` implementation that splits up data into lines.
pub struct LinesCodec {}
impl Encoder for LinesCodec {
type Item = String;
type Error = Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.put(item);
Ok(())
}
}
impl Decoder for LinesCodec {
type Item = nu_protocol::UntaggedValue;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match src.iter().position(|b| b == &b'\n') {
Some(pos) if !src.is_empty() => {
let buf = src.split_to(pos + 1);
String::from_utf8(buf.to_vec())
.map(UntaggedValue::line)
.map(Some)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))
}
_ if !src.is_empty() => {
let drained = src.take();
String::from_utf8(drained.to_vec())
.map(UntaggedValue::string)
.map(Some)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))
}
_ => Ok(None),
}
}
}
pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result<String, ShellError> {
match &from.value {
UntaggedValue::Primitive(Primitive::Int(i)) => Ok(i.to_string()),
@ -376,21 +337,19 @@ async fn spawn(
};
let file = futures::io::AllowStdIo::new(stdout);
let stream = FramedRead::new(file, LinesCodec {});
let mut stream = stream.map(|line| {
let mut stream = FramedRead::new(file, LinesCodec).map(|line| {
if let Ok(line) = line {
line.into_value(&name_tag)
Value {
value: UntaggedValue::Primitive(Primitive::String(line)),
tag: name_tag.clone(),
}
} else {
panic!("Internal error: could not read lines of text from stdin")
}
});
loop {
match stream.next().await {
Some(item) => yield Ok(item),
None => break,
}
while let Some(item) = stream.next().await {
yield Ok(item)
}
}

View File

@ -525,9 +525,11 @@ impl Command {
match call_info {
Ok(call_info) => match command.run(&call_info, &registry, &raw_args, x) {
Ok(o) => o,
Err(e) => VecDeque::from(vec![ReturnValue::Err(e)]).to_output_stream(),
Err(e) => {
futures::stream::iter(vec![ReturnValue::Err(e)]).to_output_stream()
}
},
Err(e) => VecDeque::from(vec![ReturnValue::Err(e)]).to_output_stream(),
Err(e) => futures::stream::iter(vec![ReturnValue::Err(e)]).to_output_stream(),
}
})
.flatten();

View File

@ -91,5 +91,5 @@ pub fn date(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
date_out.push_back(value);
Ok(date_out.to_output_stream())
Ok(futures::stream::iter(date_out).to_output_stream())
}

View File

@ -67,7 +67,8 @@ fn default(
} else {
result.push_back(ReturnSuccess::value(item));
}
result
futures::stream::iter(result)
})
.flatten();

View File

@ -60,7 +60,8 @@ fn run(
}
}
let stream = VecDeque::from(output);
// TODO: This whole block can probably be replaced with `.map()`
let stream = futures::stream::iter(output);
Ok(stream.to_output_stream())
}

View File

@ -48,7 +48,7 @@ impl PerItemCommand for Edit {
value: UntaggedValue::Row(_),
..
} => match obj.replace_data_at_column_path(&field, replacement.item.clone()) {
Some(v) => VecDeque::from(vec![Ok(ReturnSuccess::Value(v))]),
Some(v) => futures::stream::iter(vec![Ok(ReturnSuccess::Value(v))]),
None => {
return Err(ShellError::labeled_error(
"edit could not find place to insert column",

View File

@ -85,5 +85,7 @@ pub fn env(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let value = get_environment(tag)?;
env_out.push_back(value);
let env_out = futures::stream::iter(env_out);
Ok(env_out.to_output_stream())
}

View File

@ -9,7 +9,7 @@ pub struct First;
#[derive(Deserialize)]
pub struct FirstArgs {
rows: Option<Tagged<u64>>,
rows: Option<Tagged<usize>>,
}
impl WholeStreamCommand for First {

View File

@ -76,7 +76,7 @@ impl PerItemCommand for Format {
String::new()
};
Ok(VecDeque::from(vec![ReturnSuccess::value(
Ok(futures::stream::iter(vec![ReturnSuccess::value(
UntaggedValue::string(output).into_untagged_value(),
)])
.to_output_stream())

View File

@ -239,7 +239,7 @@ pub fn get(
}
}
result
futures::stream::iter(result)
})
.flatten();

View File

@ -77,6 +77,7 @@ impl PerItemCommand for Help {
get_help(&command.name(), &command.usage(), command.signature()).into(),
);
}
let help = futures::stream::iter(help);
Ok(help.to_output_stream())
}
_ => {
@ -102,11 +103,9 @@ Get the processes on your system actively using CPU:
You can also learn more at https://www.nushell.sh/book/"#;
let mut output_stream = VecDeque::new();
output_stream.push_back(ReturnSuccess::value(
let output_stream = futures::stream::iter(vec![ReturnSuccess::value(
UntaggedValue::string(msg).into_value(tag),
));
)]);
Ok(output_stream.to_output_stream())
}

View File

@ -48,7 +48,7 @@ impl PerItemCommand for Insert {
value: UntaggedValue::Row(_),
..
} => match obj.insert_data_at_column_path(&field, replacement.item.clone()) {
Ok(v) => VecDeque::from(vec![Ok(ReturnSuccess::Value(v))]),
Ok(v) => futures::stream::iter(vec![Ok(ReturnSuccess::Value(v))]),
Err(err) => return Err(err),
},

View File

@ -44,25 +44,27 @@ fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream,
trace!("split result = {:?}", split_result);
let mut result = VecDeque::new();
for s in split_result {
result.push_back(ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::Line(s.into())).into_untagged_value(),
));
}
result
let result = split_result
.into_iter()
.map(|s| {
ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::Line(s.into()))
.into_untagged_value(),
)
})
.collect::<Vec<_>>();
futures::stream::iter(result)
} else {
let mut result = VecDeque::new();
let value_span = v.tag.span;
result.push_back(Err(ShellError::labeled_error_with_secondary(
futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name_span,
"value originates from here",
value_span,
)));
result
))])
}
})
.flatten();

View File

@ -68,7 +68,7 @@ fn nth(
result.push_back(ReturnSuccess::value(item));
}
result
futures::stream::iter(result)
})
.flatten();

View File

@ -136,6 +136,6 @@ impl PerItemCommand for Parse {
} else {
VecDeque::new()
};
Ok(output.to_output_stream())
Ok(output.into())
}
}

View File

@ -84,9 +84,11 @@ pub fn filter_plugin(
let mut bos: VecDeque<Value> = VecDeque::new();
bos.push_back(UntaggedValue::Primitive(Primitive::BeginningOfStream).into_untagged_value());
let bos = futures::stream::iter(bos);
let mut eos: VecDeque<Value> = VecDeque::new();
eos.push_back(UntaggedValue::Primitive(Primitive::EndOfStream).into_untagged_value());
let eos = futures::stream::iter(eos);
let call_info = args.call_info.clone();
@ -294,6 +296,7 @@ pub fn filter_plugin(
}
}
})
.map(futures::stream::iter) // convert to a stream
.flatten();
Ok(stream.to_output_stream())

View File

@ -41,8 +41,7 @@ fn prepend(
PrependArgs { row }: PrependArgs,
RunnableContext { input, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let mut prepend: VecDeque<Value> = VecDeque::new();
prepend.push_back(row);
let prepend = futures::stream::iter(vec![row]);
Ok(OutputStream::from_input(prepend.chain(input.values)))
}

View File

@ -47,7 +47,10 @@ fn range(
let (from, _) = range.from;
let (to, _) = range.to;
let from = *from as usize;
let to = *to as usize;
Ok(OutputStream::from_input(
input.values.skip(*from).take(*to - *from + 1),
input.values.skip(from).take(to - from + 1),
))
}

View File

@ -32,11 +32,11 @@ fn reverse(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream
let args = args.evaluate_once(registry)?;
let (input, _args) = args.parts();
let output = input.values.collect::<Vec<_>>();
let input = input.values.collect::<Vec<_>>();
let output = output.map(move |mut vec| {
let output = input.map(move |mut vec| {
vec.reverse();
vec.into_iter().collect::<VecDeque<_>>()
futures::stream::iter(vec)
});
Ok(output.flatten_stream().from_input_stream())

View File

@ -46,5 +46,5 @@ fn shells(args: CommandArgs, _registry: &CommandRegistry) -> Result<OutputStream
shells_out.push_back(dict.into_value());
}
Ok(shells_out.to_output_stream())
Ok(shells_out.into())
}

View File

@ -9,7 +9,7 @@ pub struct Skip;
#[derive(Deserialize)]
pub struct SkipArgs {
rows: Option<Tagged<u64>>,
rows: Option<Tagged<usize>>,
}
impl WholeStreamCommand for Skip {

View File

@ -58,7 +58,7 @@ fn split_row(
UntaggedValue::Primitive(Primitive::String(s.into())).into_value(&v.tag),
));
}
result
futures::stream::iter(result)
} else {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::labeled_error_with_secondary(
@ -68,7 +68,7 @@ fn split_row(
"value originates from here",
v.tag.span,
)));
result
futures::stream::iter(result)
}
})
.flatten();

View File

@ -57,6 +57,6 @@ impl PerItemCommand for Where {
}
};
Ok(stream.to_output_stream())
Ok(stream.into())
}
}

View File

@ -142,7 +142,12 @@ impl Shell for HelpShell {
_args: LsArgs,
_context: &RunnablePerItemContext,
) -> Result<OutputStream, ShellError> {
Ok(self.commands().map(ReturnSuccess::value).to_output_stream())
let output = self
.commands()
.into_iter()
.map(ReturnSuccess::value)
.collect::<VecDeque<_>>();
Ok(output.into())
}
fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {

View File

@ -120,10 +120,12 @@ impl Shell for ValueShell {
));
}
Ok(self
let output = self
.members_under(full_path.as_path())
.into_iter()
.map(ReturnSuccess::value)
.to_output_stream())
.collect::<VecDeque<_>>();
Ok(output.into())
}
fn cd(&self, args: EvaluatedWholeStreamCommandArgs) -> Result<OutputStream, ShellError> {

View File

@ -1,4 +1,5 @@
use crate::prelude::*;
use futures::stream::iter;
use nu_protocol::{ReturnSuccess, ReturnValue, UntaggedValue, Value};
pub struct InputStream {
@ -15,7 +16,7 @@ impl InputStream {
}
pub fn drain_vec(&mut self) -> impl Future<Output = Vec<Value>> {
let mut values: BoxStream<'static, Value> = VecDeque::new().boxed();
let mut values: BoxStream<'static, Value> = iter(VecDeque::new()).boxed();
std::mem::swap(&mut values, &mut self.values);
values.collect()
@ -48,18 +49,15 @@ impl From<BoxStream<'static, Value>> for InputStream {
impl From<VecDeque<Value>> for InputStream {
fn from(input: VecDeque<Value>) -> InputStream {
InputStream {
values: input.boxed(),
values: futures::stream::iter(input).boxed(),
}
}
}
impl From<Vec<Value>> for InputStream {
fn from(input: Vec<Value>) -> InputStream {
let mut list = VecDeque::default();
list.extend(input);
InputStream {
values: list.boxed(),
values: futures::stream::iter(input).boxed(),
}
}
}
@ -93,7 +91,7 @@ impl OutputStream {
}
pub fn drain_vec(&mut self) -> impl Future<Output = Vec<ReturnValue>> {
let mut values: BoxStream<'static, ReturnValue> = VecDeque::new().boxed();
let mut values: BoxStream<'static, ReturnValue> = iter(VecDeque::new()).boxed();
std::mem::swap(&mut values, &mut self.values);
values.collect()
@ -136,41 +134,33 @@ impl From<BoxStream<'static, ReturnValue>> for OutputStream {
impl From<VecDeque<ReturnValue>> for OutputStream {
fn from(input: VecDeque<ReturnValue>) -> OutputStream {
OutputStream {
values: input.boxed(),
values: futures::stream::iter(input).boxed(),
}
}
}
impl From<VecDeque<Value>> for OutputStream {
fn from(input: VecDeque<Value>) -> OutputStream {
let stream = input.into_iter().map(ReturnSuccess::value);
OutputStream {
values: input
.into_iter()
.map(ReturnSuccess::value)
.collect::<VecDeque<ReturnValue>>()
.boxed(),
values: futures::stream::iter(stream).boxed(),
}
}
}
impl From<Vec<ReturnValue>> for OutputStream {
fn from(input: Vec<ReturnValue>) -> OutputStream {
let mut list = VecDeque::default();
list.extend(input);
OutputStream {
values: list.boxed(),
values: futures::stream::iter(input).boxed(),
}
}
}
impl From<Vec<Value>> for OutputStream {
fn from(input: Vec<Value>) -> OutputStream {
let mut list = VecDeque::default();
list.extend(input.into_iter().map(ReturnSuccess::value));
let stream = input.into_iter().map(ReturnSuccess::value);
OutputStream {
values: list.boxed(),
values: futures::stream::iter(stream).boxed(),
}
}
}