Improved streams

This commit is contained in:
Yehuda Katz 2019-05-23 21:34:43 -07:00
parent 625a356361
commit bf332ea50c
10 changed files with 100 additions and 31 deletions

1
Cargo.lock generated
View File

@ -916,6 +916,7 @@ dependencies = [
"derive-new 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", "derive-new 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
"dunce 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "dunce 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-sink-preview 0.3.0-alpha.16 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"nom 5.0.0-beta1 (registry+https://github.com/rust-lang/crates.io-index)", "nom 5.0.0-beta1 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -28,6 +28,7 @@ ordered-float = "1.0.2"
prettyprint = "0.6.0" prettyprint = "0.6.0"
cursive = { version = "0.12.0", features = ["pancurses-backend"], default-features = false } cursive = { version = "0.12.0", features = ["pancurses-backend"], default-features = false }
futures-preview = "0.3.0-alpha.16" futures-preview = "0.3.0-alpha.16"
futures-sink-preview = "0.3.0-alpha.16"
[dependencies.pancurses] [dependencies.pancurses]
version = "0.16" version = "0.16"

View File

@ -49,12 +49,13 @@ pub async fn cli() -> Result<(), Box<Error>> {
use crate::commands::*; use crate::commands::*;
context.add_commands(vec![ context.add_commands(vec![
("format-list", Arc::new(format_list)),
("ps", Arc::new(ps::ps)), ("ps", Arc::new(ps::ps)),
("ls", Arc::new(ls::ls)), ("ls", Arc::new(ls::ls)),
("cd", Arc::new(cd::cd)), ("cd", Arc::new(cd::cd)),
("view", Arc::new(view::view)), ("view", Arc::new(view::view)),
("skip", Arc::new(skip::skip)), ("skip", Arc::new(skip::skip)),
("take", Arc::new(take::take)), ("first", Arc::new(take::take)),
("select", Arc::new(select::select)), ("select", Arc::new(select::select)),
("reject", Arc::new(reject::reject)), ("reject", Arc::new(reject::reject)),
("to-array", Arc::new(to_array::to_array)), ("to-array", Arc::new(to_array::to_array)),
@ -105,6 +106,30 @@ enum LineResult {
FatalError(ShellError), FatalError(ShellError),
} }
impl std::ops::Try for LineResult {
type Ok = Option<String>;
type Error = ShellError;
fn into_result(self) -> Result<Option<String>, ShellError> {
match self {
LineResult::Success(s) => Ok(Some(s)),
LineResult::Error(s) => Err(ShellError::string(s)),
LineResult::Break => Ok(None),
LineResult::FatalError(err) => Err(err),
}
}
fn from_error(v: ShellError) -> Self {
LineResult::Error(v.to_string())
}
fn from_ok(v: Option<String>) -> Self {
match v {
None => LineResult::Break,
Some(v) => LineResult::Success(v),
}
}
}
async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> LineResult { async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> LineResult {
match &readline { match &readline {
Ok(line) if line.trim() == "exit" => LineResult::Break, Ok(line) if line.trim() == "exit" => LineResult::Break,
@ -137,10 +162,12 @@ async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context
if equal_shapes(&input_vec) { if equal_shapes(&input_vec) {
let array = crate::commands::stream_to_array(input_vec.boxed()).await; let array = crate::commands::stream_to_array(input_vec.boxed()).await;
let args = CommandArgs::from_context(ctx, vec![], array); let args = CommandArgs::from_context(ctx, vec![], array);
format(args).await; let mut result = format(args);
let mut vec = vec![];
vec.send_all(&mut result).await?;
} else { } else {
let args = CommandArgs::from_context(ctx, vec![], input_vec.boxed()); let args = CommandArgs::from_context(ctx, vec![], input_vec.boxed());
format(args).await; format(args).collect::<Vec<_>>().await;
} }
} }
@ -197,26 +224,57 @@ fn classify_command(
} }
} }
async fn format(args: CommandArgs) -> OutputStream { crate fn format(args: CommandArgs) -> OutputStream {
let input: Vec<_> = args.input.collect().await; let host = args.host.clone();
let input = args.input.map(|a| a.copy());
let input = input.collect::<Vec<_>>();
input
.then(move |input| {
let last = input.len() - 1; let last = input.len() - 1;
let mut host = host.lock().unwrap();
for (i, item) in input.iter().enumerate() { for (i, item) in input.iter().enumerate() {
let view = GenericView::new(item); let view = GenericView::new(item);
crate::format::print_view(&view, &mut *args.host.lock().unwrap()); crate::format::print_view(&view, &mut *host);
if last != i { if last != i {
println!(""); println!("");
} }
} }
empty_stream() futures::future::ready(empty_stream())
})
.flatten_stream()
.boxed()
} }
async fn format_list(args: CommandArgs) -> OutputStream { crate fn format_list(args: CommandArgs) -> Result<OutputStream, ShellError> {
let view = EntriesListView::from_stream(args.input).await; let host = args.host.clone();
crate::format::print_view(&view, &mut *args.host.lock().unwrap()); // let input = args.input.map(|a| a.copy());
// let input = input.collect::<Vec<_>>();
empty_stream() let view = EntriesListView::from_stream(args.input);
Ok(view
.then(move |view| {
crate::format::print_view(&view, &mut *host.lock().unwrap());
futures::future::ready(empty_stream())
})
.flatten_stream()
.boxed())
// Ok(empty_stream())
// Ok(args
// .input
// .map(|input| {
// let view = EntriesListView::from_stream(input);
// crate::format::print_view(&view, &mut *args.host.lock().unwrap());
// })
// .collect()
// .then(|_| empty_stream())
// .flatten_stream()
// .boxed())
} }
fn equal_shapes(input: &VecDeque<Value>) -> bool { fn equal_shapes(input: &VecDeque<Value>) -> bool {

View File

@ -4,7 +4,7 @@ use crate::prelude::*;
use std::path::PathBuf; use std::path::PathBuf;
pub struct CommandArgs { pub struct CommandArgs {
pub host: Arc<Mutex<dyn Host>>, pub host: Arc<Mutex<dyn Host + Send>>,
pub env: Arc<Mutex<Environment>>, pub env: Arc<Mutex<Environment>>,
pub args: Vec<Value>, pub args: Vec<Value>,
pub input: InputStream, pub input: InputStream,
@ -37,12 +37,6 @@ pub enum ReturnValue {
} }
impl ReturnValue { impl ReturnValue {
crate fn single(value: Value) -> VecDeque<ReturnValue> {
let mut v = VecDeque::new();
v.push_back(ReturnValue::Value(value));
v
}
crate fn change_cwd(path: PathBuf) -> ReturnValue { crate fn change_cwd(path: PathBuf) -> ReturnValue {
ReturnValue::Action(CommandAction::ChangeCwd(path)) ReturnValue::Action(CommandAction::ChangeCwd(path))
} }

View File

@ -1,6 +1,5 @@
use crate::errors::ShellError; use crate::errors::ShellError;
use crate::prelude::*; use crate::prelude::*;
use futures::stream::BoxStream;
pub fn sort_by(args: CommandArgs) -> Result<OutputStream, ShellError> { pub fn sort_by(args: CommandArgs) -> Result<OutputStream, ShellError> {
let fields: Result<Vec<_>, _> = args.args.iter().map(|a| a.as_string()).collect(); let fields: Result<Vec<_>, _> = args.args.iter().map(|a| a.as_string()).collect();

View File

@ -5,7 +5,7 @@ use std::sync::Arc;
pub struct Context { pub struct Context {
commands: indexmap::IndexMap<String, Arc<dyn Command>>, commands: indexmap::IndexMap<String, Arc<dyn Command>>,
crate host: Arc<Mutex<dyn Host>>, crate host: Arc<Mutex<dyn Host + Send>>,
crate env: Arc<Mutex<Environment>>, crate env: Arc<Mutex<Environment>>,
} }

10
src/env/host.rs vendored
View File

@ -17,10 +17,16 @@ crate struct BasicHost;
impl Host for BasicHost { impl Host for BasicHost {
fn stdout(&mut self, out: &str) { fn stdout(&mut self, out: &str) {
println!("{}", out) match out {
"\n" => println!(""),
other => println!("{}", other),
}
} }
fn stderr(&mut self, out: &str) { fn stderr(&mut self, out: &str) {
eprintln!("{}", out) match out {
"\n" => eprintln!(""),
other => eprintln!("{}", other),
}
} }
} }

View File

@ -42,3 +42,12 @@ impl std::convert::From<std::io::Error> for ShellError {
} }
} }
} }
impl std::convert::From<futures_sink::VecSinkError> for ShellError {
fn from(_input: futures_sink::VecSinkError) -> ShellError {
ShellError {
title: format!("Unexpected Vec Sink Error"),
error: Value::nothing(),
}
}
}

View File

@ -1,6 +1,7 @@
#![feature(crate_visibility_modifier)] #![feature(crate_visibility_modifier)]
#![feature(in_band_lifetimes)] #![feature(in_band_lifetimes)]
#![feature(async_await)] #![feature(async_await)]
#![feature(try_trait)]
mod cli; mod cli;
mod commands; mod commands;
@ -17,6 +18,6 @@ mod stream;
use std::error::Error; use std::error::Error;
fn main() -> Result<(), Box<Error>> { fn main() -> Result<(), Box<Error>> {
futures::executor::block_on(crate::cli::cli()); futures::executor::block_on(crate::cli::cli())?;
Ok(()) Ok(())
} }

View File

@ -7,7 +7,7 @@ crate use crate::object::{Primitive, Value};
#[allow(unused)] #[allow(unused)]
crate use crate::stream::{empty_stream, single_output, InputStream, OutputStream}; crate use crate::stream::{empty_stream, single_output, InputStream, OutputStream};
#[allow(unused)] #[allow(unused)]
crate use futures::{Future, FutureExt, Stream, StreamExt}; crate use futures::{Future, FutureExt, Sink, SinkExt, Stream, StreamExt};
crate use std::collections::VecDeque; crate use std::collections::VecDeque;
crate use std::pin::Pin; crate use std::pin::Pin;
crate use std::sync::{Arc, Mutex}; crate use std::sync::{Arc, Mutex};