diff --git a/src/cli.rs b/src/cli.rs index 6ed091c80..386d16734 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,6 @@ use crate::commands::classified::{ ClassifiedCommand, ClassifiedInputStream, ClassifiedPipeline, ExternalArg, ExternalArgs, - ExternalCommand, InternalCommand, StreamNext, + ExternalCommand, InternalCommand, }; use crate::commands::plugin::JsonRpc; use crate::commands::plugin::{PluginCommand, PluginSink}; @@ -604,113 +604,15 @@ async fn process_line(readline: Result, ctx: &mut Context })), } - let mut input = ClassifiedInputStream::new(); - let mut iter = pipeline.commands.list.into_iter().peekable(); - // Check the config to see if we need to update the path // TODO: make sure config is cached so we don't path this load every call set_env_from_config(); - loop { - let item: Option = iter.next(); - let next: Option<&ClassifiedCommand> = iter.peek(); - - input = match (item, next) { - (None, _) => break, - - (Some(ClassifiedCommand::Dynamic(_)), _) - | (_, Some(ClassifiedCommand::Dynamic(_))) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Dynamic commands"), - ) - } - - (Some(ClassifiedCommand::Expr(_)), _) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Expression-only commands"), - ) - } - - (_, Some(ClassifiedCommand::Expr(_))) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Expression-only commands"), - ) - } - - ( - Some(ClassifiedCommand::Internal(left)), - Some(ClassifiedCommand::External(_)), - ) => match left.run(ctx, input, Text::from(line)) { - Ok(val) => ClassifiedInputStream::from_input_stream(val), - Err(err) => return LineResult::Error(line.to_string(), err), - }, - - (Some(ClassifiedCommand::Internal(left)), Some(_)) => { - match left.run(ctx, input, Text::from(line)) { - Ok(val) => ClassifiedInputStream::from_input_stream(val), - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - (Some(ClassifiedCommand::Internal(left)), None) => { - match left.run(ctx, input, Text::from(line)) { - Ok(val) => { - use futures::stream::TryStreamExt; - - let mut output_stream: OutputStream = val.into(); - loop { - match output_stream.try_next().await { - Ok(Some(ReturnSuccess::Value(Value { - value: UntaggedValue::Error(e), - .. - }))) => { - return LineResult::Error(line.to_string(), e); - } - Ok(Some(_item)) => { - if ctx.ctrl_c.load(Ordering::SeqCst) { - break; - } - } - _ => { - break; - } - } - } - - return LineResult::Success(line.to_string()); - } - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - ( - Some(ClassifiedCommand::External(left)), - Some(ClassifiedCommand::External(_)), - ) => match left.run(ctx, input, StreamNext::External).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - }, - - (Some(ClassifiedCommand::External(left)), Some(_)) => { - match left.run(ctx, input, StreamNext::Internal).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - (Some(ClassifiedCommand::External(left)), None) => { - match left.run(ctx, input, StreamNext::Last).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - }; + let input = ClassifiedInputStream::new(); + match pipeline.run(ctx, input, line).await { + Ok(_) => LineResult::Success(line.to_string()), + Err(err) => LineResult::Error(line.to_string(), err), } - - LineResult::Success(line.to_string()) } Err(ReadlineError::Interrupted) => LineResult::CtrlC, Err(ReadlineError::Eof) => LineResult::Break, diff --git a/src/commands/classified/pipeline.rs b/src/commands/classified/pipeline.rs index f40b62743..a9e829023 100644 --- a/src/commands/classified/pipeline.rs +++ b/src/commands/classified/pipeline.rs @@ -1,5 +1,7 @@ -use super::ClassifiedCommand; +use super::{ClassifiedCommand, ClassifiedInputStream, StreamNext}; +use crate::data::base::Value; use crate::prelude::*; +use std::sync::atomic::Ordering; #[derive(Debug, Clone)] pub(crate) struct Pipeline { @@ -15,6 +17,71 @@ impl Pipeline { }, } } + + pub(crate) async fn run( + self, + ctx: &mut Context, + mut input: ClassifiedInputStream, + line: &str, + ) -> Result<(), ShellError> { + let mut iter = self.commands.list.into_iter().peekable(); + + loop { + let item: Option = iter.next(); + let next: Option<&ClassifiedCommand> = iter.peek(); + + input = match (item, next) { + (Some(ClassifiedCommand::Dynamic(_)), _) + | (_, Some(ClassifiedCommand::Dynamic(_))) => { + return Err(ShellError::unimplemented("Dynamic commands")) + } + + (Some(ClassifiedCommand::Expr(_)), _) | (_, Some(ClassifiedCommand::Expr(_))) => { + return Err(ShellError::unimplemented("Expression-only commands")) + } + + (Some(ClassifiedCommand::Internal(left)), _) => { + let stream = left.run(ctx, input, Text::from(line))?; + ClassifiedInputStream::from_input_stream(stream) + } + + (Some(ClassifiedCommand::External(left)), Some(ClassifiedCommand::External(_))) => { + left.run(ctx, input, StreamNext::External).await? + } + + (Some(ClassifiedCommand::External(left)), Some(_)) => { + left.run(ctx, input, StreamNext::Internal).await? + } + + (Some(ClassifiedCommand::External(left)), None) => { + left.run(ctx, input, StreamNext::Last).await? + } + + (None, _) => break, + }; + } + + use futures::stream::TryStreamExt; + let mut output_stream: OutputStream = input.objects.into(); + loop { + match output_stream.try_next().await { + Ok(Some(ReturnSuccess::Value(Value { + value: UntaggedValue::Error(e), + .. + }))) => return Err(e), + Ok(Some(_item)) => { + if ctx.ctrl_c.load(Ordering::SeqCst) { + break; + } + } + _ => { + break; + } + } + } + + Ok(()) + } } #[derive(Debug, Clone)]