From dcf2e8ce9a8d08b4003a3877369399bd9fd26582 Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Wed, 20 Mar 2024 01:57:22 -0700 Subject: [PATCH] =?UTF-8?q?Refactor=20PipelineDataHeader=20=E2=87=84=20Pip?= =?UTF-8?q?elineData=20mapping=20(#12248)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Description It was a bit ugly that when new `EngineCall`s or response types were added, they needed to be added to multiple places with redundant code just to change the types, even if they didn't have any stream content. This fixes that and locates all of that logic in one place. # User-Facing Changes None # Tests + Formatting - :green_circle: `toolkit fmt` - :green_circle: `toolkit clippy` - :green_circle: `toolkit test` - :green_circle: `toolkit test stdlib` --- .../nu-plugin/src/plugin/interface/engine.rs | 127 +++++++----------- .../nu-plugin/src/plugin/interface/plugin.rs | 107 ++++++--------- crates/nu-plugin/src/protocol/mod.rs | 92 +++++++++++++ 3 files changed, 180 insertions(+), 146 deletions(-) diff --git a/crates/nu-plugin/src/plugin/interface/engine.rs b/crates/nu-plugin/src/plugin/interface/engine.rs index 547c57a1f8..2cceecbda3 100644 --- a/crates/nu-plugin/src/plugin/interface/engine.rs +++ b/crates/nu-plugin/src/plugin/interface/engine.rs @@ -252,64 +252,58 @@ impl InterfaceManager for EngineInterfaceManager { }) } PluginInput::Stream(message) => self.consume_stream_message(message), - PluginInput::Call(id, call) => match call { - // We just let the receiver handle it rather than trying to store signature here - // or something - PluginCall::Signature => self.send_plugin_call(ReceivedPluginCall::Signature { - engine: self.interface_for_context(id), - }), - // Set up the streams from the input and reformat to a ReceivedPluginCall - PluginCall::Run(CallInfo { - name, - mut call, - input, - }) => { - let interface = self.interface_for_context(id); - // If there's an error with initialization of the input stream, just send - // the error response rather than failing here - match self.read_pipeline_data(input, None) { - Ok(input) => { - // Deserialize custom values in the arguments - if let Err(err) = deserialize_call_args(&mut call) { - return interface.write_response(Err(err))?.write(); - } - // Send the plugin call to the receiver - self.send_plugin_call(ReceivedPluginCall::Run { - engine: interface, - call: CallInfo { name, call, input }, - }) + PluginInput::Call(id, call) => { + let interface = self.interface_for_context(id); + // Read streams in the input + let call = match call.map_data(|input| self.read_pipeline_data(input, None)) { + Ok(call) => call, + Err(err) => { + // If there's an error with initialization of the input stream, just send + // the error response rather than failing here + return interface.write_response(Err(err))?.write(); + } + }; + match call { + // We just let the receiver handle it rather than trying to store signature here + // or something + PluginCall::Signature => { + self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface }) + } + // Parse custom values and send a ReceivedPluginCall + PluginCall::Run(mut call_info) => { + // Deserialize custom values in the arguments + if let Err(err) = deserialize_call_args(&mut call_info.call) { + return interface.write_response(Err(err))?.write(); } - err @ Err(_) => interface.write_response(err)?.write(), + // Send the plugin call to the receiver + self.send_plugin_call(ReceivedPluginCall::Run { + engine: interface, + call: call_info, + }) + } + // Send request with the custom value + PluginCall::CustomValueOp(custom_value, op) => { + self.send_plugin_call(ReceivedPluginCall::CustomValueOp { + engine: interface, + custom_value, + op, + }) } } - // Send request with the custom value - PluginCall::CustomValueOp(custom_value, op) => { - self.send_plugin_call(ReceivedPluginCall::CustomValueOp { - engine: self.interface_for_context(id), - custom_value, - op, - }) - } - }, + } PluginInput::Goodbye => { // Remove the plugin call sender so it hangs up drop(self.plugin_call_sender.take()); Ok(()) } PluginInput::EngineCallResponse(id, response) => { - let response = match response { - EngineCallResponse::Error(err) => EngineCallResponse::Error(err), - EngineCallResponse::Config(config) => EngineCallResponse::Config(config), - EngineCallResponse::ValueMap(map) => EngineCallResponse::ValueMap(map), - EngineCallResponse::PipelineData(header) => { + let response = response + .map_data(|header| self.read_pipeline_data(header, None)) + .unwrap_or_else(|err| { // If there's an error with initializing this stream, change it to an engine // call error response, but send it anyway - match self.read_pipeline_data(header, None) { - Ok(data) => EngineCallResponse::PipelineData(data), - Err(err) => EngineCallResponse::Error(err), - } - } - }; + EngineCallResponse::Error(err) + }); self.send_engine_call_response(id, response) } } @@ -442,36 +436,13 @@ impl EngineInterface { let (tx, rx) = mpsc::channel(); // Convert the call into one with a header and handle the stream, if necessary - let (call, writer) = match call { - EngineCall::EvalClosure { - closure, - positional, - input, - redirect_stdout, - redirect_stderr, - } => { - let (header, writer) = self.init_write_pipeline_data(input)?; - ( - EngineCall::EvalClosure { - closure, - positional, - input: header, - redirect_stdout, - redirect_stderr, - }, - writer, - ) - } - // These calls have no pipeline data, so they're just the same on both sides - EngineCall::GetConfig => (EngineCall::GetConfig, Default::default()), - EngineCall::GetPluginConfig => (EngineCall::GetPluginConfig, Default::default()), - EngineCall::GetEnvVar(name) => (EngineCall::GetEnvVar(name), Default::default()), - EngineCall::GetEnvVars => (EngineCall::GetEnvVars, Default::default()), - EngineCall::GetCurrentDir => (EngineCall::GetCurrentDir, Default::default()), - EngineCall::AddEnvVar(name, value) => { - (EngineCall::AddEnvVar(name, value), Default::default()) - } - }; + let mut writer = None; + + let call = call.map_data(|input| { + let (input_header, input_writer) = self.init_write_pipeline_data(input)?; + writer = Some(input_writer); + Ok(input_header) + })?; // Register the channel self.state @@ -486,7 +457,7 @@ impl EngineInterface { self.write(PluginOutput::EngineCall { context, id, call })?; self.flush()?; - Ok((writer, rx)) + Ok((writer.unwrap_or_default(), rx)) } /// Perform an engine call. Input and output streams are handled. diff --git a/crates/nu-plugin/src/plugin/interface/plugin.rs b/crates/nu-plugin/src/plugin/interface/plugin.rs index 3e678c9440..5d1288166f 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin.rs @@ -460,15 +460,8 @@ impl InterfaceManager for PluginInterfaceManager { }, PluginOutput::CallResponse(id, response) => { // Handle reading the pipeline data, if any - let response = match response { - PluginCallResponse::Error(err) => PluginCallResponse::Error(err), - PluginCallResponse::Signature(sigs) => PluginCallResponse::Signature(sigs), - PluginCallResponse::Ordering(ordering) => { - PluginCallResponse::Ordering(ordering) - } - PluginCallResponse::PipelineData(data) => { - // If there's an error with initializing this stream, change it to a plugin - // error response, but send it anyway + let response = response + .map_data(|data| { let ctrlc = self.get_ctrlc(id)?; // Register the streams in the response @@ -476,12 +469,13 @@ impl InterfaceManager for PluginInterfaceManager { self.recv_stream_started(id, stream_id); } - match self.read_pipeline_data(data, ctrlc.as_ref()) { - Ok(data) => PluginCallResponse::PipelineData(data), - Err(err) => PluginCallResponse::Error(err.into()), - } - } - }; + self.read_pipeline_data(data, ctrlc.as_ref()) + }) + .unwrap_or_else(|err| { + // If there's an error with initializing this stream, change it to a plugin + // error response, but send it anyway + PluginCallResponse::Error(err.into()) + }); let result = self.send_plugin_call_response(id, response); if result.is_ok() { // When a call ends, it releases a lock on the GC @@ -493,36 +487,19 @@ impl InterfaceManager for PluginInterfaceManager { } PluginOutput::EngineCall { context, id, call } => { // Handle reading the pipeline data, if any - let ctrlc = self.get_ctrlc(context)?; - let call = match call { - EngineCall::GetConfig => Ok(EngineCall::GetConfig), - EngineCall::GetPluginConfig => Ok(EngineCall::GetPluginConfig), - EngineCall::GetEnvVar(name) => Ok(EngineCall::GetEnvVar(name)), - EngineCall::GetEnvVars => Ok(EngineCall::GetEnvVars), - EngineCall::GetCurrentDir => Ok(EngineCall::GetCurrentDir), - EngineCall::AddEnvVar(name, value) => Ok(EngineCall::AddEnvVar(name, value)), - EngineCall::EvalClosure { - closure, - mut positional, - input, - redirect_stdout, - redirect_stderr, - } => { - // Add source to any plugin custom values in the arguments - for arg in positional.iter_mut() { - PluginCustomValue::add_source(arg, &self.state.source); - } - self.read_pipeline_data(input, ctrlc.as_ref()).map(|input| { - EngineCall::EvalClosure { - closure, - positional, - input, - redirect_stdout, - redirect_stderr, - } - }) + let mut call = call.map_data(|input| { + let ctrlc = self.get_ctrlc(context)?; + self.read_pipeline_data(input, ctrlc.as_ref()) + }); + // Add source to any plugin custom values in the arguments + if let Ok(EngineCall::EvalClosure { + ref mut positional, .. + }) = call + { + for arg in positional.iter_mut() { + PluginCustomValue::add_source(arg, &self.state.source); } - }; + } match call { Ok(call) => self.send_engine_call(context, id, call), // If there was an error with setting up the call, just write the error @@ -603,16 +580,12 @@ impl PluginInterface { response: EngineCallResponse, ) -> Result<(), ShellError> { // Set up any stream if necessary - let (response, writer) = match response { - EngineCallResponse::PipelineData(data) => { - let (header, writer) = self.init_write_pipeline_data(data)?; - (EngineCallResponse::PipelineData(header), Some(writer)) - } - // No pipeline data: - EngineCallResponse::Error(err) => (EngineCallResponse::Error(err), None), - EngineCallResponse::Config(config) => (EngineCallResponse::Config(config), None), - EngineCallResponse::ValueMap(map) => (EngineCallResponse::ValueMap(map), None), - }; + let mut writer = None; + let response = response.map_data(|data| { + let (data_header, data_writer) = self.init_write_pipeline_data(data)?; + writer = Some(data_writer); + Ok(data_header) + })?; // Write the response, including the pipeline data header if present self.write(PluginInput::EngineCallResponse(id, response))?; @@ -753,20 +726,18 @@ impl PluginInterface { let resp = handle_engine_call(engine_call, context).unwrap_or_else(EngineCallResponse::Error); // Handle stream - let (resp, writer) = match resp { - EngineCallResponse::Error(error) => (EngineCallResponse::Error(error), None), - EngineCallResponse::Config(config) => (EngineCallResponse::Config(config), None), - EngineCallResponse::ValueMap(map) => (EngineCallResponse::ValueMap(map), None), - EngineCallResponse::PipelineData(data) => { - match self.init_write_pipeline_data(data) { - Ok((header, writer)) => { - (EngineCallResponse::PipelineData(header), Some(writer)) - } - // just respond with the error if we fail to set it up - Err(err) => (EngineCallResponse::Error(err), None), - } - } - }; + let mut writer = None; + let resp = resp + .map_data(|data| { + let (data_header, data_writer) = self.init_write_pipeline_data(data)?; + writer = Some(data_writer); + Ok(data_header) + }) + .unwrap_or_else(|err| { + // If we fail to set up the response write, change to an error response here + writer = None; + EngineCallResponse::Error(err) + }); // Write the response, then the stream self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?; self.flush()?; diff --git a/crates/nu-plugin/src/protocol/mod.rs b/crates/nu-plugin/src/protocol/mod.rs index d6ad73e857..6a636f268a 100644 --- a/crates/nu-plugin/src/protocol/mod.rs +++ b/crates/nu-plugin/src/protocol/mod.rs @@ -43,6 +43,20 @@ pub struct CallInfo { pub input: D, } +impl CallInfo { + /// Convert the type of `input` from `D` to `T`. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(CallInfo { + name: self.name, + call: self.call, + input: f(self.input)?, + }) + } +} + /// The initial (and perhaps only) part of any [`nu_protocol::PipelineData`] sent over the wire. /// /// This may contain a single value, or may initiate a stream with a [`StreamId`]. @@ -128,6 +142,23 @@ pub enum PluginCall { CustomValueOp(Spanned, CustomValueOp), } +impl PluginCall { + /// Convert the data type from `D` to `T`. The function will not be called if the variant does + /// not contain data. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(match self { + PluginCall::Signature => PluginCall::Signature, + PluginCall::Run(call) => PluginCall::Run(call.map_data(f)?), + PluginCall::CustomValueOp(custom_value, op) => { + PluginCall::CustomValueOp(custom_value, op) + } + }) + } +} + /// Operations supported for custom values. #[derive(Serialize, Deserialize, Debug, Clone)] pub enum CustomValueOp { @@ -337,6 +368,22 @@ pub enum PluginCallResponse { PipelineData(D), } +impl PluginCallResponse { + /// Convert the data type from `D` to `T`. The function will not be called if the variant does + /// not contain data. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(match self { + PluginCallResponse::Error(err) => PluginCallResponse::Error(err), + PluginCallResponse::Signature(sigs) => PluginCallResponse::Signature(sigs), + PluginCallResponse::Ordering(ordering) => PluginCallResponse::Ordering(ordering), + PluginCallResponse::PipelineData(input) => PluginCallResponse::PipelineData(f(input)?), + }) + } +} + impl PluginCallResponse { /// Construct a plugin call response with a single value pub fn value(value: Value) -> PluginCallResponse { @@ -494,6 +541,35 @@ impl EngineCall { EngineCall::EvalClosure { .. } => "EvalClosure", } } + + /// Convert the data type from `D` to `T`. The function will not be called if the variant does + /// not contain data. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(match self { + EngineCall::GetConfig => EngineCall::GetConfig, + EngineCall::GetPluginConfig => EngineCall::GetPluginConfig, + EngineCall::GetEnvVar(name) => EngineCall::GetEnvVar(name), + EngineCall::GetEnvVars => EngineCall::GetEnvVars, + EngineCall::GetCurrentDir => EngineCall::GetCurrentDir, + EngineCall::AddEnvVar(name, value) => EngineCall::AddEnvVar(name, value), + EngineCall::EvalClosure { + closure, + positional, + input, + redirect_stdout, + redirect_stderr, + } => EngineCall::EvalClosure { + closure, + positional, + input: f(input)?, + redirect_stdout, + redirect_stderr, + }, + }) + } } /// The response to an [EngineCall]. The type parameter determines the output type for pipeline @@ -506,6 +582,22 @@ pub enum EngineCallResponse { ValueMap(HashMap), } +impl EngineCallResponse { + /// Convert the data type from `D` to `T`. The function will not be called if the variant does + /// not contain data. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(match self { + EngineCallResponse::Error(err) => EngineCallResponse::Error(err), + EngineCallResponse::PipelineData(data) => EngineCallResponse::PipelineData(f(data)?), + EngineCallResponse::Config(config) => EngineCallResponse::Config(config), + EngineCallResponse::ValueMap(map) => EngineCallResponse::ValueMap(map), + }) + } +} + impl EngineCallResponse { /// Build an [`EngineCallResponse::PipelineData`] from a [`Value`] pub(crate) fn value(value: Value) -> EngineCallResponse {