diff --git a/src/cli.rs b/src/cli.rs index 5802c7a31..e1e760799 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -4,7 +4,7 @@ use crate::commands::classified::{ StreamNext, }; use crate::commands::plugin::JsonRpc; -use crate::commands::plugin::PluginCommand; +use crate::commands::plugin::{PluginCommand, PluginSink}; use crate::commands::static_command; use crate::context::Context; crate use crate::errors::ShellError; @@ -72,6 +72,11 @@ fn load_plugin(path: &std::path::Path, context: &mut Context) -> Result<(), Shel ))]); Ok(()) } else { + let fname = fname.to_string(); + let name = params.name.clone(); + context.add_commands(vec![static_command(PluginSink::new( + name, fname, params, + ))]); Ok(()) } } diff --git a/src/commands/autoview.rs b/src/commands/autoview.rs index ee0e7b3e0..cfbcbe728 100644 --- a/src/commands/autoview.rs +++ b/src/commands/autoview.rs @@ -40,9 +40,12 @@ pub fn autoview( } = input[0usize] { let binary = context.expect_command("binaryview"); - binary.run(raw.with_input(input), &context.commands).await; + let result = binary.run(raw.with_input(input), &context.commands).await.unwrap(); + result.collect::>().await; } else if is_single_text_value(&input) { - //view_text_value(&input[0], &raw.call_info.source_map); + let text = context.expect_command("textview"); + let result = text.run(raw.with_input(input), &context.commands).await.unwrap(); + result.collect::>().await; } else if equal_shapes(&input) { let table = context.expect_command("table"); let result = table.run(raw.with_input(input), &context.commands).await.unwrap(); diff --git a/src/commands/plugin.rs b/src/commands/plugin.rs index 03a84ec93..9d597a859 100644 --- a/src/commands/plugin.rs +++ b/src/commands/plugin.rs @@ -251,3 +251,56 @@ pub fn filter_plugin( Ok(stream.to_output_stream()) } + +#[derive(new)] +pub struct PluginSink { + name: String, + path: String, + config: registry::Signature, +} + +impl StaticCommand for PluginSink { + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> registry::Signature { + self.config.clone() + } + + fn run( + &self, + args: CommandArgs, + registry: &CommandRegistry, + ) -> Result { + sink_plugin(self.path.clone(), args, registry) + } +} + +pub fn sink_plugin( + path: String, + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { + //use subprocess::Exec; + let args = args.evaluate_once(registry)?; + let call_info = args.call_info.clone(); + + let stream = async_stream_block! { + let input: Vec> = args.input.values.collect().await; + + let request = JsonRpc::new("sink", (call_info.clone(), input)); + let request_raw = serde_json::to_string(&request).unwrap(); + let mut tmpfile = tempfile::NamedTempFile::new().unwrap(); + let _ = writeln!(tmpfile, "{}", request_raw); + let _ = tmpfile.flush(); + + let mut child = std::process::Command::new(path) + .arg(tmpfile.path()) + .spawn() + .expect("Failed to spawn child process"); + + let _ = child.wait(); + }; + Ok(OutputStream::new(stream)) +} diff --git a/src/shell/filesystem_shell.rs b/src/shell/filesystem_shell.rs index 00003a6ff..42728d39a 100644 --- a/src/shell/filesystem_shell.rs +++ b/src/shell/filesystem_shell.rs @@ -84,8 +84,11 @@ impl Shell for FilesystemShell { s.span(), )); } else { - //FIXME - return Err(ShellError::string(e.to_string())); + return Err(ShellError::labeled_error( + e.to_string(), + e.to_string(), + args.name_span(), + )); } } Ok(o) => o,