Refactor PipelineDataHeader ⇄ PipelineData mapping (#12248)

# Description
It was a bit ugly that when new `EngineCall`s or response types were
added, they needed to be added to multiple places with redundant code
just to change the types, even if they didn't have any stream content.

This fixes that and locates all of that logic in one place.

# User-Facing Changes
None

# Tests + Formatting
- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`
This commit is contained in:
Devyn Cairns 2024-03-20 01:57:22 -07:00 committed by GitHub
parent f8c1e03ea7
commit dcf2e8ce9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 180 additions and 146 deletions

View File

@ -252,64 +252,58 @@ impl InterfaceManager for EngineInterfaceManager {
})
}
PluginInput::Stream(message) => self.consume_stream_message(message),
PluginInput::Call(id, call) => match call {
// We just let the receiver handle it rather than trying to store signature here
// or something
PluginCall::Signature => self.send_plugin_call(ReceivedPluginCall::Signature {
engine: self.interface_for_context(id),
}),
// Set up the streams from the input and reformat to a ReceivedPluginCall
PluginCall::Run(CallInfo {
name,
mut call,
input,
}) => {
let interface = self.interface_for_context(id);
// If there's an error with initialization of the input stream, just send
// the error response rather than failing here
match self.read_pipeline_data(input, None) {
Ok(input) => {
// Deserialize custom values in the arguments
if let Err(err) = deserialize_call_args(&mut call) {
return interface.write_response(Err(err))?.write();
}
// Send the plugin call to the receiver
self.send_plugin_call(ReceivedPluginCall::Run {
engine: interface,
call: CallInfo { name, call, input },
})
PluginInput::Call(id, call) => {
let interface = self.interface_for_context(id);
// Read streams in the input
let call = match call.map_data(|input| self.read_pipeline_data(input, None)) {
Ok(call) => call,
Err(err) => {
// If there's an error with initialization of the input stream, just send
// the error response rather than failing here
return interface.write_response(Err(err))?.write();
}
};
match call {
// We just let the receiver handle it rather than trying to store signature here
// or something
PluginCall::Signature => {
self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface })
}
// Parse custom values and send a ReceivedPluginCall
PluginCall::Run(mut call_info) => {
// Deserialize custom values in the arguments
if let Err(err) = deserialize_call_args(&mut call_info.call) {
return interface.write_response(Err(err))?.write();
}
err @ Err(_) => interface.write_response(err)?.write(),
// Send the plugin call to the receiver
self.send_plugin_call(ReceivedPluginCall::Run {
engine: interface,
call: call_info,
})
}
// Send request with the custom value
PluginCall::CustomValueOp(custom_value, op) => {
self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
engine: interface,
custom_value,
op,
})
}
}
// Send request with the custom value
PluginCall::CustomValueOp(custom_value, op) => {
self.send_plugin_call(ReceivedPluginCall::CustomValueOp {
engine: self.interface_for_context(id),
custom_value,
op,
})
}
},
}
PluginInput::Goodbye => {
// Remove the plugin call sender so it hangs up
drop(self.plugin_call_sender.take());
Ok(())
}
PluginInput::EngineCallResponse(id, response) => {
let response = match response {
EngineCallResponse::Error(err) => EngineCallResponse::Error(err),
EngineCallResponse::Config(config) => EngineCallResponse::Config(config),
EngineCallResponse::ValueMap(map) => EngineCallResponse::ValueMap(map),
EngineCallResponse::PipelineData(header) => {
let response = response
.map_data(|header| self.read_pipeline_data(header, None))
.unwrap_or_else(|err| {
// If there's an error with initializing this stream, change it to an engine
// call error response, but send it anyway
match self.read_pipeline_data(header, None) {
Ok(data) => EngineCallResponse::PipelineData(data),
Err(err) => EngineCallResponse::Error(err),
}
}
};
EngineCallResponse::Error(err)
});
self.send_engine_call_response(id, response)
}
}
@ -442,36 +436,13 @@ impl EngineInterface {
let (tx, rx) = mpsc::channel();
// Convert the call into one with a header and handle the stream, if necessary
let (call, writer) = match call {
EngineCall::EvalClosure {
closure,
positional,
input,
redirect_stdout,
redirect_stderr,
} => {
let (header, writer) = self.init_write_pipeline_data(input)?;
(
EngineCall::EvalClosure {
closure,
positional,
input: header,
redirect_stdout,
redirect_stderr,
},
writer,
)
}
// These calls have no pipeline data, so they're just the same on both sides
EngineCall::GetConfig => (EngineCall::GetConfig, Default::default()),
EngineCall::GetPluginConfig => (EngineCall::GetPluginConfig, Default::default()),
EngineCall::GetEnvVar(name) => (EngineCall::GetEnvVar(name), Default::default()),
EngineCall::GetEnvVars => (EngineCall::GetEnvVars, Default::default()),
EngineCall::GetCurrentDir => (EngineCall::GetCurrentDir, Default::default()),
EngineCall::AddEnvVar(name, value) => {
(EngineCall::AddEnvVar(name, value), Default::default())
}
};
let mut writer = None;
let call = call.map_data(|input| {
let (input_header, input_writer) = self.init_write_pipeline_data(input)?;
writer = Some(input_writer);
Ok(input_header)
})?;
// Register the channel
self.state
@ -486,7 +457,7 @@ impl EngineInterface {
self.write(PluginOutput::EngineCall { context, id, call })?;
self.flush()?;
Ok((writer, rx))
Ok((writer.unwrap_or_default(), rx))
}
/// Perform an engine call. Input and output streams are handled.

View File

@ -460,15 +460,8 @@ impl InterfaceManager for PluginInterfaceManager {
},
PluginOutput::CallResponse(id, response) => {
// Handle reading the pipeline data, if any
let response = match response {
PluginCallResponse::Error(err) => PluginCallResponse::Error(err),
PluginCallResponse::Signature(sigs) => PluginCallResponse::Signature(sigs),
PluginCallResponse::Ordering(ordering) => {
PluginCallResponse::Ordering(ordering)
}
PluginCallResponse::PipelineData(data) => {
// If there's an error with initializing this stream, change it to a plugin
// error response, but send it anyway
let response = response
.map_data(|data| {
let ctrlc = self.get_ctrlc(id)?;
// Register the streams in the response
@ -476,12 +469,13 @@ impl InterfaceManager for PluginInterfaceManager {
self.recv_stream_started(id, stream_id);
}
match self.read_pipeline_data(data, ctrlc.as_ref()) {
Ok(data) => PluginCallResponse::PipelineData(data),
Err(err) => PluginCallResponse::Error(err.into()),
}
}
};
self.read_pipeline_data(data, ctrlc.as_ref())
})
.unwrap_or_else(|err| {
// If there's an error with initializing this stream, change it to a plugin
// error response, but send it anyway
PluginCallResponse::Error(err.into())
});
let result = self.send_plugin_call_response(id, response);
if result.is_ok() {
// When a call ends, it releases a lock on the GC
@ -493,36 +487,19 @@ impl InterfaceManager for PluginInterfaceManager {
}
PluginOutput::EngineCall { context, id, call } => {
// Handle reading the pipeline data, if any
let ctrlc = self.get_ctrlc(context)?;
let call = match call {
EngineCall::GetConfig => Ok(EngineCall::GetConfig),
EngineCall::GetPluginConfig => Ok(EngineCall::GetPluginConfig),
EngineCall::GetEnvVar(name) => Ok(EngineCall::GetEnvVar(name)),
EngineCall::GetEnvVars => Ok(EngineCall::GetEnvVars),
EngineCall::GetCurrentDir => Ok(EngineCall::GetCurrentDir),
EngineCall::AddEnvVar(name, value) => Ok(EngineCall::AddEnvVar(name, value)),
EngineCall::EvalClosure {
closure,
mut positional,
input,
redirect_stdout,
redirect_stderr,
} => {
// Add source to any plugin custom values in the arguments
for arg in positional.iter_mut() {
PluginCustomValue::add_source(arg, &self.state.source);
}
self.read_pipeline_data(input, ctrlc.as_ref()).map(|input| {
EngineCall::EvalClosure {
closure,
positional,
input,
redirect_stdout,
redirect_stderr,
}
})
let mut call = call.map_data(|input| {
let ctrlc = self.get_ctrlc(context)?;
self.read_pipeline_data(input, ctrlc.as_ref())
});
// Add source to any plugin custom values in the arguments
if let Ok(EngineCall::EvalClosure {
ref mut positional, ..
}) = call
{
for arg in positional.iter_mut() {
PluginCustomValue::add_source(arg, &self.state.source);
}
};
}
match call {
Ok(call) => self.send_engine_call(context, id, call),
// If there was an error with setting up the call, just write the error
@ -603,16 +580,12 @@ impl PluginInterface {
response: EngineCallResponse<PipelineData>,
) -> Result<(), ShellError> {
// Set up any stream if necessary
let (response, writer) = match response {
EngineCallResponse::PipelineData(data) => {
let (header, writer) = self.init_write_pipeline_data(data)?;
(EngineCallResponse::PipelineData(header), Some(writer))
}
// No pipeline data:
EngineCallResponse::Error(err) => (EngineCallResponse::Error(err), None),
EngineCallResponse::Config(config) => (EngineCallResponse::Config(config), None),
EngineCallResponse::ValueMap(map) => (EngineCallResponse::ValueMap(map), None),
};
let mut writer = None;
let response = response.map_data(|data| {
let (data_header, data_writer) = self.init_write_pipeline_data(data)?;
writer = Some(data_writer);
Ok(data_header)
})?;
// Write the response, including the pipeline data header if present
self.write(PluginInput::EngineCallResponse(id, response))?;
@ -753,20 +726,18 @@ impl PluginInterface {
let resp =
handle_engine_call(engine_call, context).unwrap_or_else(EngineCallResponse::Error);
// Handle stream
let (resp, writer) = match resp {
EngineCallResponse::Error(error) => (EngineCallResponse::Error(error), None),
EngineCallResponse::Config(config) => (EngineCallResponse::Config(config), None),
EngineCallResponse::ValueMap(map) => (EngineCallResponse::ValueMap(map), None),
EngineCallResponse::PipelineData(data) => {
match self.init_write_pipeline_data(data) {
Ok((header, writer)) => {
(EngineCallResponse::PipelineData(header), Some(writer))
}
// just respond with the error if we fail to set it up
Err(err) => (EngineCallResponse::Error(err), None),
}
}
};
let mut writer = None;
let resp = resp
.map_data(|data| {
let (data_header, data_writer) = self.init_write_pipeline_data(data)?;
writer = Some(data_writer);
Ok(data_header)
})
.unwrap_or_else(|err| {
// If we fail to set up the response write, change to an error response here
writer = None;
EngineCallResponse::Error(err)
});
// Write the response, then the stream
self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?;
self.flush()?;

View File

@ -43,6 +43,20 @@ pub struct CallInfo<D> {
pub input: D,
}
impl<D> CallInfo<D> {
/// Convert the type of `input` from `D` to `T`.
pub(crate) fn map_data<T>(
self,
f: impl FnOnce(D) -> Result<T, ShellError>,
) -> Result<CallInfo<T>, ShellError> {
Ok(CallInfo {
name: self.name,
call: self.call,
input: f(self.input)?,
})
}
}
/// The initial (and perhaps only) part of any [`nu_protocol::PipelineData`] sent over the wire.
///
/// This may contain a single value, or may initiate a stream with a [`StreamId`].
@ -128,6 +142,23 @@ pub enum PluginCall<D> {
CustomValueOp(Spanned<PluginCustomValue>, CustomValueOp),
}
impl<D> PluginCall<D> {
/// Convert the data type from `D` to `T`. The function will not be called if the variant does
/// not contain data.
pub(crate) fn map_data<T>(
self,
f: impl FnOnce(D) -> Result<T, ShellError>,
) -> Result<PluginCall<T>, ShellError> {
Ok(match self {
PluginCall::Signature => PluginCall::Signature,
PluginCall::Run(call) => PluginCall::Run(call.map_data(f)?),
PluginCall::CustomValueOp(custom_value, op) => {
PluginCall::CustomValueOp(custom_value, op)
}
})
}
}
/// Operations supported for custom values.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum CustomValueOp {
@ -337,6 +368,22 @@ pub enum PluginCallResponse<D> {
PipelineData(D),
}
impl<D> PluginCallResponse<D> {
/// Convert the data type from `D` to `T`. The function will not be called if the variant does
/// not contain data.
pub(crate) fn map_data<T>(
self,
f: impl FnOnce(D) -> Result<T, ShellError>,
) -> Result<PluginCallResponse<T>, ShellError> {
Ok(match self {
PluginCallResponse::Error(err) => PluginCallResponse::Error(err),
PluginCallResponse::Signature(sigs) => PluginCallResponse::Signature(sigs),
PluginCallResponse::Ordering(ordering) => PluginCallResponse::Ordering(ordering),
PluginCallResponse::PipelineData(input) => PluginCallResponse::PipelineData(f(input)?),
})
}
}
impl PluginCallResponse<PipelineDataHeader> {
/// Construct a plugin call response with a single value
pub fn value(value: Value) -> PluginCallResponse<PipelineDataHeader> {
@ -494,6 +541,35 @@ impl<D> EngineCall<D> {
EngineCall::EvalClosure { .. } => "EvalClosure",
}
}
/// Convert the data type from `D` to `T`. The function will not be called if the variant does
/// not contain data.
pub(crate) fn map_data<T>(
self,
f: impl FnOnce(D) -> Result<T, ShellError>,
) -> Result<EngineCall<T>, ShellError> {
Ok(match self {
EngineCall::GetConfig => EngineCall::GetConfig,
EngineCall::GetPluginConfig => EngineCall::GetPluginConfig,
EngineCall::GetEnvVar(name) => EngineCall::GetEnvVar(name),
EngineCall::GetEnvVars => EngineCall::GetEnvVars,
EngineCall::GetCurrentDir => EngineCall::GetCurrentDir,
EngineCall::AddEnvVar(name, value) => EngineCall::AddEnvVar(name, value),
EngineCall::EvalClosure {
closure,
positional,
input,
redirect_stdout,
redirect_stderr,
} => EngineCall::EvalClosure {
closure,
positional,
input: f(input)?,
redirect_stdout,
redirect_stderr,
},
})
}
}
/// The response to an [EngineCall]. The type parameter determines the output type for pipeline
@ -506,6 +582,22 @@ pub enum EngineCallResponse<D> {
ValueMap(HashMap<String, Value>),
}
impl<D> EngineCallResponse<D> {
/// Convert the data type from `D` to `T`. The function will not be called if the variant does
/// not contain data.
pub(crate) fn map_data<T>(
self,
f: impl FnOnce(D) -> Result<T, ShellError>,
) -> Result<EngineCallResponse<T>, ShellError> {
Ok(match self {
EngineCallResponse::Error(err) => EngineCallResponse::Error(err),
EngineCallResponse::PipelineData(data) => EngineCallResponse::PipelineData(f(data)?),
EngineCallResponse::Config(config) => EngineCallResponse::Config(config),
EngineCallResponse::ValueMap(map) => EngineCallResponse::ValueMap(map),
})
}
}
impl EngineCallResponse<PipelineData> {
/// Build an [`EngineCallResponse::PipelineData`] from a [`Value`]
pub(crate) fn value(value: Value) -> EngineCallResponse<PipelineData> {