Data flows across commands via streams now

This commit is contained in:
Yehuda Katz 2019-05-23 00:23:06 -07:00
parent 31dd579d6f
commit 625a356361
21 changed files with 171 additions and 157 deletions

1
rustfmt.toml Normal file
View File

@ -0,0 +1 @@
edition = "2018"

View File

@ -1,12 +1,11 @@
use crate::prelude::*;
use crate::commands::classified::{ClassifiedCommand, ExternalCommand, InternalCommand};
use crate::commands::command::ReturnValue;
use crate::context::Context;
crate use crate::env::Host;
crate use crate::errors::ShellError;
crate use crate::format::{EntriesListView, GenericView};
use crate::object::Value;
use crate::stream::empty_stream;
use rustyline::error::ReadlineError;
use rustyline::{self, ColorMode, Config, Editor};
@ -29,7 +28,7 @@ impl<T> MaybeOwned<'a, T> {
}
}
pub fn cli() -> Result<(), Box<Error>> {
pub async fn cli() -> Result<(), Box<Error>> {
let config = Config::builder().color_mode(ColorMode::Forced).build();
let h = crate::shell::Helper::new();
let mut rl: Editor<crate::shell::Helper> = Editor::with_config(config);
@ -50,8 +49,6 @@ pub fn cli() -> Result<(), Box<Error>> {
use crate::commands::*;
context.add_commands(vec![
("format", Arc::new(format)),
("format-list", Arc::new(format_list)),
("ps", Arc::new(ps::ps)),
("ls", Arc::new(ls::ls)),
("cd", Arc::new(cd::cd)),
@ -67,15 +64,18 @@ pub fn cli() -> Result<(), Box<Error>> {
}
loop {
let readline = rl.readline(&format!("{}> ", context.env.cwd().display().to_string()));
let readline = rl.readline(&format!(
"{}> ",
context.env.lock().unwrap().cwd().display().to_string()
));
match process_line(readline, &mut context) {
match process_line(readline, &mut context).await {
LineResult::Success(line) => {
rl.add_history_entry(line.clone());
}
LineResult::Error(err) => {
context.host.stdout(&err);
context.host.lock().unwrap().stdout(&err);
}
LineResult::Break => {
@ -85,6 +85,8 @@ pub fn cli() -> Result<(), Box<Error>> {
LineResult::FatalError(err) => {
context
.host
.lock()
.unwrap()
.stdout(&format!("A surprising fatal error occurred.\n{:?}", err));
}
}
@ -103,7 +105,7 @@ enum LineResult {
FatalError(ShellError),
}
fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> LineResult {
async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> LineResult {
match &readline {
Ok(line) if line.trim() == "exit" => LineResult::Break,
@ -120,29 +122,25 @@ fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> L
let parsed = result.1;
let mut input = VecDeque::new();
let mut input: InputStream = VecDeque::new().boxed();
for item in parsed {
input = match process_command(item.clone(), input, ctx) {
input = match process_command(item.clone(), input, ctx).await {
Ok(val) => val,
Err(err) => return LineResult::Error(format!("{}", err.description())),
};
}
if input.len() > 0 {
if equal_shapes(&input) {
let array = crate::commands::stream_to_array(input);
let input_vec: VecDeque<_> = input.collect().await;
if input_vec.len() > 0 {
if equal_shapes(&input_vec) {
let array = crate::commands::stream_to_array(input_vec.boxed()).await;
let args = CommandArgs::from_context(ctx, vec![], array);
match format(args) {
Ok(_) => {}
Err(err) => return LineResult::Error(err.to_string()),
}
format(args).await;
} else {
let args = CommandArgs::from_context(ctx, vec![], input);
match format(args) {
Ok(_) => {}
Err(err) => return LineResult::Error(err.to_string()),
}
let args = CommandArgs::from_context(ctx, vec![], input_vec.boxed());
format(args).await;
}
}
@ -163,14 +161,14 @@ fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> L
}
}
fn process_command(
async fn process_command(
parsed: Vec<crate::parser::Item>,
input: VecDeque<Value>,
input: InputStream,
context: &mut Context,
) -> Result<VecDeque<Value>, ShellError> {
) -> Result<InputStream, ShellError> {
let command = classify_command(&parsed, context)?;
command.run(input, context)
command.run(input, context).await
}
fn classify_command(
@ -199,25 +197,26 @@ fn classify_command(
}
}
fn format(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
let last = args.input.len() - 1;
for (i, item) in args.input.iter().enumerate() {
async fn format(args: CommandArgs) -> OutputStream {
let input: Vec<_> = args.input.collect().await;
let last = input.len() - 1;
for (i, item) in input.iter().enumerate() {
let view = GenericView::new(item);
crate::format::print_view(&view, args.host);
crate::format::print_view(&view, &mut *args.host.lock().unwrap());
if last != i {
println!("");
}
}
Ok(VecDeque::new())
empty_stream()
}
fn format_list(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
let view = EntriesListView::from_stream(args.input);
crate::format::print_view(&view, args.host);
async fn format_list(args: CommandArgs) -> OutputStream {
let view = EntriesListView::from_stream(args.input).await;
crate::format::print_view(&view, &mut *args.host.lock().unwrap());
Ok(VecDeque::new())
empty_stream()
}
fn equal_shapes(input: &VecDeque<Value>) -> bool {

View File

@ -2,18 +2,18 @@ use crate::errors::ShellError;
use crate::prelude::*;
use std::env;
pub fn cd(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn cd(args: CommandArgs) -> Result<OutputStream, ShellError> {
let target = match args.args.first() {
// TODO: This needs better infra
None => return Err(ShellError::string(format!("cd must take one arg"))),
Some(v) => v.as_string()?.clone(),
};
let cwd = args.env.cwd().to_path_buf();
let cwd = args.env.lock().unwrap().cwd().to_path_buf();
let mut stream = VecDeque::new();
let path = dunce::canonicalize(cwd.join(&target).as_path())?;
let _ = env::set_current_dir(&path);
stream.push_back(ReturnValue::change_cwd(path));
Ok(stream)
Ok(stream.boxed())
}

View File

@ -8,37 +8,38 @@ crate enum ClassifiedCommand {
}
impl ClassifiedCommand {
crate fn run(
crate async fn run(
self,
input: VecDeque<Value>,
input: InputStream,
context: &mut Context,
) -> Result<VecDeque<Value>, ShellError> {
) -> Result<InputStream, ShellError> {
match self {
ClassifiedCommand::Internal(internal) => {
let result = context.run_command(internal.command, internal.args, input)?;
let env = context.env.clone();
let mut next = VecDeque::new();
for v in result {
match v {
let stream = result.filter_map(move |v| match v {
ReturnValue::Action(action) => match action {
CommandAction::ChangeCwd(cwd) => context.env.cwd = cwd,
CommandAction::ChangeCwd(cwd) => {
env.lock().unwrap().cwd = cwd;
futures::future::ready(None)
}
},
ReturnValue::Value(v) => next.push_back(v),
}
}
ReturnValue::Value(v) => futures::future::ready(Some(v)),
});
Ok(next)
Ok(stream.boxed() as InputStream)
}
ClassifiedCommand::External(external) => {
Exec::shell(&external.name)
.args(&external.args)
.cwd(context.env.cwd())
.cwd(context.env.lock().unwrap().cwd())
.join()
.unwrap();
Ok(VecDeque::new())
Ok(VecDeque::new().boxed() as InputStream)
}
}
}

View File

@ -3,22 +3,22 @@ use crate::object::Value;
use crate::prelude::*;
use std::path::PathBuf;
pub struct CommandArgs<'caller> {
pub host: &'caller mut dyn Host,
pub env: &'caller Environment,
pub struct CommandArgs {
pub host: Arc<Mutex<dyn Host>>,
pub env: Arc<Mutex<Environment>>,
pub args: Vec<Value>,
pub input: VecDeque<Value>,
pub input: InputStream,
}
impl CommandArgs<'caller> {
impl CommandArgs {
crate fn from_context(
ctx: &'caller mut Context,
args: Vec<Value>,
input: VecDeque<Value>,
) -> CommandArgs<'caller> {
input: InputStream,
) -> CommandArgs {
CommandArgs {
host: &mut ctx.host,
env: &ctx.env,
host: ctx.host.clone(),
env: ctx.env.clone(),
args,
input,
}
@ -49,14 +49,14 @@ impl ReturnValue {
}
pub trait Command {
fn run(&self, args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError>;
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError>;
}
impl<F> Command for F
where
F: Fn(CommandArgs<'_>) -> Result<VecDeque<ReturnValue>, ShellError>,
F: Fn(CommandArgs) -> Result<OutputStream, ShellError>,
{
fn run(&self, args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
self(args)
}
}

View File

@ -2,8 +2,8 @@ use crate::errors::ShellError;
use crate::object::{dir_entry_dict, Value};
use crate::prelude::*;
pub fn ls(args: CommandArgs<'value>) -> Result<VecDeque<ReturnValue>, ShellError> {
let cwd = args.env.cwd().to_path_buf();
pub fn ls(args: CommandArgs) -> Result<OutputStream, ShellError> {
let cwd = args.env.lock().unwrap().cwd().to_path_buf();
let entries = std::fs::read_dir(&cwd).map_err(|e| ShellError::string(format!("{:?}", e)))?;
@ -14,5 +14,5 @@ pub fn ls(args: CommandArgs<'value>) -> Result<VecDeque<ReturnValue>, ShellError
shell_entries.push_back(ReturnValue::Value(value))
}
Ok(shell_entries)
Ok(shell_entries.boxed())
}

View File

@ -4,7 +4,7 @@ use crate::object::Value;
use crate::prelude::*;
use sysinfo::SystemExt;
pub fn ps(_args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn ps(_args: CommandArgs) -> Result<OutputStream, ShellError> {
let mut system = sysinfo::System::new();
system.refresh_all();
@ -15,5 +15,5 @@ pub fn ps(_args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellErr
.map(|(_, process)| ReturnValue::Value(Value::Object(process_dict(process))))
.collect::<VecDeque<_>>();
Ok(list)
Ok(list.boxed())
}

View File

@ -3,7 +3,7 @@ use crate::object::base::reject_fields;
use crate::object::Value;
use crate::prelude::*;
pub fn reject(args: CommandArgs<'value>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn reject(args: CommandArgs) -> Result<OutputStream, ShellError> {
if args.args.is_empty() {
return Err(ShellError::string("select requires a field"));
}
@ -11,12 +11,10 @@ pub fn reject(args: CommandArgs<'value>) -> Result<VecDeque<ReturnValue>, ShellE
let fields: Result<Vec<String>, _> = args.args.iter().map(|a| a.as_string()).collect();
let fields = fields?;
let objects = args
let stream = args
.input
.iter()
.map(|item| Value::Object(reject_fields(item, &fields)))
.map(|item| ReturnValue::Value(item))
.collect();
.map(move |item| Value::Object(reject_fields(&item, &fields)))
.map(|item| ReturnValue::Value(item));
Ok(objects)
Ok(stream.boxed())
}

View File

@ -3,7 +3,7 @@ use crate::object::base::select_fields;
use crate::object::Value;
use crate::prelude::*;
pub fn select(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn select(args: CommandArgs) -> Result<OutputStream, ShellError> {
if args.args.is_empty() {
return Err(ShellError::string("select requires a field"));
}
@ -13,10 +13,9 @@ pub fn select(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, Shell
let objects = args
.input
.iter()
.map(|item| Value::Object(select_fields(item, &fields)))
.map(|item| ReturnValue::Value(item))
.collect();
.map(move |item| Value::Object(select_fields(&item, &fields)))
.map(|item| ReturnValue::Value(item));
Ok(objects)
let stream = Pin::new(Box::new(objects));
Ok(stream)
}

View File

@ -1,21 +1,13 @@
use crate::errors::ShellError;
use crate::prelude::*;
pub fn skip(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn skip(args: CommandArgs) -> Result<OutputStream, ShellError> {
let amount = args.args[0].as_int()?;
let amount = if args.input.len() > amount as usize {
amount as usize
} else {
args.input.len()
};
let input = args.input;
let out: VecDeque<ReturnValue> = args
.input
.into_iter()
.skip(amount)
Ok(input
.skip(amount as u64)
.map(|v| ReturnValue::Value(v))
.collect();
Ok(out)
.boxed())
}

View File

@ -1,23 +1,26 @@
use crate::errors::ShellError;
use crate::prelude::*;
use futures::stream::BoxStream;
pub fn sort_by(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn sort_by(args: CommandArgs) -> Result<OutputStream, ShellError> {
let fields: Result<Vec<_>, _> = args.args.iter().map(|a| a.as_string()).collect();
let fields = fields?;
let mut output = args.input.into_iter().collect::<Vec<_>>();
let output = args.input.collect::<Vec<_>>();
output.sort_by_key(|item| {
let output = output.map(move |mut vec| {
vec.sort_by_key(|item| {
fields
.iter()
.map(|f| item.get_data_by_key(f).borrow().copy())
.collect::<Vec<Value>>()
});
let output = output
.iter()
.map(|o| ReturnValue::Value(o.copy()))
.collect();
vec.into_iter()
.map(|v| ReturnValue::Value(v.copy()))
.collect::<VecDeque<_>>()
.boxed()
});
Ok(output)
Ok(output.flatten_stream().boxed())
}

View File

@ -3,21 +3,13 @@ use crate::prelude::*;
// TODO: "Amount remaining" wrapper
pub fn take(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn take(args: CommandArgs) -> Result<OutputStream, ShellError> {
let amount = args.args[0].as_int()?;
let amount = if args.input.len() > amount as usize {
amount as usize
} else {
args.input.len()
};
let input = args.input;
let out: VecDeque<ReturnValue> = args
.input
.into_iter()
.take(amount)
Ok(input
.take(amount as u64)
.map(|v| ReturnValue::Value(v))
.collect();
Ok(out)
.boxed())
}

View File

@ -1,15 +1,17 @@
use crate::errors::ShellError;
use crate::object::Value;
use crate::prelude::*;
pub fn to_array(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
let out = args.input.into_iter().collect();
Ok(ReturnValue::single(Value::List(out)))
pub fn to_array(args: CommandArgs) -> Result<OutputStream, ShellError> {
let out = args.input.collect();
Ok(out
.map(|vec: Vec<_>| single_output(Value::List(vec)))
.flatten_stream()
.boxed())
}
crate fn stream_to_array(stream: VecDeque<Value>) -> VecDeque<Value> {
let out = Value::List(stream.into_iter().collect());
crate async fn stream_to_array(stream: InputStream) -> InputStream {
let out = Value::List(stream.collect().await);
let mut stream = VecDeque::new();
stream.push_back(out);
stream
stream.boxed() as InputStream
}

View File

@ -2,14 +2,14 @@ use crate::errors::ShellError;
use crate::prelude::*;
use prettyprint::PrettyPrinter;
pub fn view(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn view(args: CommandArgs) -> Result<OutputStream, ShellError> {
let target = match args.args.first() {
// TODO: This needs better infra
None => return Err(ShellError::string(format!("cat must take one arg"))),
Some(v) => v.as_string()?.clone(),
};
let cwd = args.env.cwd().to_path_buf();
let cwd = args.env.lock().unwrap().cwd().to_path_buf();
let printer = PrettyPrinter::default()
.line_numbers(false)
@ -22,5 +22,5 @@ pub fn view(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellEr
let _ = printer.file(file.display().to_string());
Ok(VecDeque::new())
Ok(VecDeque::new().boxed())
}

View File

@ -2,26 +2,27 @@ use crate::errors::ShellError;
use crate::object::base::find;
use crate::prelude::*;
pub fn r#where(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn r#where(args: CommandArgs) -> Result<OutputStream, ShellError> {
if args.args.is_empty() {
return Err(ShellError::string("select requires a field"));
}
let field: Result<String, _> = args.args[0].as_string();
let field = field?;
let input = args.input;
let operator = args.args[1].copy();
match args.args[1] {
Value::Primitive(Primitive::Operator(ref operator)) => {
let objects = args
.input
.iter()
.filter(|item| find(&item, &field, operator, &args.args[2]))
.map(|item| ReturnValue::Value(item.copy()))
.collect();
match operator {
Value::Primitive(Primitive::Operator(operator)) => {
let right = args.args[2].copy();
Ok(objects)
let objects = input
.filter(move |item| futures::future::ready(find(&item, &field, &operator, &right)))
.map(|item| ReturnValue::Value(item.copy()));
Ok(objects.boxed())
}
ref x => {
x => {
println!("{:?}", x);
Err(ShellError::string("expected a comparison operator"))
}

View File

@ -5,16 +5,16 @@ use std::sync::Arc;
pub struct Context {
commands: indexmap::IndexMap<String, Arc<dyn Command>>,
crate host: Box<dyn Host>,
crate env: Environment,
crate host: Arc<Mutex<dyn Host>>,
crate env: Arc<Mutex<Environment>>,
}
impl Context {
crate fn basic() -> Result<Context, Box<Error>> {
Ok(Context {
commands: indexmap::IndexMap::new(),
host: Box::new(crate::env::host::BasicHost),
env: Environment::basic()?,
host: Arc::new(Mutex::new(crate::env::host::BasicHost)),
env: Arc::new(Mutex::new(Environment::basic()?)),
})
}
@ -36,11 +36,11 @@ impl Context {
&mut self,
command: Arc<dyn Command>,
arg_list: Vec<Value>,
input: VecDeque<Value>,
) -> Result<VecDeque<ReturnValue>, ShellError> {
input: InputStream,
) -> Result<OutputStream, ShellError> {
let command_args = CommandArgs {
host: &mut self.host,
env: &self.env,
host: self.host.clone(),
env: self.env.clone(),
args: arg_list,
input,
};

View File

@ -50,8 +50,10 @@ pub struct EntriesListView {
}
impl EntriesListView {
crate fn from_stream(values: VecDeque<Value>) -> EntriesListView {
EntriesListView { values }
crate async fn from_stream(values: InputStream) -> EntriesListView {
EntriesListView {
values: values.collect().await,
}
}
}

View File

@ -1,5 +1,6 @@
#![feature(crate_visibility_modifier)]
#![feature(in_band_lifetimes)]
#![feature(async_await)]
mod cli;
mod commands;
@ -16,5 +17,6 @@ mod stream;
use std::error::Error;
fn main() -> Result<(), Box<Error>> {
crate::cli::cli()
futures::executor::block_on(crate::cli::cli());
Ok(())
}

View File

@ -17,7 +17,7 @@ pub enum Item {
Operator(Operator),
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Operator {
Equal,
NotEqual,

View File

@ -4,4 +4,10 @@ crate use crate::context::Context;
crate use crate::env::{Environment, Host};
crate use crate::errors::ShellError;
crate use crate::object::{Primitive, Value};
#[allow(unused)]
crate use crate::stream::{empty_stream, single_output, InputStream, OutputStream};
#[allow(unused)]
crate use futures::{Future, FutureExt, Stream, StreamExt};
crate use std::collections::VecDeque;
crate use std::pin::Pin;
crate use std::sync::{Arc, Mutex};

View File

@ -0,0 +1,16 @@
use crate::prelude::*;
use futures::stream::BoxStream;
pub type InputStream = BoxStream<'static, Value>;
pub type OutputStream = BoxStream<'static, ReturnValue>;
crate fn empty_stream() -> OutputStream {
VecDeque::new().boxed()
}
crate fn single_output(item: Value) -> OutputStream {
let value = ReturnValue::Value(item);
let mut vec = VecDeque::new();
vec.push_back(value);
vec.boxed()
}