Update plugin protocol for begin, and create new sys plugin

This commit is contained in:
Jonathan Turner
2019-07-27 19:45:00 +12:00
parent 25cc69914c
commit a09361698e
19 changed files with 402 additions and 76 deletions

View File

@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use uuid::Uuid;
#[derive(Deserialize, Serialize, Debug)]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct CallInfo {
pub args: Args,
pub source_map: SourceMap,

View File

@@ -78,46 +78,64 @@ pub fn filter_plugin(path: String, args: CommandArgs) -> Result<OutputStream, Sh
.spawn()
.expect("Failed to spawn child process");
{
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("begin_filter", args.call_info);
let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes())?;
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(_) => {}
Err(e) => {
return Err(e);
}
},
Err(e) => {
return Err(ShellError::string(format!(
"Error while processing input: {:?} {}",
e, input
)));
}
}
}
_ => {}
}
}
let mut bos: VecDeque<Spanned<Value>> = VecDeque::new();
bos.push_back(Value::Primitive(Primitive::BeginningOfStream).spanned_unknown());
let mut eos: VecDeque<Spanned<Value>> = VecDeque::new();
eos.push_back(Value::Primitive(Primitive::EndOfStream).spanned_unknown());
let stream = args
.input
.values
let call_info = args.call_info;
let stream = bos
.chain(args.input.values)
.chain(eos)
.map(move |v| match v {
Spanned {
item: Value::Primitive(Primitive::BeginningOfStream),
..
} => {
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let stdout = child.stdout.as_mut().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let request = JsonRpc::new("begin_filter", call_info.clone());
let request_raw = serde_json::to_string(&request).unwrap();
let _ = stdin.write(format!("{}\n", request_raw).as_bytes()); // TODO: Handle error
let mut input = String::new();
match reader.read_line(&mut input) {
Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input);
match response {
Ok(NuResult::response { params }) => match params {
Ok(params) => params,
Err(e) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Err(e));
result
}
},
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while processing begin_filter response: {:?} {}",
e, input
))));
result
}
}
}
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while reading begin_filter response: {:?}",
e
))));
result
}
}
}
Spanned {
item: Value::Primitive(Primitive::EndOfStream),
..
@@ -154,7 +172,7 @@ pub fn filter_plugin(path: String, args: CommandArgs) -> Result<OutputStream, Sh
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while processing input: {:?} {}",
"Error while processing end_filter response: {:?} {}",
e, input
))));
result
@@ -164,7 +182,7 @@ pub fn filter_plugin(path: String, args: CommandArgs) -> Result<OutputStream, Sh
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while processing input: {:?}",
"Error while reading end_filter: {:?}",
e
))));
result
@@ -197,7 +215,7 @@ pub fn filter_plugin(path: String, args: CommandArgs) -> Result<OutputStream, Sh
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while processing input: {:?} {}",
"Error while processing filter response: {:?} {}",
e, input
))));
result
@@ -207,7 +225,7 @@ pub fn filter_plugin(path: String, args: CommandArgs) -> Result<OutputStream, Sh
Err(e) => {
let mut result = VecDeque::new();
result.push_back(Err(ShellError::string(format!(
"Error while processing input: {:?}",
"Error while reading filter response: {:?}",
e
))));
result

View File

@@ -9,6 +9,7 @@ pub fn value_to_json_value(v: &Value) -> serde_json::Value {
}
Value::Primitive(Primitive::Date(d)) => serde_json::Value::String(d.to_string()),
Value::Primitive(Primitive::EndOfStream) => serde_json::Value::Null,
Value::Primitive(Primitive::BeginningOfStream) => serde_json::Value::Null,
Value::Primitive(Primitive::Float(f)) => {
serde_json::Value::Number(serde_json::Number::from_f64(f.into_inner()).unwrap())
}

View File

@@ -9,6 +9,9 @@ pub fn value_to_toml_value(v: &Value) -> toml::Value {
Value::Primitive(Primitive::EndOfStream) => {
toml::Value::String("<End of Stream>".to_string())
}
Value::Primitive(Primitive::BeginningOfStream) => {
toml::Value::String("<Beginning of Stream>".to_string())
}
Value::Primitive(Primitive::Float(f)) => toml::Value::Float(f.into_inner()),
Value::Primitive(Primitive::Int(i)) => toml::Value::Integer(*i),
Value::Primitive(Primitive::Nothing) => toml::Value::String("<Nothing>".to_string()),

View File

@@ -9,6 +9,7 @@ pub fn value_to_yaml_value(v: &Value) -> serde_yaml::Value {
}
Value::Primitive(Primitive::Date(d)) => serde_yaml::Value::String(d.to_string()),
Value::Primitive(Primitive::EndOfStream) => serde_yaml::Value::Null,
Value::Primitive(Primitive::BeginningOfStream) => serde_yaml::Value::Null,
Value::Primitive(Primitive::Float(f)) => {
serde_yaml::Value::Number(serde_yaml::Number::from(f.into_inner()))
}