nushell/src/commands/classified.rs

344 lines
11 KiB
Rust
Raw Normal View History

2019-06-07 08:34:42 +02:00
use crate::commands::command::Sink;
2019-06-22 18:31:51 +02:00
use crate::parser::{registry::Args, Span, Spanned, TokenNode};
2019-05-22 09:12:03 +02:00
use crate::prelude::*;
2019-05-26 08:54:41 +02:00
use bytes::{BufMut, BytesMut};
2019-06-15 19:52:55 +02:00
use futures::stream::StreamExt;
2019-05-26 08:54:41 +02:00
use futures_codec::{Decoder, Encoder, Framed};
2019-06-22 05:43:37 +02:00
use log::{log_enabled, trace};
2019-05-26 08:54:41 +02:00
use std::io::{Error, ErrorKind};
2019-06-13 23:47:25 +02:00
use std::path::PathBuf;
2019-05-22 09:12:03 +02:00
use std::sync::Arc;
use subprocess::Exec;
2019-06-22 05:43:37 +02:00
2019-05-25 21:07:52 +02:00
/// A simple `Codec` implementation that splits up data into lines.
///
/// The type has no fields; all framing behaviour lives in its `Encoder` and
/// `Decoder` implementations.
pub struct LinesCodec {}
impl Encoder for LinesCodec {
    type Item = String;
    type Error = Error;

    /// Appends the bytes of `item` to `dst` exactly as given.
    ///
    /// No line terminator is added; callers that want one must include it in
    /// `item`. This never returns an error.
    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // `BufMut::put` on a `BytesMut` may panic instead of growing the buffer
        // when there is not enough spare capacity (bytes 0.4 semantics), so make
        // room for the whole item before writing it.
        dst.reserve(item.len());
        dst.put(item);
        Ok(())
    }
}
impl Decoder for LinesCodec {
    type Item = String;
    type Error = Error;

    /// Pulls the next chunk of text out of `src`.
    ///
    /// * Empty buffer: yields `Ok(None)`.
    /// * Buffer containing `\n`: yields everything up to and including the
    ///   first newline, leaving the rest in `src`.
    /// * Non-empty buffer without `\n`: yields the whole buffer as one item,
    ///   so partial lines are emitted as soon as they arrive.
    ///
    /// Bytes that are not valid UTF-8 produce an `ErrorKind::InvalidData` error.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }

        let chunk = match src.iter().position(|byte| *byte == b'\n') {
            Some(newline) => src.split_to(newline + 1),
            None => src.take(),
        };

        String::from_utf8(chunk.to_vec())
            .map(Some)
            .map_err(|e| Error::new(ErrorKind::InvalidData, e))
    }
}
2019-05-22 09:12:03 +02:00
2019-05-24 09:29:16 +02:00
/// Input handed to a classified command: a stream of objects plus an
/// optional file that an external process can read as its stdin.
crate struct ClassifiedInputStream {
/// Values flowing in from the previous pipeline stage.
crate objects: InputStream,
/// File to attach as the child's stdin; `from_stdout` fills this with the
/// captured stdout of a preceding external command.
crate stdin: Option<std::fs::File>,
}
impl ClassifiedInputStream {
    /// Builds an input with an empty object stream and no stdin file.
    crate fn new() -> ClassifiedInputStream {
        let objects: InputStream = VecDeque::new().into();
        ClassifiedInputStream {
            objects,
            stdin: None,
        }
    }

    /// Builds an input whose objects come from `stream`; no stdin file is set.
    crate fn from_input_stream(stream: impl Into<InputStream>) -> ClassifiedInputStream {
        let objects: InputStream = stream.into();
        ClassifiedInputStream {
            objects,
            stdin: None,
        }
    }

    /// Builds an input with an empty object stream whose stdin is the given
    /// `stdout` file (the output end of a previous process).
    crate fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream {
        let objects: InputStream = VecDeque::new().into();
        ClassifiedInputStream {
            objects,
            stdin: Some(stdout),
        }
    }
}
2019-05-26 08:54:41 +02:00
/// A pipeline expressed as a list of classified commands.
crate struct ClassifiedPipeline {
/// The commands making up the pipeline, stored in a `Vec`.
crate commands: Vec<ClassifiedCommand>,
}
2019-05-22 09:12:03 +02:00
/// The different kinds of command a `ClassifiedPipeline` can contain.
crate enum ClassifiedCommand {
    /// A bare token node.
    // NOTE(review): marked `allow(unused)`, presumably because nothing
    // constructs this variant yet — confirm before removing the attribute.
    #[allow(unused)]
    Expr(TokenNode),
    /// A built-in command implementing `Command`.
    Internal(InternalCommand),
    /// A built-in sink implementing `Sink`.
    Sink(SinkCommand),
    /// A command run as an external process.
    External(ExternalCommand),
}
2019-07-09 06:31:26 +02:00
impl ClassifiedCommand {
    /// Returns the span associated with this command.
    ///
    /// An `Expr` reports the span of its token; every other variant converts
    /// its optional `name_span` into a `Span`.
    pub fn span(&self) -> Span {
        let name_span = match self {
            ClassifiedCommand::Expr(token) => return token.span(),
            ClassifiedCommand::Internal(command) => command.name_span,
            ClassifiedCommand::Sink(command) => command.name_span,
            ClassifiedCommand::External(command) => command.name_span,
        };

        name_span.into()
    }
}
2019-06-07 08:34:42 +02:00
/// A sink invocation: the sink to run together with its call-site data.
crate struct SinkCommand {
    /// The sink implementation to invoke.
    crate command: Arc<dyn Sink>,
    /// Span of the command name, when one is known.
    crate name_span: Option<Span>,
    /// Arguments passed to the sink.
    crate args: Args,
}
impl SinkCommand {
2019-07-08 18:44:53 +02:00
crate fn run(
self,
context: &mut Context,
input: Vec<Spanned<Value>>,
) -> Result<(), ShellError> {
2019-06-08 00:35:07 +02:00
context.run_sink(self.command, self.name_span.clone(), self.args, input)
2019-06-07 08:34:42 +02:00
}
}
2019-05-22 09:12:03 +02:00
/// A built-in command invocation: the command to run plus its call-site data.
crate struct InternalCommand {
    /// The command implementation to invoke.
    crate command: Arc<dyn Command>,
    /// Span of the command name, when one is known.
    crate name_span: Option<Span>,
    /// Arguments passed to the command.
    crate args: Args,
}
2019-05-24 09:29:16 +02:00
impl InternalCommand {
    /// Runs the built-in command against `input.objects` and gathers its output.
    ///
    /// Every item the command returns is awaited in order:
    /// * values are collected into the returned stream;
    /// * `ChangePath` overwrites the path of the last environment on the stack;
    /// * `Enter` pushes a new environment rooted at `/` for the given object;
    /// * `Exit` pops the last environment and terminates the process with
    ///   status 0 when that environment was the filesystem, or -1 when the
    ///   stack was already empty.
    ///
    /// The first error yielded by the command, or by `run_command` itself, is
    /// returned immediately.
    crate async fn run(
        self,
        context: &mut Context,
        input: ClassifiedInputStream,
    ) -> Result<InputStream, ShellError> {
        if log_enabled!(log::Level::Trace) {
            trace!("->");
            trace!("{}", self.command.name());
            trace!("{:?}", self.args.debug());
        }

        let objects: InputStream = trace_stream!("input" = input.objects);

        let mut returned = context
            .run_command(self.command, self.name_span.clone(), self.args, objects)?
            .values;

        let mut collected = VecDeque::new();

        while let Some(item) = returned.next().await {
            match item? {
                ReturnSuccess::Value(value) => collected.push_back(value),

                ReturnSuccess::Action(CommandAction::ChangePath(path)) => {
                    // Nothing to update when the environment stack is empty.
                    if let Some(current) = context.env.lock().unwrap().back_mut() {
                        current.path = path;
                    }
                }

                ReturnSuccess::Action(CommandAction::Enter(obj)) => {
                    let entered = Environment {
                        obj,
                        path: PathBuf::from("/"),
                    };
                    context.env.lock().unwrap().push_back(entered);
                }

                ReturnSuccess::Action(CommandAction::Exit) => {
                    match context.env.lock().unwrap().pop_back() {
                        // Leaving the filesystem environment ends the shell.
                        Some(Environment {
                            obj:
                                Spanned {
                                    item: Value::Filesystem,
                                    ..
                                },
                            ..
                        }) => std::process::exit(0),
                        None => std::process::exit(-1),
                        // Any other environment is simply dropped.
                        _ => {}
                    }
                }
            }
        }

        Ok(collected.into())
    }
}
2019-05-22 09:12:03 +02:00
/// An external program invocation: program name plus its raw arguments.
crate struct ExternalCommand {
    /// Name of the program, passed to the system shell.
    crate name: String,
    /// Span of the command name, when one is known.
    // NOTE(review): carries `allow(unused)` although `run` and
    // `ClassifiedCommand::span` both read it — the attribute may be stale.
    #[allow(unused)]
    crate name_span: Option<Span>,
    /// Arguments as written, each with the span it came from.
    crate args: Vec<Spanned<String>>,
}
2019-05-24 09:29:16 +02:00
/// Tells `ExternalCommand::run` what consumes the process output, which
/// decides how the child's stdout is handled.
crate enum StreamNext {
/// Nothing follows: stdout is not redirected and the process is waited on.
Last,
/// Another external command follows: stdout is piped and handed on as a file.
External,
/// An internal command follows: stdout is piped and decoded into string values.
Internal,
}
2019-05-24 09:29:16 +02:00
impl ExternalCommand {
crate async fn run(
self,
context: &mut Context,
2019-05-24 21:35:22 +02:00
input: ClassifiedInputStream,
stream_next: StreamNext,
2019-05-24 09:29:16 +02:00
) -> Result<ClassifiedInputStream, ShellError> {
let stdin = input.stdin;
2019-07-08 18:44:53 +02:00
let inputs: Vec<Spanned<Value>> = input.objects.into_vec().await;
2019-07-09 06:31:26 +02:00
let name_span = self.name_span.clone();
trace!("-> {}", self.name);
trace!("inputs = {:?}", inputs);
2019-06-22 05:43:37 +02:00
let mut arg_string = format!("{}", self.name);
for arg in &self.args {
2019-06-22 05:43:37 +02:00
arg_string.push_str(&arg);
}
2019-06-02 09:51:54 +02:00
let mut process;
2019-06-18 04:04:34 +02:00
2019-06-02 09:51:54 +02:00
#[cfg(windows)]
{
process = Exec::shell(&self.name);
if arg_string.contains("$it") {
let mut first = true;
2019-06-22 05:43:37 +02:00
2019-06-02 09:51:54 +02:00
for i in &inputs {
2019-06-15 06:20:58 +02:00
if i.as_string().is_err() {
let mut span = None;
for arg in &self.args {
if arg.item.contains("$it") {
span = Some(arg.span);
}
}
if let Some(span) = span {
return Err(ShellError::labeled_error(
"External $it needs string data",
"given object instead of string data",
span,
));
} else {
return Err(ShellError::string("Error: $it needs string data"));
}
}
2019-06-02 09:51:54 +02:00
if !first {
process = process.arg("&&");
process = process.arg(&self.name);
} else {
first = false;
}
for arg in &self.args {
2019-06-22 05:43:37 +02:00
if arg.chars().all(|c| c.is_whitespace()) {
continue;
}
2019-06-02 09:51:54 +02:00
process = process.arg(&arg.replace("$it", &i.as_string().unwrap()));
}
}
} else {
for arg in &self.args {
2019-06-15 23:41:26 +02:00
process = process.arg(arg.item.clone());
2019-06-02 09:51:54 +02:00
}
}
2019-06-02 09:51:54 +02:00
}
#[cfg(not(windows))]
{
let mut new_arg_string = self.name.to_string();
if arg_string.contains("$it") {
let mut first = true;
for i in &inputs {
2019-06-15 06:20:58 +02:00
if i.as_string().is_err() {
let mut span = None;
for arg in &self.args {
if arg.item.contains("$it") {
span = Some(arg.span);
}
}
2019-06-19 07:51:24 +02:00
return Err(ShellError::maybe_labeled_error(
"External $it needs string data",
"given object instead of string data",
span,
));
2019-06-15 06:20:58 +02:00
}
2019-06-02 09:51:54 +02:00
if !first {
2019-06-03 02:37:16 +02:00
new_arg_string.push_str("&&");
2019-06-02 09:51:54 +02:00
new_arg_string.push_str(&self.name);
} else {
first = false;
}
for arg in &self.args {
2019-06-22 05:43:37 +02:00
if arg.chars().all(|c| c.is_whitespace()) {
continue;
}
2019-06-02 09:51:54 +02:00
new_arg_string.push_str(" ");
new_arg_string.push_str(&arg.replace("$it", &i.as_string().unwrap()));
}
}
} else {
for arg in &self.args {
new_arg_string.push_str(" ");
new_arg_string.push_str(&arg);
}
}
2019-06-18 04:04:34 +02:00
2019-06-02 09:51:54 +02:00
process = Exec::shell(new_arg_string);
2019-05-26 00:23:35 +02:00
}
2019-06-15 19:52:55 +02:00
process = process.cwd(context.env.lock().unwrap().front().unwrap().path());
2019-05-24 09:29:16 +02:00
let mut process = match stream_next {
StreamNext::Last => process,
StreamNext::External | StreamNext::Internal => {
process.stdout(subprocess::Redirection::Pipe)
}
};
2019-05-24 09:29:16 +02:00
if let Some(stdin) = stdin {
2019-05-24 09:29:16 +02:00
process = process.stdin(stdin);
}
let mut popen = process.popen().unwrap();
match stream_next {
StreamNext::Last => {
popen.wait()?;
Ok(ClassifiedInputStream::new())
}
StreamNext::External => {
let stdout = popen.stdout.take().unwrap();
Ok(ClassifiedInputStream::from_stdout(stdout))
}
StreamNext::Internal => {
let stdout = popen.stdout.take().unwrap();
let file = futures::io::AllowStdIo::new(stdout);
let stream = Framed::new(file, LinesCodec {});
2019-07-09 06:31:26 +02:00
let stream =
stream.map(move |line| Value::string(line.unwrap()).spanned(name_span));
Ok(ClassifiedInputStream::from_input_stream(
2019-07-09 06:31:26 +02:00
stream.boxed() as BoxStream<'static, Spanned<Value>>
))
2019-05-24 09:29:16 +02:00
}
}
}
}