From 625a35636176baeb5c98dfcda6feaf2ec6d81dc5 Mon Sep 17 00:00:00 2001 From: Yehuda Katz Date: Thu, 23 May 2019 00:23:06 -0700 Subject: [PATCH] Data flows across commands via streams now --- rustfmt.toml | 1 + src/cli.rs | 71 +++++++++++++++++++------------------- src/commands/cd.rs | 6 ++-- src/commands/classified.rs | 33 +++++++++--------- src/commands/command.rs | 24 ++++++------- src/commands/ls.rs | 6 ++-- src/commands/ps.rs | 4 +-- src/commands/reject.rs | 12 +++---- src/commands/select.rs | 11 +++--- src/commands/skip.rs | 18 +++------- src/commands/sort_by.rs | 29 +++++++++------- src/commands/take.rs | 18 +++------- src/commands/to_array.rs | 16 +++++---- src/commands/view.rs | 6 ++-- src/commands/where_.rs | 23 ++++++------ src/context.rs | 16 ++++----- src/format/entries.rs | 6 ++-- src/main.rs | 4 ++- src/parser/parse.rs | 2 +- src/prelude.rs | 6 ++++ src/stream.rs | 16 +++++++++ 21 files changed, 171 insertions(+), 157 deletions(-) create mode 100644 rustfmt.toml diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000000..c51666e8f4 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +edition = "2018" \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index bc16001036..afa320e3c8 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,12 +1,11 @@ use crate::prelude::*; use crate::commands::classified::{ClassifiedCommand, ExternalCommand, InternalCommand}; -use crate::commands::command::ReturnValue; use crate::context::Context; -crate use crate::env::Host; crate use crate::errors::ShellError; crate use crate::format::{EntriesListView, GenericView}; use crate::object::Value; +use crate::stream::empty_stream; use rustyline::error::ReadlineError; use rustyline::{self, ColorMode, Config, Editor}; @@ -29,7 +28,7 @@ impl MaybeOwned<'a, T> { } } -pub fn cli() -> Result<(), Box> { +pub async fn cli() -> Result<(), Box> { let config = Config::builder().color_mode(ColorMode::Forced).build(); let h = crate::shell::Helper::new(); let mut rl: Editor = Editor::with_config(config); @@ -50,8 +49,6 @@ pub fn cli() -> Result<(), Box> { use crate::commands::*; context.add_commands(vec![ - ("format", Arc::new(format)), - ("format-list", Arc::new(format_list)), ("ps", Arc::new(ps::ps)), ("ls", Arc::new(ls::ls)), ("cd", Arc::new(cd::cd)), @@ -67,15 +64,18 @@ pub fn cli() -> Result<(), Box> { } loop { - let readline = rl.readline(&format!("{}> ", context.env.cwd().display().to_string())); + let readline = rl.readline(&format!( + "{}> ", + context.env.lock().unwrap().cwd().display().to_string() + )); - match process_line(readline, &mut context) { + match process_line(readline, &mut context).await { LineResult::Success(line) => { rl.add_history_entry(line.clone()); } LineResult::Error(err) => { - context.host.stdout(&err); + context.host.lock().unwrap().stdout(&err); } LineResult::Break => { @@ -85,6 +85,8 @@ pub fn cli() -> Result<(), Box> { LineResult::FatalError(err) => { context .host + .lock() + .unwrap() .stdout(&format!("A surprising fatal error occurred.\n{:?}", err)); } } @@ -103,7 +105,7 @@ enum LineResult { FatalError(ShellError), } -fn process_line(readline: Result, ctx: &mut Context) -> LineResult { +async fn process_line(readline: Result, ctx: &mut Context) -> LineResult { match &readline { Ok(line) if line.trim() == "exit" => LineResult::Break, @@ -120,29 +122,25 @@ fn process_line(readline: Result, ctx: &mut Context) -> L let parsed = result.1; - let mut input = VecDeque::new(); + let mut input: InputStream = VecDeque::new().boxed(); for item in parsed { - input = match process_command(item.clone(), input, ctx) { + input = match process_command(item.clone(), input, ctx).await { Ok(val) => val, Err(err) => return LineResult::Error(format!("{}", err.description())), }; } - if input.len() > 0 { - if equal_shapes(&input) { - let array = crate::commands::stream_to_array(input); + let input_vec: VecDeque<_> = input.collect().await; + + if input_vec.len() > 0 { + if equal_shapes(&input_vec) { + let array = crate::commands::stream_to_array(input_vec.boxed()).await; let args = CommandArgs::from_context(ctx, vec![], array); - match format(args) { - Ok(_) => {} - Err(err) => return LineResult::Error(err.to_string()), - } + format(args).await; } else { - let args = CommandArgs::from_context(ctx, vec![], input); - match format(args) { - Ok(_) => {} - Err(err) => return LineResult::Error(err.to_string()), - } + let args = CommandArgs::from_context(ctx, vec![], input_vec.boxed()); + format(args).await; } } @@ -163,14 +161,14 @@ fn process_line(readline: Result, ctx: &mut Context) -> L } } -fn process_command( +async fn process_command( parsed: Vec, - input: VecDeque, + input: InputStream, context: &mut Context, -) -> Result, ShellError> { +) -> Result { let command = classify_command(&parsed, context)?; - command.run(input, context) + command.run(input, context).await } fn classify_command( @@ -199,25 +197,26 @@ fn classify_command( } } -fn format(args: CommandArgs<'caller>) -> Result, ShellError> { - let last = args.input.len() - 1; - for (i, item) in args.input.iter().enumerate() { +async fn format(args: CommandArgs) -> OutputStream { + let input: Vec<_> = args.input.collect().await; + let last = input.len() - 1; + for (i, item) in input.iter().enumerate() { let view = GenericView::new(item); - crate::format::print_view(&view, args.host); + crate::format::print_view(&view, &mut *args.host.lock().unwrap()); if last != i { println!(""); } } - Ok(VecDeque::new()) + empty_stream() } -fn format_list(args: CommandArgs<'caller>) -> Result, ShellError> { - let view = EntriesListView::from_stream(args.input); - crate::format::print_view(&view, args.host); +async fn format_list(args: CommandArgs) -> OutputStream { + let view = EntriesListView::from_stream(args.input).await; + crate::format::print_view(&view, &mut *args.host.lock().unwrap()); - Ok(VecDeque::new()) + empty_stream() } fn equal_shapes(input: &VecDeque) -> bool { diff --git a/src/commands/cd.rs b/src/commands/cd.rs index a4f6523427..ab2f59b314 100644 --- a/src/commands/cd.rs +++ b/src/commands/cd.rs @@ -2,18 +2,18 @@ use crate::errors::ShellError; use crate::prelude::*; use std::env; -pub fn cd(args: CommandArgs<'caller>) -> Result, ShellError> { +pub fn cd(args: CommandArgs) -> Result { let target = match args.args.first() { // TODO: This needs better infra None => return Err(ShellError::string(format!("cd must take one arg"))), Some(v) => v.as_string()?.clone(), }; - let cwd = args.env.cwd().to_path_buf(); + let cwd = args.env.lock().unwrap().cwd().to_path_buf(); let mut stream = VecDeque::new(); let path = dunce::canonicalize(cwd.join(&target).as_path())?; let _ = env::set_current_dir(&path); stream.push_back(ReturnValue::change_cwd(path)); - Ok(stream) + Ok(stream.boxed()) } diff --git a/src/commands/classified.rs b/src/commands/classified.rs index 36a23ce6dd..a528b9bd34 100644 --- a/src/commands/classified.rs +++ b/src/commands/classified.rs @@ -8,37 +8,38 @@ crate enum ClassifiedCommand { } impl ClassifiedCommand { - crate fn run( + crate async fn run( self, - input: VecDeque, + input: InputStream, context: &mut Context, - ) -> Result, ShellError> { + ) -> Result { match self { ClassifiedCommand::Internal(internal) => { let result = context.run_command(internal.command, internal.args, input)?; + let env = context.env.clone(); - let mut next = VecDeque::new(); + let stream = result.filter_map(move |v| match v { + ReturnValue::Action(action) => match action { + CommandAction::ChangeCwd(cwd) => { + env.lock().unwrap().cwd = cwd; + futures::future::ready(None) + } + }, - for v in result { - match v { - ReturnValue::Action(action) => match action { - CommandAction::ChangeCwd(cwd) => context.env.cwd = cwd, - }, + ReturnValue::Value(v) => futures::future::ready(Some(v)), + }); - ReturnValue::Value(v) => next.push_back(v), - } - } - - Ok(next) + Ok(stream.boxed() as InputStream) } ClassifiedCommand::External(external) => { Exec::shell(&external.name) .args(&external.args) - .cwd(context.env.cwd()) + .cwd(context.env.lock().unwrap().cwd()) .join() .unwrap(); - Ok(VecDeque::new()) + + Ok(VecDeque::new().boxed() as InputStream) } } } diff --git a/src/commands/command.rs b/src/commands/command.rs index d48adab03e..71479ceb0f 100644 --- a/src/commands/command.rs +++ b/src/commands/command.rs @@ -3,22 +3,22 @@ use crate::object::Value; use crate::prelude::*; use std::path::PathBuf; -pub struct CommandArgs<'caller> { - pub host: &'caller mut dyn Host, - pub env: &'caller Environment, +pub struct CommandArgs { + pub host: Arc>, + pub env: Arc>, pub args: Vec, - pub input: VecDeque, + pub input: InputStream, } -impl CommandArgs<'caller> { +impl CommandArgs { crate fn from_context( ctx: &'caller mut Context, args: Vec, - input: VecDeque, - ) -> CommandArgs<'caller> { + input: InputStream, + ) -> CommandArgs { CommandArgs { - host: &mut ctx.host, - env: &ctx.env, + host: ctx.host.clone(), + env: ctx.env.clone(), args, input, } @@ -49,14 +49,14 @@ impl ReturnValue { } pub trait Command { - fn run(&self, args: CommandArgs<'caller>) -> Result, ShellError>; + fn run(&self, args: CommandArgs) -> Result; } impl Command for F where - F: Fn(CommandArgs<'_>) -> Result, ShellError>, + F: Fn(CommandArgs) -> Result, { - fn run(&self, args: CommandArgs<'caller>) -> Result, ShellError> { + fn run(&self, args: CommandArgs) -> Result { self(args) } } diff --git a/src/commands/ls.rs b/src/commands/ls.rs index b591bdc02b..8b07c90af4 100644 --- a/src/commands/ls.rs +++ b/src/commands/ls.rs @@ -2,8 +2,8 @@ use crate::errors::ShellError; use crate::object::{dir_entry_dict, Value}; use crate::prelude::*; -pub fn ls(args: CommandArgs<'value>) -> Result, ShellError> { - let cwd = args.env.cwd().to_path_buf(); +pub fn ls(args: CommandArgs) -> Result { + let cwd = args.env.lock().unwrap().cwd().to_path_buf(); let entries = std::fs::read_dir(&cwd).map_err(|e| ShellError::string(format!("{:?}", e)))?; @@ -14,5 +14,5 @@ pub fn ls(args: CommandArgs<'value>) -> Result, ShellError shell_entries.push_back(ReturnValue::Value(value)) } - Ok(shell_entries) + Ok(shell_entries.boxed()) } diff --git a/src/commands/ps.rs b/src/commands/ps.rs index 8e34dab582..dcd61b9b2e 100644 --- a/src/commands/ps.rs +++ b/src/commands/ps.rs @@ -4,7 +4,7 @@ use crate::object::Value; use crate::prelude::*; use sysinfo::SystemExt; -pub fn ps(_args: CommandArgs<'caller>) -> Result, ShellError> { +pub fn ps(_args: CommandArgs) -> Result { let mut system = sysinfo::System::new(); system.refresh_all(); @@ -15,5 +15,5 @@ pub fn ps(_args: CommandArgs<'caller>) -> Result, ShellErr .map(|(_, process)| ReturnValue::Value(Value::Object(process_dict(process)))) .collect::>(); - Ok(list) + Ok(list.boxed()) } diff --git a/src/commands/reject.rs b/src/commands/reject.rs index c7ab2b0142..ef8313ff7b 100644 --- a/src/commands/reject.rs +++ b/src/commands/reject.rs @@ -3,7 +3,7 @@ use crate::object::base::reject_fields; use crate::object::Value; use crate::prelude::*; -pub fn reject(args: CommandArgs<'value>) -> Result, ShellError> { +pub fn reject(args: CommandArgs) -> Result { if args.args.is_empty() { return Err(ShellError::string("select requires a field")); } @@ -11,12 +11,10 @@ pub fn reject(args: CommandArgs<'value>) -> Result, ShellE let fields: Result, _> = args.args.iter().map(|a| a.as_string()).collect(); let fields = fields?; - let objects = args + let stream = args .input - .iter() - .map(|item| Value::Object(reject_fields(item, &fields))) - .map(|item| ReturnValue::Value(item)) - .collect(); + .map(move |item| Value::Object(reject_fields(&item, &fields))) + .map(|item| ReturnValue::Value(item)); - Ok(objects) + Ok(stream.boxed()) } diff --git a/src/commands/select.rs b/src/commands/select.rs index 6e346ed567..3703c6217a 100644 --- a/src/commands/select.rs +++ b/src/commands/select.rs @@ -3,7 +3,7 @@ use crate::object::base::select_fields; use crate::object::Value; use crate::prelude::*; -pub fn select(args: CommandArgs<'caller>) -> Result, ShellError> { +pub fn select(args: CommandArgs) -> Result { if args.args.is_empty() { return Err(ShellError::string("select requires a field")); } @@ -13,10 +13,9 @@ pub fn select(args: CommandArgs<'caller>) -> Result, Shell let objects = args .input - .iter() - .map(|item| Value::Object(select_fields(item, &fields))) - .map(|item| ReturnValue::Value(item)) - .collect(); + .map(move |item| Value::Object(select_fields(&item, &fields))) + .map(|item| ReturnValue::Value(item)); - Ok(objects) + let stream = Pin::new(Box::new(objects)); + Ok(stream) } diff --git a/src/commands/skip.rs b/src/commands/skip.rs index f1059adce7..0ff5ca1267 100644 --- a/src/commands/skip.rs +++ b/src/commands/skip.rs @@ -1,21 +1,13 @@ use crate::errors::ShellError; use crate::prelude::*; -pub fn skip(args: CommandArgs<'caller>) -> Result, ShellError> { +pub fn skip(args: CommandArgs) -> Result { let amount = args.args[0].as_int()?; - let amount = if args.input.len() > amount as usize { - amount as usize - } else { - args.input.len() - }; + let input = args.input; - let out: VecDeque = args - .input - .into_iter() - .skip(amount) + Ok(input + .skip(amount as u64) .map(|v| ReturnValue::Value(v)) - .collect(); - - Ok(out) + .boxed()) } diff --git a/src/commands/sort_by.rs b/src/commands/sort_by.rs index 0f289fdfdf..13663d88a1 100644 --- a/src/commands/sort_by.rs +++ b/src/commands/sort_by.rs @@ -1,23 +1,26 @@ use crate::errors::ShellError; use crate::prelude::*; +use futures::stream::BoxStream; -pub fn sort_by(args: CommandArgs<'caller>) -> Result, ShellError> { +pub fn sort_by(args: CommandArgs) -> Result { let fields: Result, _> = args.args.iter().map(|a| a.as_string()).collect(); let fields = fields?; - let mut output = args.input.into_iter().collect::>(); + let output = args.input.collect::>(); - output.sort_by_key(|item| { - fields - .iter() - .map(|f| item.get_data_by_key(f).borrow().copy()) - .collect::>() + let output = output.map(move |mut vec| { + vec.sort_by_key(|item| { + fields + .iter() + .map(|f| item.get_data_by_key(f).borrow().copy()) + .collect::>() + }); + + vec.into_iter() + .map(|v| ReturnValue::Value(v.copy())) + .collect::>() + .boxed() }); - let output = output - .iter() - .map(|o| ReturnValue::Value(o.copy())) - .collect(); - - Ok(output) + Ok(output.flatten_stream().boxed()) } diff --git a/src/commands/take.rs b/src/commands/take.rs index 540ec4204d..2352f16b2f 100644 --- a/src/commands/take.rs +++ b/src/commands/take.rs @@ -3,21 +3,13 @@ use crate::prelude::*; // TODO: "Amount remaining" wrapper -pub fn take(args: CommandArgs<'caller>) -> Result, ShellError> { +pub fn take(args: CommandArgs) -> Result { let amount = args.args[0].as_int()?; - let amount = if args.input.len() > amount as usize { - amount as usize - } else { - args.input.len() - }; + let input = args.input; - let out: VecDeque = args - .input - .into_iter() - .take(amount) + Ok(input + .take(amount as u64) .map(|v| ReturnValue::Value(v)) - .collect(); - - Ok(out) + .boxed()) } diff --git a/src/commands/to_array.rs b/src/commands/to_array.rs index 6ae59714d5..93a0272973 100644 --- a/src/commands/to_array.rs +++ b/src/commands/to_array.rs @@ -1,15 +1,17 @@ -use crate::errors::ShellError; use crate::object::Value; use crate::prelude::*; -pub fn to_array(args: CommandArgs<'caller>) -> Result, ShellError> { - let out = args.input.into_iter().collect(); - Ok(ReturnValue::single(Value::List(out))) +pub fn to_array(args: CommandArgs) -> Result { + let out = args.input.collect(); + Ok(out + .map(|vec: Vec<_>| single_output(Value::List(vec))) + .flatten_stream() + .boxed()) } -crate fn stream_to_array(stream: VecDeque) -> VecDeque { - let out = Value::List(stream.into_iter().collect()); +crate async fn stream_to_array(stream: InputStream) -> InputStream { + let out = Value::List(stream.collect().await); let mut stream = VecDeque::new(); stream.push_back(out); - stream + stream.boxed() as InputStream } diff --git a/src/commands/view.rs b/src/commands/view.rs index 728fa93623..0c6b5526a5 100644 --- a/src/commands/view.rs +++ b/src/commands/view.rs @@ -2,14 +2,14 @@ use crate::errors::ShellError; use crate::prelude::*; use prettyprint::PrettyPrinter; -pub fn view(args: CommandArgs<'caller>) -> Result, ShellError> { +pub fn view(args: CommandArgs) -> Result { let target = match args.args.first() { // TODO: This needs better infra None => return Err(ShellError::string(format!("cat must take one arg"))), Some(v) => v.as_string()?.clone(), }; - let cwd = args.env.cwd().to_path_buf(); + let cwd = args.env.lock().unwrap().cwd().to_path_buf(); let printer = PrettyPrinter::default() .line_numbers(false) @@ -22,5 +22,5 @@ pub fn view(args: CommandArgs<'caller>) -> Result, ShellEr let _ = printer.file(file.display().to_string()); - Ok(VecDeque::new()) + Ok(VecDeque::new().boxed()) } diff --git a/src/commands/where_.rs b/src/commands/where_.rs index 17f53e61bc..3db875f756 100644 --- a/src/commands/where_.rs +++ b/src/commands/where_.rs @@ -2,26 +2,27 @@ use crate::errors::ShellError; use crate::object::base::find; use crate::prelude::*; -pub fn r#where(args: CommandArgs<'caller>) -> Result, ShellError> { +pub fn r#where(args: CommandArgs) -> Result { if args.args.is_empty() { return Err(ShellError::string("select requires a field")); } let field: Result = args.args[0].as_string(); let field = field?; + let input = args.input; + let operator = args.args[1].copy(); - match args.args[1] { - Value::Primitive(Primitive::Operator(ref operator)) => { - let objects = args - .input - .iter() - .filter(|item| find(&item, &field, operator, &args.args[2])) - .map(|item| ReturnValue::Value(item.copy())) - .collect(); + match operator { + Value::Primitive(Primitive::Operator(operator)) => { + let right = args.args[2].copy(); - Ok(objects) + let objects = input + .filter(move |item| futures::future::ready(find(&item, &field, &operator, &right))) + .map(|item| ReturnValue::Value(item.copy())); + + Ok(objects.boxed()) } - ref x => { + x => { println!("{:?}", x); Err(ShellError::string("expected a comparison operator")) } diff --git a/src/context.rs b/src/context.rs index bcbb30afcc..6f49f6b279 100644 --- a/src/context.rs +++ b/src/context.rs @@ -5,16 +5,16 @@ use std::sync::Arc; pub struct Context { commands: indexmap::IndexMap>, - crate host: Box, - crate env: Environment, + crate host: Arc>, + crate env: Arc>, } impl Context { crate fn basic() -> Result> { Ok(Context { commands: indexmap::IndexMap::new(), - host: Box::new(crate::env::host::BasicHost), - env: Environment::basic()?, + host: Arc::new(Mutex::new(crate::env::host::BasicHost)), + env: Arc::new(Mutex::new(Environment::basic()?)), }) } @@ -36,11 +36,11 @@ impl Context { &mut self, command: Arc, arg_list: Vec, - input: VecDeque, - ) -> Result, ShellError> { + input: InputStream, + ) -> Result { let command_args = CommandArgs { - host: &mut self.host, - env: &self.env, + host: self.host.clone(), + env: self.env.clone(), args: arg_list, input, }; diff --git a/src/format/entries.rs b/src/format/entries.rs index 02845c0854..c619f98b15 100644 --- a/src/format/entries.rs +++ b/src/format/entries.rs @@ -50,8 +50,10 @@ pub struct EntriesListView { } impl EntriesListView { - crate fn from_stream(values: VecDeque) -> EntriesListView { - EntriesListView { values } + crate async fn from_stream(values: InputStream) -> EntriesListView { + EntriesListView { + values: values.collect().await, + } } } diff --git a/src/main.rs b/src/main.rs index 6cf266b286..3f08472912 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![feature(crate_visibility_modifier)] #![feature(in_band_lifetimes)] +#![feature(async_await)] mod cli; mod commands; @@ -16,5 +17,6 @@ mod stream; use std::error::Error; fn main() -> Result<(), Box> { - crate::cli::cli() + futures::executor::block_on(crate::cli::cli()); + Ok(()) } diff --git a/src/parser/parse.rs b/src/parser/parse.rs index 6ad5807a0d..c379eab6d4 100644 --- a/src/parser/parse.rs +++ b/src/parser/parse.rs @@ -17,7 +17,7 @@ pub enum Item { Operator(Operator), } -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub enum Operator { Equal, NotEqual, diff --git a/src/prelude.rs b/src/prelude.rs index 0480c26932..ebf084ad9f 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -4,4 +4,10 @@ crate use crate::context::Context; crate use crate::env::{Environment, Host}; crate use crate::errors::ShellError; crate use crate::object::{Primitive, Value}; +#[allow(unused)] +crate use crate::stream::{empty_stream, single_output, InputStream, OutputStream}; +#[allow(unused)] +crate use futures::{Future, FutureExt, Stream, StreamExt}; crate use std::collections::VecDeque; +crate use std::pin::Pin; +crate use std::sync::{Arc, Mutex}; diff --git a/src/stream.rs b/src/stream.rs index e69de29bb2..dc4def8cba 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -0,0 +1,16 @@ +use crate::prelude::*; +use futures::stream::BoxStream; + +pub type InputStream = BoxStream<'static, Value>; +pub type OutputStream = BoxStream<'static, ReturnValue>; + +crate fn empty_stream() -> OutputStream { + VecDeque::new().boxed() +} + +crate fn single_output(item: Value) -> OutputStream { + let value = ReturnValue::Value(item); + let mut vec = VecDeque::new(); + vec.push_back(value); + vec.boxed() +}