nushell/src/commands/plugin.rs

287 lines
10 KiB
Rust
Raw Normal View History

2019-08-15 07:02:02 +02:00
use crate::commands::WholeStreamCommand;
2019-06-27 18:47:24 +02:00
use crate::errors::ShellError;
2019-07-16 09:08:35 +02:00
use crate::parser::registry;
2019-06-27 18:47:24 +02:00
use crate::prelude::*;
2019-07-16 09:08:35 +02:00
use derive_new::new;
2019-06-27 18:47:24 +02:00
use serde::{self, Deserialize, Serialize};
use std::io::prelude::*;
use std::io::BufReader;
use std::io::Write;
2019-06-27 18:47:24 +02:00
/// A JSON-RPC style message envelope.
///
/// In this file it is used to build the requests that are serialized with
/// `serde_json` and handed to plugin processes. `T` is the type of the
/// payload carried in `params`.
#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpc<T> {
// Protocol version string; `JsonRpc::new` always sets it to "2.0".
jsonrpc: String,
/// Name of the method being invoked (e.g. "filter", "sink", "quit").
pub method: String,
/// Method-specific payload.
pub params: T,
}
2019-06-27 18:47:24 +02:00
impl<T> JsonRpc<T> {
pub fn new<U: Into<String>>(method: U, params: T) -> Self {
JsonRpc {
jsonrpc: "2.0".into(),
method: method.into(),
params,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method")]
#[allow(non_camel_case_types)]
pub enum NuResult {
2019-07-02 09:56:20 +02:00
response {
params: Result<VecDeque<ReturnValue>, ShellError>,
},
2019-06-27 18:47:24 +02:00
}
2019-07-16 09:08:35 +02:00
#[derive(new)]
pub struct PluginCommand {
name: String,
path: String,
2019-08-02 21:15:07 +02:00
config: registry::Signature,
2019-07-16 09:08:35 +02:00
}
2019-08-15 07:02:02 +02:00
impl WholeStreamCommand for PluginCommand {
2019-08-02 21:15:07 +02:00
fn name(&self) -> &str {
&self.name
}
fn signature(&self) -> registry::Signature {
self.config.clone()
}
2019-07-24 00:22:11 +02:00
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
filter_plugin(self.path.clone(), args, registry)
2019-07-16 09:08:35 +02:00
}
}
2019-07-24 00:22:11 +02:00
pub fn filter_plugin(
path: String,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
2019-06-27 18:47:24 +02:00
let mut child = std::process::Command::new(path)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn child process");
2019-08-01 03:58:42 +02:00
let mut bos: VecDeque<Tagged<Value>> = VecDeque::new();
bos.push_back(Value::Primitive(Primitive::BeginningOfStream).tagged_unknown());
2019-08-01 03:58:42 +02:00
let mut eos: VecDeque<Tagged<Value>> = VecDeque::new();
eos.push_back(Value::Primitive(Primitive::EndOfStream).tagged_unknown());
2019-06-27 18:47:24 +02:00
2019-08-09 06:51:21 +02:00
let call_info = args.call_info.clone();
let stream = bos
.chain(args.input.values)
2019-06-27 18:47:24 +02:00
.chain(eos)
.map(move |v| match v {
2019-08-01 03:58:42 +02:00
Tagged {
item: Value::Primitive(Primitive::BeginningOfStream),
..
} => {
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("begin_filter", call_info.clone());
let request_raw = serde_json::to_string(&request).unwrap();
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => params,
Err(e) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Err(e));
result
}
},
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while processing begin_filter response: {:?} {}",
e, input
))));
result
}
}
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while reading begin_filter response: {:?}",
e
))));
result
}
}
}
2019-08-01 03:58:42 +02:00
Tagged {
2019-07-08 18:44:53 +02:00
item: Value::Primitive(Primitive::EndOfStream),
..
} => {
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
2019-06-27 18:47:24 +02:00
2019-07-26 20:40:00 +02:00
let mut reader = BufReader::new(stdout);
let request: JsonRpc<std::vec::Vec<Value>> = JsonRpc::new("end_filter", vec![]);
2019-06-27 18:47:24 +02:00
let request_raw = serde_json::to_string(&request).unwrap();
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
2019-06-27 18:47:24 +02:00
2019-07-26 20:40:00 +02:00
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => {
let request: JsonRpc<std::vec::Vec<Value>> =
JsonRpc::new("quit", vec![]);
let request_raw = serde_json::to_string(&request).unwrap();
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
params
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Err(e));
result
}
},
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while processing end_filter response: {:?} {}",
2019-07-26 20:40:00 +02:00
e, input
))));
result
}
}
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while reading end_filter: {:?}",
2019-07-26 20:40:00 +02:00
e
))));
result
}
}
2019-06-27 18:47:24 +02:00
}
_ => {
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
2019-06-27 18:47:24 +02:00
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("filter", v);
let request_raw = serde_json::to_string(&request).unwrap();
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
2019-06-27 18:47:24 +02:00
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
2019-07-02 09:56:20 +02:00
Ok(NuResult::response { params }) => match params {
Ok(params) => params,
Err(e) => {
let mut result = VecDeque::new();
2019-07-13 04:07:06 +02:00
result.push_back(ReturnValue::Err(e));
2019-07-02 09:56:20 +02:00
result
}
},
Err(e) => {
2019-06-27 18:47:24 +02:00
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while processing filter response: {:?} {}",
e, input
2019-06-27 18:47:24 +02:00
))));
result
}
}
}
2019-07-02 09:56:20 +02:00
Err(e) => {
2019-06-27 18:47:24 +02:00
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while reading filter response: {:?}",
e
2019-06-27 18:47:24 +02:00
))));
result
}
}
}
})
.flatten();
Ok(stream.to_output_stream())
2019-06-27 18:47:24 +02:00
}
2019-08-09 09:54:21 +02:00
/// A command backed by an external plugin executable that is run through
/// `sink_plugin`.
///
/// The constructor is generated by `#[derive(new)]`.
#[derive(new)]
pub struct PluginSink {
// Value returned by `WholeStreamCommand::name`.
name: String,
// Path of the plugin executable to spawn.
path: String,
// Value returned (cloned) by `WholeStreamCommand::signature`.
config: registry::Signature,
}
2019-08-15 07:02:02 +02:00
impl WholeStreamCommand for PluginSink {
2019-08-09 09:54:21 +02:00
fn name(&self) -> &str {
&self.name
}
fn signature(&self) -> registry::Signature {
self.config.clone()
}
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
sink_plugin(self.path.clone(), args, registry)
}
}
/// Runs the plugin executable at `path` as a sink for the input stream.
///
/// Inside the stream block the whole input is collected, wrapped together with
/// the call info in a `sink` request, and written as one JSON line to a named
/// temporary file. The plugin is spawned with that file's path as its only
/// argument and the block waits for the process before finishing.
///
/// # Errors
///
/// Returns an error if evaluating the arguments fails.
///
/// # Panics
///
/// The stream block panics if the request cannot be serialized, the temporary
/// file cannot be created, or the plugin cannot be spawned. Failures to write
/// or flush the file, and to wait for the plugin, are ignored.
pub fn sink_plugin(
    path: String,
    args: CommandArgs,
    registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
    let args = args.evaluate_once(registry)?;
    let call_info = args.call_info.clone();

    let stream = async_stream_block! {
        let values: Vec<Tagged<Value>> = args.input.values.collect().await;
        let payload =
            serde_json::to_string(&JsonRpc::new("sink", (call_info.clone(), values))).unwrap();

        // The binding stays in scope until after `wait` below, so the file is
        // still present while the plugin runs.
        let mut request_file = tempfile::NamedTempFile::new().unwrap();
        let _ = writeln!(request_file, "{}", payload);
        let _ = request_file.flush();

        let mut plugin = std::process::Command::new(path);
        plugin.arg(request_file.path());
        let mut process = plugin.spawn().expect("Failed to spawn child process");
        let _ = process.wait();
    };

    Ok(OutputStream::new(stream))
}