Move all pipeline execution code from cli to classified::pipeline

This commit is contained in:
Jason Gedge 2019-11-24 18:16:45 -05:00
parent 339ec46961
commit 71e7eb7cfc
4 changed files with 76 additions and 107 deletions

View File

@ -1,6 +1,5 @@
use crate::commands::classified::{ use crate::commands::classified::{
ClassifiedCommand, ClassifiedInputStream, ClassifiedPipeline, ExternalCommand, InternalCommand, ClassifiedCommand, ClassifiedInputStream, ClassifiedPipeline, ExternalCommand, InternalCommand,
StreamNext,
}; };
use crate::commands::plugin::JsonRpc; use crate::commands::plugin::JsonRpc;
use crate::commands::plugin::{PluginCommand, PluginSink}; use crate::commands::plugin::{PluginCommand, PluginSink};
@ -600,114 +599,17 @@ async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context
})), })),
} }
let mut input = ClassifiedInputStream::new();
let mut iter = pipeline.commands.item.into_iter().peekable();
// Check the config to see if we need to update the path // Check the config to see if we need to update the path
// TODO: make sure config is cached so we don't path this load every call // TODO: make sure config is cached so we don't path this load every call
set_env_from_config(); set_env_from_config();
loop { let input = ClassifiedInputStream::new();
let item: Option<ClassifiedCommand> = iter.next(); match pipeline.run(ctx, input, line).await {
let next: Option<&ClassifiedCommand> = iter.peek(); Ok(_) => LineResult::Success(line.to_string()),
Err(err) => LineResult::Error(line.to_string(), err),
input = match (item, next) {
(None, _) => break,
(Some(ClassifiedCommand::Dynamic(_)), _)
| (_, Some(ClassifiedCommand::Dynamic(_))) => {
return LineResult::Error(
line.to_string(),
ShellError::unimplemented("Dynamic commands"),
)
}
(Some(ClassifiedCommand::Expr(_)), _) => {
return LineResult::Error(
line.to_string(),
ShellError::unimplemented("Expression-only commands"),
)
}
(_, Some(ClassifiedCommand::Expr(_))) => {
return LineResult::Error(
line.to_string(),
ShellError::unimplemented("Expression-only commands"),
)
}
(
Some(ClassifiedCommand::Internal(left)),
Some(ClassifiedCommand::External(_)),
) => match left.run(ctx, input, Text::from(line)) {
Ok(val) => ClassifiedInputStream::from_input_stream(val),
Err(err) => return LineResult::Error(line.to_string(), err),
},
(Some(ClassifiedCommand::Internal(left)), Some(_)) => {
match left.run(ctx, input, Text::from(line)) {
Ok(val) => ClassifiedInputStream::from_input_stream(val),
Err(err) => return LineResult::Error(line.to_string(), err),
} }
} }
(Some(ClassifiedCommand::Internal(left)), None) => {
match left.run(ctx, input, Text::from(line)) {
Ok(val) => {
use futures::stream::TryStreamExt;
let mut output_stream: OutputStream = val.into();
loop {
match output_stream.try_next().await {
Ok(Some(ReturnSuccess::Value(Tagged {
item: Value::Error(e),
..
}))) => {
return LineResult::Error(line.to_string(), e);
}
Ok(Some(_item)) => {
if ctx.ctrl_c.load(Ordering::SeqCst) {
break;
}
}
_ => {
break;
}
}
}
return LineResult::Success(line.to_string());
}
Err(err) => return LineResult::Error(line.to_string(), err),
}
}
(
Some(ClassifiedCommand::External(left)),
Some(ClassifiedCommand::External(_)),
) => match left.run(ctx, input, StreamNext::External).await {
Ok(val) => val,
Err(err) => return LineResult::Error(line.to_string(), err),
},
(Some(ClassifiedCommand::External(left)), Some(_)) => {
match left.run(ctx, input, StreamNext::Internal).await {
Ok(val) => val,
Err(err) => return LineResult::Error(line.to_string(), err),
}
}
(Some(ClassifiedCommand::External(left)), None) => {
match left.run(ctx, input, StreamNext::Last).await {
Ok(val) => val,
Err(err) => return LineResult::Error(line.to_string(), err),
}
}
};
}
LineResult::Success(line.to_string())
}
Err(ReadlineError::Interrupted) => LineResult::CtrlC, Err(ReadlineError::Interrupted) => LineResult::CtrlC,
Err(ReadlineError::Eof) => LineResult::Break, Err(ReadlineError::Eof) => LineResult::Break,
Err(err) => { Err(err) => {

View File

@ -1,3 +1,4 @@
use super::ClassifiedInputStream;
use crate::prelude::*; use crate::prelude::*;
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
use futures::stream::StreamExt; use futures::stream::StreamExt;
@ -8,8 +9,6 @@ use std::fmt;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
use subprocess::Exec; use subprocess::Exec;
use super::ClassifiedInputStream;
/// A simple `Codec` implementation that splits up data into lines. /// A simple `Codec` implementation that splits up data into lines.
pub struct LinesCodec {} pub struct LinesCodec {}

View File

@ -9,6 +9,7 @@ mod pipeline;
#[allow(unused_imports)] #[allow(unused_imports)]
pub(crate) use dynamic::Command as DynamicCommand; pub(crate) use dynamic::Command as DynamicCommand;
#[allow(unused_imports)]
pub(crate) use external::{Command as ExternalCommand, StreamNext}; pub(crate) use external::{Command as ExternalCommand, StreamNext};
pub(crate) use internal::Command as InternalCommand; pub(crate) use internal::Command as InternalCommand;
pub(crate) use pipeline::Pipeline as ClassifiedPipeline; pub(crate) use pipeline::Pipeline as ClassifiedPipeline;

View File

@ -1,7 +1,7 @@
use super::{ClassifiedCommand, ClassifiedInputStream, StreamNext};
use crate::prelude::*; use crate::prelude::*;
use std::fmt; use std::fmt;
use std::sync::atomic::Ordering;
use super::ClassifiedCommand;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct Pipeline { pub(crate) struct Pipeline {
@ -22,3 +22,70 @@ impl HasSpan for Pipeline {
self.commands.span self.commands.span
} }
} }
impl Pipeline {
pub(crate) async fn run(
self,
ctx: &mut Context,
mut input: ClassifiedInputStream,
line: &str,
) -> Result<(), ShellError> {
let mut iter = self.commands.item.into_iter().peekable();
loop {
let item: Option<ClassifiedCommand> = iter.next();
let next: Option<&ClassifiedCommand> = iter.peek();
input = match (item, next) {
(Some(ClassifiedCommand::Dynamic(_)), _)
| (_, Some(ClassifiedCommand::Dynamic(_))) => {
return Err(ShellError::unimplemented("Dynamic commands"))
}
(Some(ClassifiedCommand::Expr(_)), _) | (_, Some(ClassifiedCommand::Expr(_))) => {
return Err(ShellError::unimplemented("Expression-only commands"))
}
(Some(ClassifiedCommand::Internal(left)), _) => {
let stream = left.run(ctx, input, Text::from(line))?;
ClassifiedInputStream::from_input_stream(stream)
}
(Some(ClassifiedCommand::External(left)), Some(ClassifiedCommand::External(_))) => {
left.run(ctx, input, StreamNext::External).await?
}
(Some(ClassifiedCommand::External(left)), Some(_)) => {
left.run(ctx, input, StreamNext::Internal).await?
}
(Some(ClassifiedCommand::External(left)), None) => {
left.run(ctx, input, StreamNext::Last).await?
}
(None, _) => break,
};
}
use futures::stream::TryStreamExt;
let mut output_stream: OutputStream = input.objects.into();
loop {
match output_stream.try_next().await {
Ok(Some(ReturnSuccess::Value(Tagged {
item: Value::Error(e),
..
}))) => return Err(e),
Ok(Some(_item)) => {
if ctx.ctrl_c.load(Ordering::SeqCst) {
break;
}
}
_ => {
break;
}
}
}
Ok(())
}
}