diff --git a/src/cli.rs b/src/cli.rs index 64d562f97..a6720106b 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,6 +1,5 @@ use crate::commands::classified::{ ClassifiedCommand, ClassifiedInputStream, ClassifiedPipeline, ExternalCommand, InternalCommand, - StreamNext, }; use crate::commands::plugin::JsonRpc; use crate::commands::plugin::{PluginCommand, PluginSink}; @@ -600,114 +599,17 @@ async fn process_line(readline: Result, ctx: &mut Context })), } - let mut input = ClassifiedInputStream::new(); - let mut iter = pipeline.commands.item.into_iter().peekable(); - // Check the config to see if we need to update the path // TODO: make sure config is cached so we don't path this load every call set_env_from_config(); - loop { - let item: Option = iter.next(); - let next: Option<&ClassifiedCommand> = iter.peek(); - - input = match (item, next) { - (None, _) => break, - - (Some(ClassifiedCommand::Dynamic(_)), _) - | (_, Some(ClassifiedCommand::Dynamic(_))) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Dynamic commands"), - ) - } - - (Some(ClassifiedCommand::Expr(_)), _) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Expression-only commands"), - ) - } - - (_, Some(ClassifiedCommand::Expr(_))) => { - return LineResult::Error( - line.to_string(), - ShellError::unimplemented("Expression-only commands"), - ) - } - - ( - Some(ClassifiedCommand::Internal(left)), - Some(ClassifiedCommand::External(_)), - ) => match left.run(ctx, input, Text::from(line)) { - Ok(val) => ClassifiedInputStream::from_input_stream(val), - Err(err) => return LineResult::Error(line.to_string(), err), - }, - - (Some(ClassifiedCommand::Internal(left)), Some(_)) => { - match left.run(ctx, input, Text::from(line)) { - Ok(val) => ClassifiedInputStream::from_input_stream(val), - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - (Some(ClassifiedCommand::Internal(left)), None) => { - match left.run(ctx, input, Text::from(line)) { - Ok(val) => { - use futures::stream::TryStreamExt; - - let mut output_stream: OutputStream = val.into(); - loop { - match output_stream.try_next().await { - Ok(Some(ReturnSuccess::Value(Tagged { - item: Value::Error(e), - .. - }))) => { - return LineResult::Error(line.to_string(), e); - } - Ok(Some(_item)) => { - if ctx.ctrl_c.load(Ordering::SeqCst) { - break; - } - } - _ => { - break; - } - } - } - - return LineResult::Success(line.to_string()); - } - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - ( - Some(ClassifiedCommand::External(left)), - Some(ClassifiedCommand::External(_)), - ) => match left.run(ctx, input, StreamNext::External).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - }, - - (Some(ClassifiedCommand::External(left)), Some(_)) => { - match left.run(ctx, input, StreamNext::Internal).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - - (Some(ClassifiedCommand::External(left)), None) => { - match left.run(ctx, input, StreamNext::Last).await { - Ok(val) => val, - Err(err) => return LineResult::Error(line.to_string(), err), - } - } - }; + let input = ClassifiedInputStream::new(); + match pipeline.run(ctx, input, line).await { + Ok(_) => LineResult::Success(line.to_string()), + Err(err) => LineResult::Error(line.to_string(), err), } - - LineResult::Success(line.to_string()) } + Err(ReadlineError::Interrupted) => LineResult::CtrlC, Err(ReadlineError::Eof) => LineResult::Break, Err(err) => { diff --git a/src/commands/classified.rs b/src/commands/classified.rs deleted file mode 100644 index 7dd72af4f..000000000 --- a/src/commands/classified.rs +++ /dev/null @@ -1,453 +0,0 @@ -use crate::parser::{hir, TokenNode}; -use crate::prelude::*; -use bytes::{BufMut, BytesMut}; -use derive_new::new; -use futures::stream::StreamExt; -use futures_codec::{Decoder, Encoder, Framed}; -use itertools::Itertools; -use log::{log_enabled, trace}; -use std::fmt; -use std::io::{Error, ErrorKind}; -use subprocess::Exec; - -/// A simple `Codec` implementation that splits up data into lines. -pub struct LinesCodec {} - -impl Encoder for LinesCodec { - type Item = String; - type Error = Error; - - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.put(item); - Ok(()) - } -} - -impl Decoder for LinesCodec { - type Item = String; - type Error = Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match src.iter().position(|b| b == &b'\n') { - Some(pos) if !src.is_empty() => { - let buf = src.split_to(pos + 1); - String::from_utf8(buf.to_vec()) - .map(Some) - .map_err(|e| Error::new(ErrorKind::InvalidData, e)) - } - _ if !src.is_empty() => { - let drained = src.take(); - String::from_utf8(drained.to_vec()) - .map(Some) - .map_err(|e| Error::new(ErrorKind::InvalidData, e)) - } - _ => Ok(None), - } - } -} - -pub(crate) struct ClassifiedInputStream { - pub(crate) objects: InputStream, - pub(crate) stdin: Option, -} - -impl ClassifiedInputStream { - pub(crate) fn new() -> ClassifiedInputStream { - ClassifiedInputStream { - objects: vec![Value::nothing().tagged(Tag::unknown())].into(), - stdin: None, - } - } - - pub(crate) fn from_input_stream(stream: impl Into) -> ClassifiedInputStream { - ClassifiedInputStream { - objects: stream.into(), - stdin: None, - } - } - - pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream { - ClassifiedInputStream { - objects: VecDeque::new().into(), - stdin: Some(stdout), - } - } -} - -#[derive(Debug, Clone)] -pub(crate) struct ClassifiedPipeline { - pub(crate) commands: Spanned>, -} - -impl FormatDebug for ClassifiedPipeline { - fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { - f.say_str( - "classified pipeline", - self.commands.iter().map(|c| c.debug(source)).join(" | "), - ) - } -} - -impl HasSpan for ClassifiedPipeline { - fn span(&self) -> Span { - self.commands.span - } -} - -#[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) enum ClassifiedCommand { - #[allow(unused)] - Expr(TokenNode), - Internal(InternalCommand), - #[allow(unused)] - Dynamic(Spanned), - External(ExternalCommand), -} - -impl FormatDebug for ClassifiedCommand { - fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { - match self { - ClassifiedCommand::Expr(expr) => expr.fmt_debug(f, source), - ClassifiedCommand::Internal(internal) => internal.fmt_debug(f, source), - ClassifiedCommand::Dynamic(dynamic) => dynamic.fmt_debug(f, source), - ClassifiedCommand::External(external) => external.fmt_debug(f, source), - } - } -} - -impl HasSpan for ClassifiedCommand { - fn span(&self) -> Span { - match self { - ClassifiedCommand::Expr(node) => node.span(), - ClassifiedCommand::Internal(command) => command.span(), - ClassifiedCommand::Dynamic(call) => call.span, - ClassifiedCommand::External(command) => command.span(), - } - } -} - -#[derive(new, Debug, Clone, Eq, PartialEq)] -pub(crate) struct InternalCommand { - pub(crate) name: String, - pub(crate) name_tag: Tag, - pub(crate) args: Spanned, -} - -impl HasSpan for InternalCommand { - fn span(&self) -> Span { - let start = self.name_tag.span; - - start.until(self.args.span) - } -} - -impl FormatDebug for InternalCommand { - fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { - f.say("internal", self.args.debug(source)) - } -} - -#[derive(new, Debug, Eq, PartialEq)] -pub(crate) struct DynamicCommand { - pub(crate) args: hir::Call, -} - -impl InternalCommand { - pub(crate) fn run( - self, - context: &mut Context, - input: ClassifiedInputStream, - source: Text, - ) -> Result { - if log_enabled!(log::Level::Trace) { - trace!(target: "nu::run::internal", "->"); - trace!(target: "nu::run::internal", "{}", self.name); - trace!(target: "nu::run::internal", "{}", self.args.debug(&source)); - } - - let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", source: source, "input" = input.objects); - - let command = context.expect_command(&self.name); - - let result = { - context.run_command( - command, - self.name_tag.clone(), - self.args.item, - &source, - objects, - ) - }; - - let result = trace_out_stream!(target: "nu::trace_stream::internal", source: source, "output" = result); - let mut result = result.values; - let mut context = context.clone(); - - let stream = async_stream! { - let mut soft_errs: Vec = vec![]; - let mut yielded = false; - - while let Some(item) = result.next().await { - match item { - Ok(ReturnSuccess::Action(action)) => match action { - CommandAction::ChangePath(path) => { - context.shell_manager.set_path(path); - } - CommandAction::Exit => std::process::exit(0), // TODO: save history.txt - CommandAction::Error(err) => { - context.error(err); - break; - } - CommandAction::EnterHelpShell(value) => { - match value { - Tagged { - item: Value::Primitive(Primitive::String(cmd)), - tag, - } => { - context.shell_manager.insert_at_current(Box::new( - HelpShell::for_command( - Value::string(cmd).tagged(tag), - &context.registry(), - ).unwrap(), - )); - } - _ => { - context.shell_manager.insert_at_current(Box::new( - HelpShell::index(&context.registry()).unwrap(), - )); - } - } - } - CommandAction::EnterValueShell(value) => { - context - .shell_manager - .insert_at_current(Box::new(ValueShell::new(value))); - } - CommandAction::EnterShell(location) => { - context.shell_manager.insert_at_current(Box::new( - FilesystemShell::with_location(location, context.registry().clone()).unwrap(), - )); - } - CommandAction::PreviousShell => { - context.shell_manager.prev(); - } - CommandAction::NextShell => { - context.shell_manager.next(); - } - CommandAction::LeaveShell => { - context.shell_manager.remove_at_current(); - if context.shell_manager.is_empty() { - std::process::exit(0); // TODO: save history.txt - } - } - }, - - Ok(ReturnSuccess::Value(v)) => { - yielded = true; - yield Ok(v); - } - - Ok(ReturnSuccess::DebugValue(v)) => { - yielded = true; - - let doc = v.item.pretty_doc(); - let mut buffer = termcolor::Buffer::ansi(); - - doc.render_raw( - context.with_host(|host| host.width() - 5), - &mut crate::parser::debug::TermColored::new(&mut buffer), - ).unwrap(); - - let value = String::from_utf8_lossy(buffer.as_slice()); - - yield Ok(Value::string(value).tagged_unknown()) - } - - Err(err) => { - context.error(err); - break; - } - } - } - }; - - Ok(stream.to_input_stream()) - } -} - -#[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) struct ExternalCommand { - pub(crate) name: String, - - pub(crate) name_tag: Tag, - pub(crate) args: Spanned>>, -} - -impl FormatDebug for ExternalCommand { - fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { - write!(f, "{}", self.name)?; - - if self.args.item.len() > 0 { - write!(f, " ")?; - write!(f, "{}", self.args.iter().map(|i| i.debug(source)).join(" "))?; - } - - Ok(()) - } -} - -impl HasSpan for ExternalCommand { - fn span(&self) -> Span { - self.name_tag.span.until(self.args.span) - } -} - -#[derive(Debug)] -pub(crate) enum StreamNext { - Last, - External, - Internal, -} - -impl ExternalCommand { - pub(crate) async fn run( - self, - context: &mut Context, - input: ClassifiedInputStream, - stream_next: StreamNext, - ) -> Result { - let stdin = input.stdin; - let inputs: Vec> = input.objects.into_vec().await; - - trace!(target: "nu::run::external", "-> {}", self.name); - trace!(target: "nu::run::external", "inputs = {:?}", inputs); - - let mut arg_string = format!("{}", self.name); - for arg in &self.args.item { - arg_string.push_str(&arg); - } - - trace!(target: "nu::run::external", "command = {:?}", self.name); - - let mut process; - if arg_string.contains("$it") { - let input_strings = inputs - .iter() - .map(|i| { - i.as_string().map_err(|_| { - let arg = self.args.iter().find(|arg| arg.item.contains("$it")); - if let Some(arg) = arg { - ShellError::labeled_error( - "External $it needs string data", - "given row instead of string data", - arg.tag(), - ) - } else { - ShellError::labeled_error( - "$it needs string data", - "given something else", - self.name_tag.clone(), - ) - } - }) - }) - .collect::, ShellError>>()?; - - let commands = input_strings.iter().map(|i| { - let args = self.args.iter().filter_map(|arg| { - if arg.chars().all(|c| c.is_whitespace()) { - None - } else { - Some(arg.replace("$it", &i)) - } - }); - - format!("{} {}", self.name, itertools::join(args, " ")) - }); - - process = Exec::shell(itertools::join(commands, " && ")) - } else { - process = Exec::cmd(&self.name); - for arg in &self.args.item { - let arg_chars: Vec<_> = arg.chars().collect(); - if arg_chars.len() > 1 - && arg_chars[0] == '"' - && arg_chars[arg_chars.len() - 1] == '"' - { - // quoted string - let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect(); - process = process.arg(new_arg); - } else { - process = process.arg(arg.item.clone()); - } - } - } - - process = process.cwd(context.shell_manager.path()); - - trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); - - let mut process = match stream_next { - StreamNext::Last => process, - StreamNext::External | StreamNext::Internal => { - process.stdout(subprocess::Redirection::Pipe) - } - }; - - trace!(target: "nu::run::external", "set up stdout pipe"); - - if let Some(stdin) = stdin { - process = process.stdin(stdin); - } - - trace!(target: "nu::run::external", "set up stdin pipe"); - trace!(target: "nu::run::external", "built process {:?}", process); - - let popen = process.popen(); - - trace!(target: "nu::run::external", "next = {:?}", stream_next); - - let name_tag = self.name_tag.clone(); - if let Ok(mut popen) = popen { - match stream_next { - StreamNext::Last => { - let _ = popen.detach(); - loop { - match popen.poll() { - None => { - let _ = std::thread::sleep(std::time::Duration::new(0, 100000000)); - } - _ => { - let _ = popen.terminate(); - break; - } - } - } - Ok(ClassifiedInputStream::new()) - } - StreamNext::External => { - let _ = popen.detach(); - let stdout = popen.stdout.take().unwrap(); - Ok(ClassifiedInputStream::from_stdout(stdout)) - } - StreamNext::Internal => { - let _ = popen.detach(); - let stdout = popen.stdout.take().unwrap(); - let file = futures::io::AllowStdIo::new(stdout); - let stream = Framed::new(file, LinesCodec {}); - let stream = - stream.map(move |line| Value::string(line.unwrap()).tagged(&name_tag)); - Ok(ClassifiedInputStream::from_input_stream( - stream.boxed() as BoxStream<'static, Tagged> - )) - } - } - } else { - return Err(ShellError::labeled_error( - "Command not found", - "command not found", - name_tag, - )); - } - } -} diff --git a/src/commands/classified/dynamic.rs b/src/commands/classified/dynamic.rs new file mode 100644 index 000000000..8e6e7d651 --- /dev/null +++ b/src/commands/classified/dynamic.rs @@ -0,0 +1,7 @@ +use crate::parser::hir; +use derive_new::new; + +#[derive(new, Debug, Eq, PartialEq)] +pub(crate) struct Command { + pub(crate) args: hir::Call, +} diff --git a/src/commands/classified/external.rs b/src/commands/classified/external.rs new file mode 100644 index 000000000..c6dfe6c1f --- /dev/null +++ b/src/commands/classified/external.rs @@ -0,0 +1,223 @@ +use super::ClassifiedInputStream; +use crate::prelude::*; +use bytes::{BufMut, BytesMut}; +use futures::stream::StreamExt; +use futures_codec::{Decoder, Encoder, Framed}; +use itertools::Itertools; +use log::trace; +use std::fmt; +use std::io::{Error, ErrorKind}; +use subprocess::Exec; + +/// A simple `Codec` implementation that splits up data into lines. +pub struct LinesCodec {} + +impl Encoder for LinesCodec { + type Item = String; + type Error = Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.put(item); + Ok(()) + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match src.iter().position(|b| b == &b'\n') { + Some(pos) if !src.is_empty() => { + let buf = src.split_to(pos + 1); + String::from_utf8(buf.to_vec()) + .map(Some) + .map_err(|e| Error::new(ErrorKind::InvalidData, e)) + } + _ if !src.is_empty() => { + let drained = src.take(); + String::from_utf8(drained.to_vec()) + .map(Some) + .map_err(|e| Error::new(ErrorKind::InvalidData, e)) + } + _ => Ok(None), + } + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct Command { + pub(crate) name: String, + + pub(crate) name_tag: Tag, + pub(crate) args: Spanned>>, +} + +impl FormatDebug for Command { + fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { + write!(f, "{}", self.name)?; + + if self.args.item.len() > 0 { + write!(f, " ")?; + write!(f, "{}", self.args.iter().map(|i| i.debug(source)).join(" "))?; + } + + Ok(()) + } +} + +impl HasSpan for Command { + fn span(&self) -> Span { + self.name_tag.span.until(self.args.span) + } +} + +#[derive(Debug)] +pub(crate) enum StreamNext { + Last, + External, + Internal, +} + +impl Command { + pub(crate) async fn run( + self, + context: &mut Context, + input: ClassifiedInputStream, + stream_next: StreamNext, + ) -> Result { + let stdin = input.stdin; + let inputs: Vec> = input.objects.into_vec().await; + + trace!(target: "nu::run::external", "-> {}", self.name); + trace!(target: "nu::run::external", "inputs = {:?}", inputs); + + let mut arg_string = format!("{}", self.name); + for arg in &self.args.item { + arg_string.push_str(&arg); + } + + trace!(target: "nu::run::external", "command = {:?}", self.name); + + let mut process; + if arg_string.contains("$it") { + let input_strings = inputs + .iter() + .map(|i| { + i.as_string().map_err(|_| { + let arg = self.args.iter().find(|arg| arg.item.contains("$it")); + if let Some(arg) = arg { + ShellError::labeled_error( + "External $it needs string data", + "given row instead of string data", + arg.tag(), + ) + } else { + ShellError::labeled_error( + "$it needs string data", + "given something else", + self.name_tag.clone(), + ) + } + }) + }) + .collect::, ShellError>>()?; + + let commands = input_strings.iter().map(|i| { + let args = self.args.iter().filter_map(|arg| { + if arg.chars().all(|c| c.is_whitespace()) { + None + } else { + Some(arg.replace("$it", &i)) + } + }); + + format!("{} {}", self.name, itertools::join(args, " ")) + }); + + process = Exec::shell(itertools::join(commands, " && ")) + } else { + process = Exec::cmd(&self.name); + for arg in &self.args.item { + let arg_chars: Vec<_> = arg.chars().collect(); + if arg_chars.len() > 1 + && arg_chars[0] == '"' + && arg_chars[arg_chars.len() - 1] == '"' + { + // quoted string + let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect(); + process = process.arg(new_arg); + } else { + process = process.arg(arg.item.clone()); + } + } + } + + process = process.cwd(context.shell_manager.path()); + + trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); + + let mut process = match stream_next { + StreamNext::Last => process, + StreamNext::External | StreamNext::Internal => { + process.stdout(subprocess::Redirection::Pipe) + } + }; + + trace!(target: "nu::run::external", "set up stdout pipe"); + + if let Some(stdin) = stdin { + process = process.stdin(stdin); + } + + trace!(target: "nu::run::external", "set up stdin pipe"); + trace!(target: "nu::run::external", "built process {:?}", process); + + let popen = process.popen(); + + trace!(target: "nu::run::external", "next = {:?}", stream_next); + + let name_tag = self.name_tag.clone(); + if let Ok(mut popen) = popen { + match stream_next { + StreamNext::Last => { + let _ = popen.detach(); + loop { + match popen.poll() { + None => { + let _ = std::thread::sleep(std::time::Duration::new(0, 100000000)); + } + _ => { + let _ = popen.terminate(); + break; + } + } + } + Ok(ClassifiedInputStream::new()) + } + StreamNext::External => { + let _ = popen.detach(); + let stdout = popen.stdout.take().unwrap(); + Ok(ClassifiedInputStream::from_stdout(stdout)) + } + StreamNext::Internal => { + let _ = popen.detach(); + let stdout = popen.stdout.take().unwrap(); + let file = futures::io::AllowStdIo::new(stdout); + let stream = Framed::new(file, LinesCodec {}); + let stream = + stream.map(move |line| Value::string(line.unwrap()).tagged(&name_tag)); + Ok(ClassifiedInputStream::from_input_stream( + stream.boxed() as BoxStream<'static, Tagged> + )) + } + } + } else { + return Err(ShellError::labeled_error( + "Command not found", + "command not found", + name_tag, + )); + } + } +} diff --git a/src/commands/classified/internal.rs b/src/commands/classified/internal.rs new file mode 100644 index 000000000..51f14e8f5 --- /dev/null +++ b/src/commands/classified/internal.rs @@ -0,0 +1,151 @@ +use crate::parser::hir; +use crate::prelude::*; +use derive_new::new; +use log::{log_enabled, trace}; +use std::fmt; + +use super::ClassifiedInputStream; + +#[derive(new, Debug, Clone, Eq, PartialEq)] +pub(crate) struct Command { + pub(crate) name: String, + pub(crate) name_tag: Tag, + pub(crate) args: Spanned, +} + +impl HasSpan for Command { + fn span(&self) -> Span { + let start = self.name_tag.span; + + start.until(self.args.span) + } +} + +impl FormatDebug for Command { + fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { + f.say("internal", self.args.debug(source)) + } +} + +impl Command { + pub(crate) fn run( + self, + context: &mut Context, + input: ClassifiedInputStream, + source: Text, + ) -> Result { + if log_enabled!(log::Level::Trace) { + trace!(target: "nu::run::internal", "->"); + trace!(target: "nu::run::internal", "{}", self.name); + trace!(target: "nu::run::internal", "{}", self.args.debug(&source)); + } + + let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", source: source, "input" = input.objects); + + let command = context.expect_command(&self.name); + + let result = { + context.run_command( + command, + self.name_tag.clone(), + self.args.item, + &source, + objects, + ) + }; + + let result = trace_out_stream!(target: "nu::trace_stream::internal", source: source, "output" = result); + let mut result = result.values; + let mut context = context.clone(); + + let stream = async_stream! { + let mut soft_errs: Vec = vec![]; + let mut yielded = false; + + while let Some(item) = result.next().await { + match item { + Ok(ReturnSuccess::Action(action)) => match action { + CommandAction::ChangePath(path) => { + context.shell_manager.set_path(path); + } + CommandAction::Exit => std::process::exit(0), // TODO: save history.txt + CommandAction::Error(err) => { + context.error(err); + break; + } + CommandAction::EnterHelpShell(value) => { + match value { + Tagged { + item: Value::Primitive(Primitive::String(cmd)), + tag, + } => { + context.shell_manager.insert_at_current(Box::new( + HelpShell::for_command( + Value::string(cmd).tagged(tag), + &context.registry(), + ).unwrap(), + )); + } + _ => { + context.shell_manager.insert_at_current(Box::new( + HelpShell::index(&context.registry()).unwrap(), + )); + } + } + } + CommandAction::EnterValueShell(value) => { + context + .shell_manager + .insert_at_current(Box::new(ValueShell::new(value))); + } + CommandAction::EnterShell(location) => { + context.shell_manager.insert_at_current(Box::new( + FilesystemShell::with_location(location, context.registry().clone()).unwrap(), + )); + } + CommandAction::PreviousShell => { + context.shell_manager.prev(); + } + CommandAction::NextShell => { + context.shell_manager.next(); + } + CommandAction::LeaveShell => { + context.shell_manager.remove_at_current(); + if context.shell_manager.is_empty() { + std::process::exit(0); // TODO: save history.txt + } + } + }, + + Ok(ReturnSuccess::Value(v)) => { + yielded = true; + yield Ok(v); + } + + Ok(ReturnSuccess::DebugValue(v)) => { + yielded = true; + + let doc = v.item.pretty_doc(); + let mut buffer = termcolor::Buffer::ansi(); + + doc.render_raw( + context.with_host(|host| host.width() - 5), + &mut crate::parser::debug::TermColored::new(&mut buffer), + ).unwrap(); + + let value = String::from_utf8_lossy(buffer.as_slice()); + + yield Ok(Value::string(value).tagged_unknown()) + } + + Err(err) => { + context.error(err); + break; + } + } + } + }; + + Ok(stream.to_input_stream()) + } +} diff --git a/src/commands/classified/mod.rs b/src/commands/classified/mod.rs new file mode 100644 index 000000000..b0b320823 --- /dev/null +++ b/src/commands/classified/mod.rs @@ -0,0 +1,75 @@ +use crate::parser::{hir, TokenNode}; +use crate::prelude::*; +use std::fmt; + +mod dynamic; +mod external; +mod internal; +mod pipeline; + +#[allow(unused_imports)] +pub(crate) use dynamic::Command as DynamicCommand; +#[allow(unused_imports)] +pub(crate) use external::{Command as ExternalCommand, StreamNext}; +pub(crate) use internal::Command as InternalCommand; +pub(crate) use pipeline::Pipeline as ClassifiedPipeline; + +pub(crate) struct ClassifiedInputStream { + pub(crate) objects: InputStream, + pub(crate) stdin: Option, +} + +impl ClassifiedInputStream { + pub(crate) fn new() -> ClassifiedInputStream { + ClassifiedInputStream { + objects: vec![Value::nothing().tagged(Tag::unknown())].into(), + stdin: None, + } + } + + pub(crate) fn from_input_stream(stream: impl Into) -> ClassifiedInputStream { + ClassifiedInputStream { + objects: stream.into(), + stdin: None, + } + } + + pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream { + ClassifiedInputStream { + objects: VecDeque::new().into(), + stdin: Some(stdout), + } + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) enum ClassifiedCommand { + #[allow(unused)] + Expr(TokenNode), + Internal(InternalCommand), + #[allow(unused)] + Dynamic(Spanned), + External(ExternalCommand), +} + +impl FormatDebug for ClassifiedCommand { + fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { + match self { + ClassifiedCommand::Expr(expr) => expr.fmt_debug(f, source), + ClassifiedCommand::Internal(internal) => internal.fmt_debug(f, source), + ClassifiedCommand::Dynamic(dynamic) => dynamic.fmt_debug(f, source), + ClassifiedCommand::External(external) => external.fmt_debug(f, source), + } + } +} + +impl HasSpan for ClassifiedCommand { + fn span(&self) -> Span { + match self { + ClassifiedCommand::Expr(node) => node.span(), + ClassifiedCommand::Internal(command) => command.span(), + ClassifiedCommand::Dynamic(call) => call.span, + ClassifiedCommand::External(command) => command.span(), + } + } +} diff --git a/src/commands/classified/pipeline.rs b/src/commands/classified/pipeline.rs new file mode 100644 index 000000000..037ba5c12 --- /dev/null +++ b/src/commands/classified/pipeline.rs @@ -0,0 +1,91 @@ +use super::{ClassifiedCommand, ClassifiedInputStream, StreamNext}; +use crate::prelude::*; +use std::fmt; +use std::sync::atomic::Ordering; + +#[derive(Debug, Clone)] +pub(crate) struct Pipeline { + pub(crate) commands: Spanned>, +} + +impl FormatDebug for Pipeline { + fn fmt_debug(&self, f: &mut DebugFormatter, source: &str) -> fmt::Result { + f.say_str( + "classified pipeline", + self.commands.iter().map(|c| c.debug(source)).join(" | "), + ) + } +} + +impl HasSpan for Pipeline { + fn span(&self) -> Span { + self.commands.span + } +} + +impl Pipeline { + pub(crate) async fn run( + self, + ctx: &mut Context, + mut input: ClassifiedInputStream, + line: &str, + ) -> Result<(), ShellError> { + let mut iter = self.commands.item.into_iter().peekable(); + + loop { + let item: Option = iter.next(); + let next: Option<&ClassifiedCommand> = iter.peek(); + + input = match (item, next) { + (Some(ClassifiedCommand::Dynamic(_)), _) + | (_, Some(ClassifiedCommand::Dynamic(_))) => { + return Err(ShellError::unimplemented("Dynamic commands")) + } + + (Some(ClassifiedCommand::Expr(_)), _) | (_, Some(ClassifiedCommand::Expr(_))) => { + return Err(ShellError::unimplemented("Expression-only commands")) + } + + (Some(ClassifiedCommand::Internal(left)), _) => { + let stream = left.run(ctx, input, Text::from(line))?; + ClassifiedInputStream::from_input_stream(stream) + } + + (Some(ClassifiedCommand::External(left)), Some(ClassifiedCommand::External(_))) => { + left.run(ctx, input, StreamNext::External).await? + } + + (Some(ClassifiedCommand::External(left)), Some(_)) => { + left.run(ctx, input, StreamNext::Internal).await? + } + + (Some(ClassifiedCommand::External(left)), None) => { + left.run(ctx, input, StreamNext::Last).await? + } + + (None, _) => break, + }; + } + + use futures::stream::TryStreamExt; + let mut output_stream: OutputStream = input.objects.into(); + loop { + match output_stream.try_next().await { + Ok(Some(ReturnSuccess::Value(Tagged { + item: Value::Error(e), + .. + }))) => return Err(e), + Ok(Some(_item)) => { + if ctx.ctrl_c.load(Ordering::SeqCst) { + break; + } + } + _ => { + break; + } + } + } + + Ok(()) + } +}