2019-05-22 09:12:03 +02:00
|
|
|
use crate::prelude::*;
|
2019-05-26 08:54:41 +02:00
|
|
|
use bytes::{BufMut, BytesMut};
|
|
|
|
use futures_codec::{Decoder, Encoder, Framed};
|
|
|
|
use std::io::{Error, ErrorKind};
|
2019-05-22 09:12:03 +02:00
|
|
|
use std::sync::Arc;
|
|
|
|
use subprocess::Exec;
|
2019-05-25 21:07:52 +02:00
|
|
|
|
|
|
|
/// A simple `Codec` implementation that splits up data into lines.
|
|
|
|
pub struct LinesCodec {}
|
|
|
|
|
|
|
|
impl Encoder for LinesCodec {
|
|
|
|
type Item = String;
|
|
|
|
type Error = Error;
|
|
|
|
|
|
|
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
|
|
|
dst.put(item);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Decoder for LinesCodec {
|
|
|
|
type Item = String;
|
|
|
|
type Error = Error;
|
|
|
|
|
|
|
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
|
|
|
match src.iter().position(|b| b == &b'\n') {
|
|
|
|
Some(pos) if !src.is_empty() => {
|
|
|
|
let buf = src.split_to(pos + 1);
|
|
|
|
String::from_utf8(buf.to_vec())
|
|
|
|
.map(Some)
|
|
|
|
.map_err(|e| Error::new(ErrorKind::InvalidData, e))
|
|
|
|
}
|
|
|
|
_ if !src.is_empty() => {
|
|
|
|
let drained = src.take();
|
|
|
|
String::from_utf8(drained.to_vec())
|
|
|
|
.map(Some)
|
|
|
|
.map_err(|e| Error::new(ErrorKind::InvalidData, e))
|
|
|
|
}
|
2019-05-26 08:54:41 +02:00
|
|
|
_ => Ok(None),
|
2019-05-25 21:07:52 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-05-22 09:12:03 +02:00
|
|
|
|
2019-05-24 09:29:16 +02:00
|
|
|
crate struct ClassifiedInputStream {
|
|
|
|
crate objects: InputStream,
|
|
|
|
crate stdin: Option<std::fs::File>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ClassifiedInputStream {
|
|
|
|
crate fn new() -> ClassifiedInputStream {
|
|
|
|
ClassifiedInputStream {
|
|
|
|
objects: VecDeque::new().boxed(),
|
|
|
|
stdin: None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
crate fn from_input_stream(stream: InputStream) -> ClassifiedInputStream {
|
|
|
|
ClassifiedInputStream {
|
|
|
|
objects: stream,
|
|
|
|
stdin: None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
crate fn from_stdout(stdout: std::fs::File) -> ClassifiedInputStream {
|
|
|
|
ClassifiedInputStream {
|
|
|
|
objects: VecDeque::new().boxed(),
|
|
|
|
stdin: Some(stdout),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-05-26 08:54:41 +02:00
|
|
|
crate struct ClassifiedPipeline {
|
|
|
|
crate commands: Vec<ClassifiedCommand>,
|
|
|
|
}
|
|
|
|
|
2019-05-22 09:12:03 +02:00
|
|
|
crate enum ClassifiedCommand {
|
|
|
|
Internal(InternalCommand),
|
|
|
|
External(ExternalCommand),
|
|
|
|
}
|
|
|
|
|
|
|
|
crate struct InternalCommand {
|
|
|
|
crate command: Arc<dyn Command>,
|
|
|
|
crate args: Vec<Value>,
|
|
|
|
}
|
|
|
|
|
2019-05-24 09:29:16 +02:00
|
|
|
impl InternalCommand {
|
|
|
|
crate async fn run(
|
|
|
|
self,
|
|
|
|
context: &mut Context,
|
|
|
|
input: ClassifiedInputStream,
|
|
|
|
) -> Result<InputStream, ShellError> {
|
|
|
|
let result = context.run_command(self.command, self.args, input.objects)?;
|
|
|
|
let env = context.env.clone();
|
|
|
|
|
|
|
|
let stream = result.filter_map(move |v| match v {
|
|
|
|
ReturnValue::Action(action) => match action {
|
|
|
|
CommandAction::ChangeCwd(cwd) => {
|
|
|
|
env.lock().unwrap().cwd = cwd;
|
|
|
|
futures::future::ready(None)
|
|
|
|
}
|
|
|
|
},
|
|
|
|
|
|
|
|
ReturnValue::Value(v) => futures::future::ready(Some(v)),
|
|
|
|
});
|
|
|
|
|
|
|
|
Ok(stream.boxed() as InputStream)
|
|
|
|
}
|
2019-05-28 08:45:18 +02:00
|
|
|
|
|
|
|
crate fn name(&self) -> &str {
|
|
|
|
self.command.name()
|
|
|
|
}
|
2019-05-24 09:29:16 +02:00
|
|
|
}
|
|
|
|
|
2019-05-22 09:12:03 +02:00
|
|
|
crate struct ExternalCommand {
|
|
|
|
crate name: String,
|
|
|
|
crate args: Vec<String>,
|
|
|
|
}
|
2019-05-24 09:29:16 +02:00
|
|
|
|
2019-05-24 20:48:33 +02:00
|
|
|
crate enum StreamNext {
|
|
|
|
Last,
|
|
|
|
External,
|
|
|
|
Internal,
|
|
|
|
}
|
|
|
|
|
2019-05-24 09:29:16 +02:00
|
|
|
impl ExternalCommand {
|
|
|
|
crate async fn run(
|
|
|
|
self,
|
|
|
|
context: &mut Context,
|
2019-05-24 21:35:22 +02:00
|
|
|
input: ClassifiedInputStream,
|
2019-05-24 20:48:33 +02:00
|
|
|
stream_next: StreamNext,
|
2019-05-24 09:29:16 +02:00
|
|
|
) -> Result<ClassifiedInputStream, ShellError> {
|
2019-05-26 00:23:35 +02:00
|
|
|
let mut cmd = self.name.clone();
|
|
|
|
for arg in self.args {
|
|
|
|
cmd.push_str(" ");
|
|
|
|
cmd.push_str(&arg);
|
|
|
|
}
|
2019-05-26 08:54:41 +02:00
|
|
|
let process = Exec::shell(&cmd).cwd(context.env.lock().unwrap().cwd());
|
2019-05-24 09:29:16 +02:00
|
|
|
|
2019-05-24 20:48:33 +02:00
|
|
|
let mut process = match stream_next {
|
|
|
|
StreamNext::Last => process,
|
|
|
|
StreamNext::External | StreamNext::Internal => {
|
|
|
|
process.stdout(subprocess::Redirection::Pipe)
|
|
|
|
}
|
|
|
|
};
|
2019-05-24 09:29:16 +02:00
|
|
|
|
|
|
|
if let Some(stdin) = input.stdin {
|
|
|
|
process = process.stdin(stdin);
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut popen = process.popen().unwrap();
|
|
|
|
|
2019-05-24 20:48:33 +02:00
|
|
|
match stream_next {
|
|
|
|
StreamNext::Last => {
|
|
|
|
popen.wait()?;
|
|
|
|
Ok(ClassifiedInputStream::new())
|
|
|
|
}
|
|
|
|
StreamNext::External => {
|
|
|
|
let stdout = popen.stdout.take().unwrap();
|
|
|
|
Ok(ClassifiedInputStream::from_stdout(stdout))
|
|
|
|
}
|
|
|
|
StreamNext::Internal => {
|
|
|
|
let stdout = popen.stdout.take().unwrap();
|
|
|
|
let file = futures::io::AllowStdIo::new(stdout);
|
|
|
|
let stream = Framed::new(file, LinesCodec {});
|
|
|
|
let stream = stream.map(|line| Value::string(line.unwrap()));
|
|
|
|
Ok(ClassifiedInputStream::from_input_stream(stream.boxed()))
|
2019-05-24 09:29:16 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-05-28 08:45:18 +02:00
|
|
|
|
|
|
|
crate fn name(&self) -> &str {
|
|
|
|
&self.name[..]
|
|
|
|
}
|
2019-05-24 09:29:16 +02:00
|
|
|
}
|