Fix sink plugins

This commit is contained in:
Jonathan Turner 2019-08-09 19:54:21 +12:00
parent 83030094e0
commit cabd5bf009
4 changed files with 69 additions and 5 deletions

View File

@ -4,7 +4,7 @@ use crate::commands::classified::{
StreamNext,
};
use crate::commands::plugin::JsonRpc;
use crate::commands::plugin::PluginCommand;
use crate::commands::plugin::{PluginCommand, PluginSink};
use crate::commands::static_command;
use crate::context::Context;
crate use crate::errors::ShellError;
@ -72,6 +72,11 @@ fn load_plugin(path: &std::path::Path, context: &mut Context) -> Result<(), Shel
))]);
Ok(())
} else {
let fname = fname.to_string();
let name = params.name.clone();
context.add_commands(vec![static_command(PluginSink::new(
name, fname, params,
))]);
Ok(())
}
}

View File

@ -40,9 +40,12 @@ pub fn autoview(
} = input[0usize]
{
let binary = context.expect_command("binaryview");
binary.run(raw.with_input(input), &context.commands).await;
let result = binary.run(raw.with_input(input), &context.commands).await.unwrap();
result.collect::<Vec<_>>().await;
} else if is_single_text_value(&input) {
//view_text_value(&input[0], &raw.call_info.source_map);
let text = context.expect_command("textview");
let result = text.run(raw.with_input(input), &context.commands).await.unwrap();
result.collect::<Vec<_>>().await;
} else if equal_shapes(&input) {
let table = context.expect_command("table");
let result = table.run(raw.with_input(input), &context.commands).await.unwrap();

View File

@ -251,3 +251,56 @@ pub fn filter_plugin(
Ok(stream.to_output_stream())
}
#[derive(new)]
pub struct PluginSink {
name: String,
path: String,
config: registry::Signature,
}
impl StaticCommand for PluginSink {
fn name(&self) -> &str {
&self.name
}
fn signature(&self) -> registry::Signature {
self.config.clone()
}
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
sink_plugin(self.path.clone(), args, registry)
}
}
pub fn sink_plugin(
path: String,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
//use subprocess::Exec;
let args = args.evaluate_once(registry)?;
let call_info = args.call_info.clone();
let stream = async_stream_block! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let request = JsonRpc::new("sink", (call_info.clone(), input));
let request_raw = serde_json::to_string(&request).unwrap();
let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
let _ = writeln!(tmpfile, "{}", request_raw);
let _ = tmpfile.flush();
let mut child = std::process::Command::new(path)
.arg(tmpfile.path())
.spawn()
.expect("Failed to spawn child process");
let _ = child.wait();
};
Ok(OutputStream::new(stream))
}

View File

@ -84,8 +84,11 @@ impl Shell for FilesystemShell {
s.span(),
));
} else {
//FIXME
return Err(ShellError::string(e.to_string()));
return Err(ShellError::labeled_error(
e.to_string(),
e.to_string(),
args.name_span(),
));
}
}
Ok(o) => o,