use crate::parser::{hir, TokenNode}; use crate::prelude::*; use bytes::{BufMut, BytesMut}; use derive_new::new; use futures::stream::StreamExt; use futures_codec::{Decoder, Encoder, Framed}; use log::{log_enabled, trace}; use std::io::{Error, ErrorKind}; use subprocess::Exec; /// A simple `Codec` implementation that splits up data into lines. pub struct LinesCodec {} impl Encoder for LinesCodec { type Item = String; type Error = Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { dst.put(item); Ok(()) } } impl Decoder for LinesCodec { type Item = String; type Error = Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match src.iter().position(|b| b == &b'\n') { Some(pos) if !src.is_empty() => { let buf = src.split_to(pos + 1); String::from_utf8(buf.to_vec()) .map(Some) .map_err(|e| Error::new(ErrorKind::InvalidData, e)) } _ if !src.is_empty() => { let drained = src.take(); String::from_utf8(drained.to_vec()) .map(Some) .map_err(|e| Error::new(ErrorKind::InvalidData, e)) } _ => Ok(None), } } } pub(crate) struct ClassifiedInputStream { pub(crate) objects: InputStream, pub(crate) stdin: Option, } impl ClassifiedInputStream { pub(crate) fn new() -> ClassifiedInputStream { ClassifiedInputStream { objects: VecDeque::new().into(), stdin: None, } } pub(crate) fn from_input_stream(stream: impl Into) -> ClassifiedInputStream { ClassifiedInputStream { objects: stream.into(), stdin: None, } } pub(crate) fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream { ClassifiedInputStream { objects: VecDeque::new().into(), stdin: Some(stdout), } } } #[derive(Debug)] pub(crate) struct ClassifiedPipeline { pub(crate) commands: Vec, } #[derive(Debug, Eq, PartialEq)] pub(crate) enum ClassifiedCommand { #[allow(unused)] Expr(TokenNode), Internal(InternalCommand), #[allow(unused)] Dynamic(hir::Call), External(ExternalCommand), } #[derive(new, Debug, Eq, PartialEq)] pub(crate) struct InternalCommand { pub(crate) name: String, pub(crate) name_tag: Tag, pub(crate) args: hir::Call, } #[derive(new, Debug, Eq, PartialEq)] pub(crate) struct DynamicCommand { pub(crate) args: hir::Call, } impl InternalCommand { pub(crate) fn run( self, context: &mut Context, input: ClassifiedInputStream, source: Text, is_first_command: bool, ) -> Result { if log_enabled!(log::Level::Trace) { trace!(target: "nu::run::internal", "->"); trace!(target: "nu::run::internal", "{}", self.name); trace!(target: "nu::run::internal", "{}", self.args.debug(&source)); } let objects: InputStream = trace_stream!(target: "nu::trace_stream::internal", "input" = input.objects); let command = context.expect_command(&self.name); let result = { context.run_command( command, self.name_tag.clone(), self.args, &source, objects, is_first_command, ) }; let result = trace_out_stream!(target: "nu::trace_stream::internal", source: &source, "output" = result); let mut result = result.values; let mut context = context.clone(); let stream = async_stream! { while let Some(item) = result.next().await { match item { Ok(ReturnSuccess::Action(action)) => match action { CommandAction::ChangePath(path) => { context.shell_manager.set_path(path); } CommandAction::Exit => std::process::exit(0), // TODO: save history.txt CommandAction::EnterHelpShell(value) => { match value { Tagged { item: Value::Primitive(Primitive::String(cmd)), tag, } => { context.shell_manager.insert_at_current(Box::new( HelpShell::for_command( Value::string(cmd).tagged(tag), &context.registry(), ).unwrap(), )); } _ => { context.shell_manager.insert_at_current(Box::new( HelpShell::index(&context.registry()).unwrap(), )); } } } CommandAction::EnterValueShell(value) => { context .shell_manager .insert_at_current(Box::new(ValueShell::new(value))); } CommandAction::EnterShell(location) => { context.shell_manager.insert_at_current(Box::new( FilesystemShell::with_location(location, context.registry().clone()).unwrap(), )); } CommandAction::PreviousShell => { context.shell_manager.prev(); } CommandAction::NextShell => { context.shell_manager.next(); } CommandAction::LeaveShell => { context.shell_manager.remove_at_current(); if context.shell_manager.is_empty() { std::process::exit(0); // TODO: save history.txt } } }, Ok(ReturnSuccess::Value(v)) => { yield Ok(v); } Err(x) => { yield Ok(Value::Error(x).tagged_unknown()); break; } } } }; Ok(stream.to_input_stream()) } } #[derive(Debug, Eq, PartialEq)] pub(crate) struct ExternalCommand { pub(crate) name: String, pub(crate) name_tag: Tag, pub(crate) args: Vec>, } #[derive(Debug)] pub(crate) enum StreamNext { Last, External, Internal, } impl ExternalCommand { pub(crate) async fn run( self, context: &mut Context, input: ClassifiedInputStream, stream_next: StreamNext, ) -> Result { let stdin = input.stdin; let inputs: Vec> = input.objects.into_vec().await; trace!(target: "nu::run::external", "-> {}", self.name); trace!(target: "nu::run::external", "inputs = {:?}", inputs); let mut arg_string = format!("{}", self.name); for arg in &self.args { arg_string.push_str(&arg); } trace!(target: "nu::run::external", "command = {:?}", self.name); let mut process; if arg_string.contains("$it") { let input_strings = inputs .iter() .map(|i| { i.as_string().map_err(|_| { let arg = self.args.iter().find(|arg| arg.item.contains("$it")); if let Some(arg) = arg { ShellError::labeled_error( "External $it needs string data", "given row instead of string data", arg.tag(), ) } else { ShellError::labeled_error( "$it needs string data", "given something else", self.name_tag.clone(), ) } }) }) .collect::, ShellError>>()?; let commands = input_strings.iter().map(|i| { let args = self.args.iter().filter_map(|arg| { if arg.chars().all(|c| c.is_whitespace()) { None } else { Some(arg.replace("$it", &i)) } }); format!("{} {}", self.name, itertools::join(args, " ")) }); process = Exec::shell(itertools::join(commands, " && ")) } else { process = Exec::cmd(&self.name); for arg in &self.args { let arg_chars: Vec<_> = arg.chars().collect(); if arg_chars.len() > 1 && arg_chars[0] == '"' && arg_chars[arg_chars.len() - 1] == '"' { // quoted string let new_arg: String = arg_chars[1..arg_chars.len() - 1].iter().collect(); process = process.arg(new_arg); } else { process = process.arg(arg.item.clone()); } } } process = process.cwd(context.shell_manager.path()); trace!(target: "nu::run::external", "cwd = {:?}", context.shell_manager.path()); let mut process = match stream_next { StreamNext::Last => process, StreamNext::External | StreamNext::Internal => { process.stdout(subprocess::Redirection::Pipe) } }; trace!(target: "nu::run::external", "set up stdout pipe"); if let Some(stdin) = stdin { process = process.stdin(stdin); } trace!(target: "nu::run::external", "set up stdin pipe"); trace!(target: "nu::run::external", "built process {:?}", process); let popen = process.popen(); trace!(target: "nu::run::external", "next = {:?}", stream_next); let name_tag = self.name_tag.clone(); if let Ok(mut popen) = popen { match stream_next { StreamNext::Last => { let _ = popen.detach(); loop { match popen.poll() { None => { let _ = std::thread::sleep(std::time::Duration::new(0, 100000000)); } _ => { let _ = popen.terminate(); break; } } } Ok(ClassifiedInputStream::new()) } StreamNext::External => { let _ = popen.detach(); let stdout = popen.stdout.take().unwrap(); Ok(ClassifiedInputStream::from_stdout(stdout)) } StreamNext::Internal => { let _ = popen.detach(); let stdout = popen.stdout.take().unwrap(); let file = futures::io::AllowStdIo::new(stdout); let stream = Framed::new(file, LinesCodec {}); let stream = stream.map(move |line| Value::string(line.unwrap()).tagged(&name_tag)); Ok(ClassifiedInputStream::from_input_stream( stream.boxed() as BoxStream<'static, Tagged> )) } } } else { return Err(ShellError::labeled_error( "Command not found", "command not found", name_tag, )); } } }