From f9fb353c5ceaa64eea25d8a09d8b21ecc252b146 Mon Sep 17 00:00:00 2001 From: Yehuda Katz Date: Fri, 24 May 2019 00:29:16 -0700 Subject: [PATCH] Streams are wired up pairwise --- src/cli.rs | 91 +++++++++++++++++++++++++++++++------- src/commands/classified.rs | 90 ++++++++++++++++++++++++++++++++++++- src/errors.rs | 9 ++++ src/main.rs | 1 + 4 files changed, 172 insertions(+), 19 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 15ee6c499..f7c380b93 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,8 @@ use crate::prelude::*; -use crate::commands::classified::{ClassifiedCommand, ExternalCommand, InternalCommand}; +use crate::commands::classified::{ + ClassifiedCommand, ClassifiedInputStream, ExternalCommand, InternalCommand, +}; use crate::context::Context; crate use crate::errors::ShellError; crate use crate::format::{EntriesListView, GenericView}; @@ -11,6 +13,7 @@ use rustyline::error::ReadlineError; use rustyline::{self, ColorMode, Config, Editor}; use std::collections::VecDeque; use std::error::Error; +use std::iter::Iterator; use std::sync::Arc; #[derive(Debug)] @@ -145,18 +148,72 @@ async fn process_line(readline: Result, ctx: &mut Context Ok(val) => val, }; - let parsed = result.1; + let parsed: Result, _> = result + .1 + .into_iter() + .map(|item| classify_command(&item, ctx)) + .collect(); - let mut input: InputStream = VecDeque::new().boxed(); + let parsed = parsed?; - for item in parsed { - input = match process_command(item.clone(), input, ctx).await { - Ok(val) => val, - Err(err) => return LineResult::Error(format!("{}", err.description())), - }; + let mut input = ClassifiedInputStream::new(); + + let mut iter = parsed.into_iter().peekable(); + + loop { + let item: Option = iter.next(); + let next: Option<&ClassifiedCommand> = iter.peek(); + + input = match (item, next) { + (None, _) => break, + + ( + Some(ClassifiedCommand::Internal(left)), + Some(ClassifiedCommand::Internal(_)), + ) => match left.run(ctx, input).await { + Ok(val) => ClassifiedInputStream::from_input_stream(val), + Err(err) => return LineResult::Error(format!("{}", err.description())), + }, + + (Some(ClassifiedCommand::Internal(left)), None) => { + match left.run(ctx, input).await { + Ok(val) => ClassifiedInputStream::from_input_stream(val), + Err(err) => return LineResult::Error(format!("{}", err.description())), + } + } + + ( + Some(ClassifiedCommand::External(left)), + Some(ClassifiedCommand::External(_)), + ) => match left.run(ctx, input, true).await { + Ok(val) => val, + Err(err) => return LineResult::Error(format!("{}", err.description())), + }, + + ( + Some(ClassifiedCommand::Internal(_)), + Some(ClassifiedCommand::External(_)), + ) => unimplemented!(), + + ( + Some(ClassifiedCommand::External(_)), + Some(ClassifiedCommand::Internal(_)), + ) => unimplemented!(), + + (Some(ClassifiedCommand::External(left)), None) => { + match left.run(ctx, input, false).await { + Ok(val) => val, + Err(err) => return LineResult::Error(format!("{}", err.description())), + } + } + } + // input = match process_command(item.clone(), input, ctx).await { + // Ok(val) => val, + // Err(err) => return LineResult::Error(format!("{}", err.description())), + // }; } - let input_vec: VecDeque<_> = input.collect().await; + let input_vec: VecDeque<_> = input.objects.collect().await; if input_vec.len() > 0 { if equal_shapes(&input_vec) { @@ -188,15 +245,15 @@ async fn process_line(readline: Result, ctx: &mut Context } } -async fn process_command( - parsed: Vec, - input: InputStream, - context: &mut Context, -) -> Result { - let command = classify_command(&parsed, context)?; +// async fn process_command( +// parsed: Vec, +// input: InputStream, +// context: &mut Context, +// ) -> Result { +// let command = classify_command(&parsed, context)?; - command.run(input, context).await -} +// command.run(context, input).await +// } fn classify_command( command: &[crate::parser::Item], diff --git a/src/commands/classified.rs b/src/commands/classified.rs index a528b9bd3..acf57d85f 100644 --- a/src/commands/classified.rs +++ b/src/commands/classified.rs @@ -2,6 +2,34 @@ use crate::prelude::*; use std::sync::Arc; use subprocess::Exec; +crate struct ClassifiedInputStream { + crate objects: InputStream, + crate stdin: Option, +} + +impl ClassifiedInputStream { + crate fn new() -> ClassifiedInputStream { + ClassifiedInputStream { + objects: VecDeque::new().boxed(), + stdin: None, + } + } + + crate fn from_input_stream(stream: InputStream) -> ClassifiedInputStream { + ClassifiedInputStream { + objects: stream, + stdin: None, + } + } + + crate fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream { + ClassifiedInputStream { + objects: VecDeque::new().boxed(), + stdin: Some(stdout), + } + } +} + crate enum ClassifiedCommand { Internal(InternalCommand), External(ExternalCommand), @@ -10,12 +38,12 @@ crate enum ClassifiedCommand { impl ClassifiedCommand { crate async fn run( self, - input: InputStream, context: &mut Context, + input: ClassifiedInputStream, ) -> Result { match self { ClassifiedCommand::Internal(internal) => { - let result = context.run_command(internal.command, internal.args, input)?; + let result = context.run_command(internal.command, internal.args, input.objects)?; let env = context.env.clone(); let stream = result.filter_map(move |v| match v { @@ -50,7 +78,65 @@ crate struct InternalCommand { crate args: Vec, } +impl InternalCommand { + crate async fn run( + self, + context: &mut Context, + input: ClassifiedInputStream, + ) -> Result { + let result = context.run_command(self.command, self.args, input.objects)?; + let env = context.env.clone(); + + let stream = result.filter_map(move |v| match v { + ReturnValue::Action(action) => match action { + CommandAction::ChangeCwd(cwd) => { + env.lock().unwrap().cwd = cwd; + futures::future::ready(None) + } + }, + + ReturnValue::Value(v) => futures::future::ready(Some(v)), + }); + + Ok(stream.boxed() as InputStream) + } +} + crate struct ExternalCommand { crate name: String, crate args: Vec, } + +impl ExternalCommand { + crate async fn run( + self, + context: &mut Context, + mut input: ClassifiedInputStream, + stream_next: bool, + ) -> Result { + let mut process = Exec::shell(&self.name) + .args(&self.args) + .cwd(context.env.lock().unwrap().cwd()); + + if stream_next { + process = process.stdout(subprocess::Redirection::Pipe) + } + + if let Some(stdin) = input.stdin { + process = process.stdin(stdin); + } + + let mut popen = process.popen().unwrap(); + + if stream_next { + match &popen.stdout { + None => unreachable!(), + Some(stdout) => Ok(ClassifiedInputStream::from_stdout(stdout.try_clone()?)), + } + } else { + popen.stdin.take(); + popen.wait()?; + Ok(ClassifiedInputStream::new()) + } + } +} diff --git a/src/errors.rs b/src/errors.rs index 6cee99ebc..b18f3ff55 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -51,3 +51,12 @@ impl std::convert::From for ShellError { } } } + +impl std::convert::From for ShellError { + fn from(input: subprocess::PopenError) -> ShellError { + ShellError { + title: format!("{}", input), + error: Value::nothing(), + } + } +} diff --git a/src/main.rs b/src/main.rs index 45da4ec08..8a5da7c13 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ #![feature(in_band_lifetimes)] #![feature(async_await)] #![feature(try_trait)] +#![feature(bind_by_move_pattern_guards)] mod cli; mod commands;