Streams are wired up pairwise

This commit is contained in:
Yehuda Katz 2019-05-24 00:29:16 -07:00
parent bf332ea50c
commit f9fb353c5c
4 changed files with 172 additions and 19 deletions

View File

@ -1,6 +1,8 @@
use crate::prelude::*; use crate::prelude::*;
use crate::commands::classified::{ClassifiedCommand, ExternalCommand, InternalCommand}; use crate::commands::classified::{
ClassifiedCommand, ClassifiedInputStream, ExternalCommand, InternalCommand,
};
use crate::context::Context; use crate::context::Context;
crate use crate::errors::ShellError; crate use crate::errors::ShellError;
crate use crate::format::{EntriesListView, GenericView}; crate use crate::format::{EntriesListView, GenericView};
@ -11,6 +13,7 @@ use rustyline::error::ReadlineError;
use rustyline::{self, ColorMode, Config, Editor}; use rustyline::{self, ColorMode, Config, Editor};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::error::Error; use std::error::Error;
use std::iter::Iterator;
use std::sync::Arc; use std::sync::Arc;
#[derive(Debug)] #[derive(Debug)]
@ -145,18 +148,72 @@ async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context
Ok(val) => val, Ok(val) => val,
}; };
let parsed = result.1; let parsed: Result<Vec<_>, _> = result
.1
.into_iter()
.map(|item| classify_command(&item, ctx))
.collect();
let mut input: InputStream = VecDeque::new().boxed(); let parsed = parsed?;
for item in parsed { let mut input = ClassifiedInputStream::new();
input = match process_command(item.clone(), input, ctx).await {
Ok(val) => val, let mut iter = parsed.into_iter().peekable();
loop {
let item: Option<ClassifiedCommand> = iter.next();
let next: Option<&ClassifiedCommand> = iter.peek();
input = match (item, next) {
(None, _) => break,
(
Some(ClassifiedCommand::Internal(left)),
Some(ClassifiedCommand::Internal(_)),
) => match left.run(ctx, input).await {
Ok(val) => ClassifiedInputStream::from_input_stream(val),
Err(err) => return LineResult::Error(format!("{}", err.description())), Err(err) => return LineResult::Error(format!("{}", err.description())),
}; },
(Some(ClassifiedCommand::Internal(left)), None) => {
match left.run(ctx, input).await {
Ok(val) => ClassifiedInputStream::from_input_stream(val),
Err(err) => return LineResult::Error(format!("{}", err.description())),
}
} }
let input_vec: VecDeque<_> = input.collect().await; (
Some(ClassifiedCommand::External(left)),
Some(ClassifiedCommand::External(_)),
) => match left.run(ctx, input, true).await {
Ok(val) => val,
Err(err) => return LineResult::Error(format!("{}", err.description())),
},
(
Some(ClassifiedCommand::Internal(_)),
Some(ClassifiedCommand::External(_)),
) => unimplemented!(),
(
Some(ClassifiedCommand::External(_)),
Some(ClassifiedCommand::Internal(_)),
) => unimplemented!(),
(Some(ClassifiedCommand::External(left)), None) => {
match left.run(ctx, input, false).await {
Ok(val) => val,
Err(err) => return LineResult::Error(format!("{}", err.description())),
}
}
}
// input = match process_command(item.clone(), input, ctx).await {
// Ok(val) => val,
// Err(err) => return LineResult::Error(format!("{}", err.description())),
// };
}
let input_vec: VecDeque<_> = input.objects.collect().await;
if input_vec.len() > 0 { if input_vec.len() > 0 {
if equal_shapes(&input_vec) { if equal_shapes(&input_vec) {
@ -188,15 +245,15 @@ async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context
} }
} }
async fn process_command( // async fn process_command(
parsed: Vec<crate::parser::Item>, // parsed: Vec<crate::parser::Item>,
input: InputStream, // input: InputStream,
context: &mut Context, // context: &mut Context,
) -> Result<InputStream, ShellError> { // ) -> Result<InputStream, ShellError> {
let command = classify_command(&parsed, context)?; // let command = classify_command(&parsed, context)?;
command.run(input, context).await // command.run(context, input).await
} // }
fn classify_command( fn classify_command(
command: &[crate::parser::Item], command: &[crate::parser::Item],

View File

@ -2,6 +2,34 @@ use crate::prelude::*;
use std::sync::Arc; use std::sync::Arc;
use subprocess::Exec; use subprocess::Exec;
crate struct ClassifiedInputStream {
crate objects: InputStream,
crate stdin: Option<std::fs::File>,
}
impl ClassifiedInputStream {
crate fn new() -> ClassifiedInputStream {
ClassifiedInputStream {
objects: VecDeque::new().boxed(),
stdin: None,
}
}
crate fn from_input_stream(stream: InputStream) -> ClassifiedInputStream {
ClassifiedInputStream {
objects: stream,
stdin: None,
}
}
crate fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream {
ClassifiedInputStream {
objects: VecDeque::new().boxed(),
stdin: Some(stdout),
}
}
}
crate enum ClassifiedCommand { crate enum ClassifiedCommand {
Internal(InternalCommand), Internal(InternalCommand),
External(ExternalCommand), External(ExternalCommand),
@ -10,12 +38,12 @@ crate enum ClassifiedCommand {
impl ClassifiedCommand { impl ClassifiedCommand {
crate async fn run( crate async fn run(
self, self,
input: InputStream,
context: &mut Context, context: &mut Context,
input: ClassifiedInputStream,
) -> Result<InputStream, ShellError> { ) -> Result<InputStream, ShellError> {
match self { match self {
ClassifiedCommand::Internal(internal) => { ClassifiedCommand::Internal(internal) => {
let result = context.run_command(internal.command, internal.args, input)?; let result = context.run_command(internal.command, internal.args, input.objects)?;
let env = context.env.clone(); let env = context.env.clone();
let stream = result.filter_map(move |v| match v { let stream = result.filter_map(move |v| match v {
@ -50,7 +78,65 @@ crate struct InternalCommand {
crate args: Vec<Value>, crate args: Vec<Value>,
} }
impl InternalCommand {
crate async fn run(
self,
context: &mut Context,
input: ClassifiedInputStream,
) -> Result<InputStream, ShellError> {
let result = context.run_command(self.command, self.args, input.objects)?;
let env = context.env.clone();
let stream = result.filter_map(move |v| match v {
ReturnValue::Action(action) => match action {
CommandAction::ChangeCwd(cwd) => {
env.lock().unwrap().cwd = cwd;
futures::future::ready(None)
}
},
ReturnValue::Value(v) => futures::future::ready(Some(v)),
});
Ok(stream.boxed() as InputStream)
}
}
crate struct ExternalCommand { crate struct ExternalCommand {
crate name: String, crate name: String,
crate args: Vec<String>, crate args: Vec<String>,
} }
impl ExternalCommand {
crate async fn run(
self,
context: &mut Context,
mut input: ClassifiedInputStream,
stream_next: bool,
) -> Result<ClassifiedInputStream, ShellError> {
let mut process = Exec::shell(&self.name)
.args(&self.args)
.cwd(context.env.lock().unwrap().cwd());
if stream_next {
process = process.stdout(subprocess::Redirection::Pipe)
}
if let Some(stdin) = input.stdin {
process = process.stdin(stdin);
}
let mut popen = process.popen().unwrap();
if stream_next {
match &popen.stdout {
None => unreachable!(),
Some(stdout) => Ok(ClassifiedInputStream::from_stdout(stdout.try_clone()?)),
}
} else {
popen.stdin.take();
popen.wait()?;
Ok(ClassifiedInputStream::new())
}
}
}

View File

@ -51,3 +51,12 @@ impl std::convert::From<futures_sink::VecSinkError> for ShellError {
} }
} }
} }
impl std::convert::From<subprocess::PopenError> for ShellError {
fn from(input: subprocess::PopenError) -> ShellError {
ShellError {
title: format!("{}", input),
error: Value::nothing(),
}
}
}

View File

@ -2,6 +2,7 @@
#![feature(in_band_lifetimes)] #![feature(in_band_lifetimes)]
#![feature(async_await)] #![feature(async_await)]
#![feature(try_trait)] #![feature(try_trait)]
#![feature(bind_by_move_pattern_guards)]
mod cli; mod cli;
mod commands; mod commands;