Pipe external to internal

Each line is a string object
This commit is contained in:
Yehuda Katz
2019-05-24 11:48:33 -07:00
parent f9fb353c5c
commit 9f8d2a4de5
15 changed files with 391 additions and 97 deletions

View File

@@ -1,4 +1,6 @@
use crate::prelude::*;
use futures::compat::AsyncRead01CompatExt;
use futures_codec::{Framed, LinesCodec};
use std::sync::Arc;
use subprocess::Exec;
@@ -107,20 +109,29 @@ crate struct ExternalCommand {
crate args: Vec<String>,
}
crate enum StreamNext {
Last,
External,
Internal,
}
impl ExternalCommand {
crate async fn run(
self,
context: &mut Context,
mut input: ClassifiedInputStream,
stream_next: bool,
stream_next: StreamNext,
) -> Result<ClassifiedInputStream, ShellError> {
let mut process = Exec::shell(&self.name)
.args(&self.args)
.cwd(context.env.lock().unwrap().cwd());
if stream_next {
process = process.stdout(subprocess::Redirection::Pipe)
}
let mut process = match stream_next {
StreamNext::Last => process,
StreamNext::External | StreamNext::Internal => {
process.stdout(subprocess::Redirection::Pipe)
}
};
if let Some(stdin) = input.stdin {
process = process.stdin(stdin);
@@ -128,15 +139,31 @@ impl ExternalCommand {
let mut popen = process.popen().unwrap();
if stream_next {
match &popen.stdout {
None => unreachable!(),
Some(stdout) => Ok(ClassifiedInputStream::from_stdout(stdout.try_clone()?)),
match stream_next {
StreamNext::Last => {
popen.wait()?;
Ok(ClassifiedInputStream::new())
}
StreamNext::External => {
let stdout = popen.stdout.take().unwrap();
Ok(ClassifiedInputStream::from_stdout(stdout))
}
StreamNext::Internal => {
let stdout = popen.stdout.take().unwrap();
let file = futures::io::AllowStdIo::new(stdout);
let stream = Framed::new(file, LinesCodec {});
let stream = stream.map(|line| Value::string(line.unwrap()));
Ok(ClassifiedInputStream::from_input_stream(stream.boxed()))
}
} else {
popen.stdin.take();
popen.wait()?;
Ok(ClassifiedInputStream::new())
}
// if stream_next {
// let stdout = popen.stdout.take().unwrap();
// Ok(ClassifiedInputStream::from_stdout(stdout))
// } else {
// // popen.stdin.take();
// popen.wait()?;
// Ok(ClassifiedInputStream::new())
// }
}
}