From af72a187858814be53e62318834a69c548b9e618 Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Sun, 14 Apr 2024 08:55:18 -0700 Subject: [PATCH] Improve error messages for plugin protocol by removing `#[serde(untagged)]` (#12510) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Description In the plugin protocol, I had used `#[serde(untagged)]` on the `Stream` variant to make it smaller and include all of the stream messages at the top level, but unfortunately this causes serde to make really unhelpful errors if anything fails to decode anywhere: ``` Error: nu::shell::plugin_failed_to_decode × Plugin failed to decode: data did not match any variant of untagged enum PluginOutput ``` If you are trying to develop something using the plugin protocol directly, this error is incredibly unhelpful. Even as a user, this basically just says 'something is wrong'. With this change, the errors are much better: ``` Error: nu::shell::plugin_failed_to_decode × Plugin failed to decode: unknown variant `PipelineDatra`, expected one of `Error`, `Signature`, `Ordering`, `PipelineData` at line 2 column 37 ``` The only downside is it means I have to duplicate all of the `StreamMessage` variants manually, but there's only 4 of them and they're small. This doesn't actually change the protocol at all - everything is still identical on the wire. # Tests + Formatting - :green_circle: `toolkit fmt` - :green_circle: `toolkit clippy` - :green_circle: `toolkit test` - :green_circle: `toolkit test stdlib` --- .../nu-plugin/src/plugin/interface/engine.rs | 12 ++++- .../src/plugin/interface/engine/tests.rs | 26 ++++------ .../nu-plugin/src/plugin/interface/plugin.rs | 12 ++++- .../src/plugin/interface/plugin/tests.rs | 22 +++----- .../nu-plugin/src/plugin/interface/tests.rs | 17 +++++-- crates/nu-plugin/src/protocol/mod.rs | 50 +++++++++++++------ crates/nu-plugin/src/serializers/json.rs | 4 +- crates/nu-plugin/src/serializers/tests.rs | 18 +++---- 8 files changed, 99 insertions(+), 62 deletions(-) diff --git a/crates/nu-plugin/src/plugin/interface/engine.rs b/crates/nu-plugin/src/plugin/interface/engine.rs index b49f145bd68..e98eed06fbf 100644 --- a/crates/nu-plugin/src/plugin/interface/engine.rs +++ b/crates/nu-plugin/src/plugin/interface/engine.rs @@ -250,7 +250,17 @@ impl InterfaceManager for EngineInterfaceManager { .into(), }) } - PluginInput::Stream(message) => self.consume_stream_message(message), + // Stream messages + PluginInput::Data(..) + | PluginInput::End(..) + | PluginInput::Drop(..) + | PluginInput::Ack(..) => { + self.consume_stream_message(input.try_into().map_err(|msg| { + ShellError::NushellFailed { + msg: format!("Failed to convert message {msg:?} to StreamMessage"), + } + })?) + } PluginInput::Call(id, call) => { let interface = self.interface_for_context(id); // Read streams in the input diff --git a/crates/nu-plugin/src/plugin/interface/engine/tests.rs b/crates/nu-plugin/src/plugin/interface/engine/tests.rs index 8d29a6d8e63..4693b1774b3 100644 --- a/crates/nu-plugin/src/plugin/interface/engine/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/engine/tests.rs @@ -5,7 +5,7 @@ use crate::{ test_util::{expected_test_custom_value, test_plugin_custom_value, TestCustomValue}, CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, ExternalStreamInfo, ListStreamInfo, PipelineDataHeader, PluginCall, PluginCustomValue, PluginInput, Protocol, - ProtocolInfo, RawStreamInfo, StreamData, StreamMessage, + ProtocolInfo, RawStreamInfo, StreamData, }, EvaluatedCall, PluginCallResponse, PluginOutput, }; @@ -278,7 +278,7 @@ fn manager_consume_errors_on_sending_other_messages_before_hello() -> Result<(), assert!(manager.protocol_info.is_none()); let error = manager - .consume(PluginInput::Stream(StreamMessage::Drop(0))) + .consume(PluginInput::Drop(0)) .expect_err("consume before Hello should cause an error"); assert!(format!("{error:?}").contains("Hello")); @@ -381,13 +381,10 @@ fn manager_consume_call_run_forwards_to_receiver_with_pipeline_data() -> Result< ))?; for i in 0..10 { - manager.consume(PluginInput::Stream(StreamMessage::Data( - 6, - Value::test_int(i).into(), - )))?; + manager.consume(PluginInput::Data(6, Value::test_int(i).into()))?; } - manager.consume(PluginInput::Stream(StreamMessage::End(6)))?; + manager.consume(PluginInput::End(6))?; // Make sure the streams end and we don't deadlock drop(manager); @@ -522,13 +519,10 @@ fn manager_consume_engine_call_response_forwards_to_subscriber_with_pipeline_dat ))?; for i in 0..2 { - manager.consume(PluginInput::Stream(StreamMessage::Data( - 0, - Value::test_int(i).into(), - )))?; + manager.consume(PluginInput::Data(0, Value::test_int(i).into()))?; } - manager.consume(PluginInput::Stream(StreamMessage::End(0)))?; + manager.consume(PluginInput::End(0))?; // Make sure the streams end and we don't deadlock drop(manager); @@ -710,20 +704,20 @@ fn interface_write_response_with_stream() -> Result<(), ShellError> { for number in [3, 4, 5] { match test.next_written().expect("missing stream Data message") { - PluginOutput::Stream(StreamMessage::Data(id, data)) => { + PluginOutput::Data(id, data) => { assert_eq!(info.id, id, "Data id"); match data { StreamData::List(val) => assert_eq!(number, val.as_int()?), _ => panic!("expected List data: {data:?}"), } } - message => panic!("expected Stream(Data(..)): {message:?}"), + message => panic!("expected Data(..): {message:?}"), } } match test.next_written().expect("missing stream End message") { - PluginOutput::Stream(StreamMessage::End(id)) => assert_eq!(info.id, id, "End id"), - message => panic!("expected Stream(Data(..)): {message:?}"), + PluginOutput::End(id) => assert_eq!(info.id, id, "End id"), + message => panic!("expected Data(..): {message:?}"), } assert!(!test.has_unconsumed_write()); diff --git a/crates/nu-plugin/src/plugin/interface/plugin.rs b/crates/nu-plugin/src/plugin/interface/plugin.rs index e8afd3a38cc..93b04c8ef99 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin.rs @@ -480,7 +480,17 @@ impl InterfaceManager for PluginInterfaceManager { ), }) } - PluginOutput::Stream(message) => self.consume_stream_message(message), + // Stream messages + PluginOutput::Data(..) + | PluginOutput::End(..) + | PluginOutput::Drop(..) + | PluginOutput::Ack(..) => { + self.consume_stream_message(input.try_into().map_err(|msg| { + ShellError::NushellFailed { + msg: format!("Failed to convert message {msg:?} to StreamMessage"), + } + })?) + } PluginOutput::Option(option) => match option { PluginOption::GcDisabled(disabled) => { // Turn garbage collection off/on. diff --git a/crates/nu-plugin/src/plugin/interface/plugin/tests.rs b/crates/nu-plugin/src/plugin/interface/plugin/tests.rs index 5ca7209b163..290b04a3ce0 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin/tests.rs @@ -310,7 +310,7 @@ fn manager_consume_errors_on_sending_other_messages_before_hello() -> Result<(), assert!(manager.protocol_info.is_none()); let error = manager - .consume(PluginOutput::Stream(StreamMessage::Drop(0))) + .consume(PluginOutput::Drop(0)) .expect_err("consume before Hello should cause an error"); assert!(format!("{error:?}").contains("Hello")); @@ -331,13 +331,10 @@ fn manager_consume_call_response_forwards_to_subscriber_with_pipeline_data( ))?; for i in 0..2 { - manager.consume(PluginOutput::Stream(StreamMessage::Data( - 0, - Value::test_int(i).into(), - )))?; + manager.consume(PluginOutput::Data(0, Value::test_int(i).into()))?; } - manager.consume(PluginOutput::Stream(StreamMessage::End(0)))?; + manager.consume(PluginOutput::End(0))?; // Make sure the streams end and we don't deadlock drop(manager); @@ -454,12 +451,9 @@ fn manager_consume_engine_call_forwards_to_subscriber_with_pipeline_data() -> Re })?; for i in 0..2 { - manager.consume(PluginOutput::Stream(StreamMessage::Data( - 2, - Value::test_int(i).into(), - )))?; + manager.consume(PluginOutput::Data(2, Value::test_int(i).into()))?; } - manager.consume(PluginOutput::Stream(StreamMessage::End(2)))?; + manager.consume(PluginOutput::End(2))?; // Make sure the streams end and we don't deadlock drop(manager); @@ -889,7 +883,7 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel .next_written() .expect("failed to get Data stream message") { - PluginInput::Stream(StreamMessage::Data(id, data)) => { + PluginInput::Data(id, data) => { assert_eq!(info.id, id, "id"); match data { StreamData::List(data_value) => { @@ -906,10 +900,10 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel .next_written() .expect("failed to get End stream message") { - PluginInput::Stream(StreamMessage::End(id)) => { + PluginInput::End(id) => { assert_eq!(info.id, id, "id"); } - message => panic!("expected Stream(End(_)) message: {message:?}"), + message => panic!("expected End(_) message: {message:?}"), } Ok(()) diff --git a/crates/nu-plugin/src/plugin/interface/tests.rs b/crates/nu-plugin/src/plugin/interface/tests.rs index 706798b31f6..cbb974d1015 100644 --- a/crates/nu-plugin/src/plugin/interface/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/tests.rs @@ -66,7 +66,14 @@ impl InterfaceManager for TestInterfaceManager { fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> { match input { - PluginInput::Stream(msg) => self.consume_stream_message(msg), + PluginInput::Data(..) + | PluginInput::End(..) + | PluginInput::Drop(..) + | PluginInput::Ack(..) => self.consume_stream_message( + input + .try_into() + .expect("failed to convert message to StreamMessage"), + ), _ => unimplemented!(), } } @@ -414,7 +421,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> { // Now make sure the stream messages have been written for value in values { match test.next_written().expect("unexpected end of stream") { - PluginOutput::Stream(StreamMessage::Data(id, data)) => { + PluginOutput::Data(id, data) => { assert_eq!(info.id, id, "Data id"); match data { StreamData::List(read_value) => assert_eq!(value, read_value, "Data value"), @@ -426,7 +433,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> { } match test.next_written().expect("unexpected end of stream") { - PluginOutput::Stream(StreamMessage::End(id)) => { + PluginOutput::End(id) => { assert_eq!(info.id, id, "End id"); } other => panic!("unexpected output: {other:?}"), @@ -510,7 +517,7 @@ fn write_pipeline_data_external_stream() -> Result<(), ShellError> { // End must come after all Data for msg in test.written() { match msg { - PluginOutput::Stream(StreamMessage::Data(id, data)) => { + PluginOutput::Data(id, data) => { if id == stdout_info.id { let result: Result, ShellError> = data.try_into().expect("wrong data in stdout stream"); @@ -535,7 +542,7 @@ fn write_pipeline_data_external_stream() -> Result<(), ShellError> { panic!("unrecognized stream id: {id}"); } } - PluginOutput::Stream(StreamMessage::End(id)) => { + PluginOutput::End(id) => { if id == stdout_info.id { assert!(!stdout_ended, "double End of stdout"); assert!(stdout_iter.next().is_none(), "unexpected end of stdout"); diff --git a/crates/nu-plugin/src/protocol/mod.rs b/crates/nu-plugin/src/protocol/mod.rs index 84835c8b508..6f71efc4c38 100644 --- a/crates/nu-plugin/src/protocol/mod.rs +++ b/crates/nu-plugin/src/protocol/mod.rs @@ -208,11 +208,14 @@ pub enum PluginInput { /// Response to an [`EngineCall`]. The ID should be the same one sent with the engine call this /// is responding to EngineCallResponse(EngineCallId, EngineCallResponse), - /// Stream control or data message. Untagged to keep them as small as possible. - /// - /// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}` - #[serde(untagged)] - Stream(StreamMessage), + /// See [`StreamMessage::Data`]. + Data(StreamId, StreamData), + /// See [`StreamMessage::End`]. + End(StreamId), + /// See [`StreamMessage::Drop`]. + Drop(StreamId), + /// See [`StreamMessage::Ack`]. + Ack(StreamId), } impl TryFrom for StreamMessage { @@ -220,7 +223,10 @@ impl TryFrom for StreamMessage { fn try_from(msg: PluginInput) -> Result { match msg { - PluginInput::Stream(stream_msg) => Ok(stream_msg), + PluginInput::Data(id, data) => Ok(StreamMessage::Data(id, data)), + PluginInput::End(id) => Ok(StreamMessage::End(id)), + PluginInput::Drop(id) => Ok(StreamMessage::Drop(id)), + PluginInput::Ack(id) => Ok(StreamMessage::Ack(id)), _ => Err(msg), } } @@ -228,7 +234,12 @@ impl TryFrom for StreamMessage { impl From for PluginInput { fn from(stream_msg: StreamMessage) -> PluginInput { - PluginInput::Stream(stream_msg) + match stream_msg { + StreamMessage::Data(id, data) => PluginInput::Data(id, data), + StreamMessage::End(id) => PluginInput::End(id), + StreamMessage::Drop(id) => PluginInput::Drop(id), + StreamMessage::Ack(id) => PluginInput::Ack(id), + } } } @@ -420,11 +431,14 @@ pub enum PluginOutput { id: EngineCallId, call: EngineCall, }, - /// Stream control or data message. Untagged to keep them as small as possible. - /// - /// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}` - #[serde(untagged)] - Stream(StreamMessage), + /// See [`StreamMessage::Data`]. + Data(StreamId, StreamData), + /// See [`StreamMessage::End`]. + End(StreamId), + /// See [`StreamMessage::Drop`]. + Drop(StreamId), + /// See [`StreamMessage::Ack`]. + Ack(StreamId), } impl TryFrom for StreamMessage { @@ -432,7 +446,10 @@ impl TryFrom for StreamMessage { fn try_from(msg: PluginOutput) -> Result { match msg { - PluginOutput::Stream(stream_msg) => Ok(stream_msg), + PluginOutput::Data(id, data) => Ok(StreamMessage::Data(id, data)), + PluginOutput::End(id) => Ok(StreamMessage::End(id)), + PluginOutput::Drop(id) => Ok(StreamMessage::Drop(id)), + PluginOutput::Ack(id) => Ok(StreamMessage::Ack(id)), _ => Err(msg), } } @@ -440,7 +457,12 @@ impl TryFrom for StreamMessage { impl From for PluginOutput { fn from(stream_msg: StreamMessage) -> PluginOutput { - PluginOutput::Stream(stream_msg) + match stream_msg { + StreamMessage::Data(id, data) => PluginOutput::Data(id, data), + StreamMessage::End(id) => PluginOutput::End(id), + StreamMessage::Drop(id) => PluginOutput::Drop(id), + StreamMessage::Ack(id) => PluginOutput::Ack(id), + } } } diff --git a/crates/nu-plugin/src/serializers/json.rs b/crates/nu-plugin/src/serializers/json.rs index 25dca0ce146..fc66db39931 100644 --- a/crates/nu-plugin/src/serializers/json.rs +++ b/crates/nu-plugin/src/serializers/json.rs @@ -117,14 +117,14 @@ mod tests { fn json_has_no_other_newlines() { let mut out = vec![]; // use something deeply nested, to try to trigger any pretty printing - let output = PluginOutput::Stream(StreamMessage::Data( + let output = PluginOutput::Data( 0, StreamData::List(Value::test_list(vec![ Value::test_int(4), // in case escaping failed Value::test_string("newline\ncontaining\nstring"), ])), - )); + ); JsonSerializer {} .encode(&output, &mut out) .expect("serialization error"); diff --git a/crates/nu-plugin/src/serializers/tests.rs b/crates/nu-plugin/src/serializers/tests.rs index dc134a9bdf8..af965abad28 100644 --- a/crates/nu-plugin/src/serializers/tests.rs +++ b/crates/nu-plugin/src/serializers/tests.rs @@ -3,7 +3,7 @@ macro_rules! generate_tests { use crate::protocol::{ CallInfo, CustomValueOp, EvaluatedCall, PipelineDataHeader, PluginCall, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption, PluginOutput, - StreamData, StreamMessage, + StreamData, }; use nu_protocol::{ LabeledError, PluginSignature, Signature, Span, Spanned, SyntaxShape, Value, @@ -429,7 +429,7 @@ macro_rules! generate_tests { let item = Value::int(1, span); let stream_data = StreamData::List(item.clone()); - let plugin_input = PluginInput::Stream(StreamMessage::Data(0, stream_data)); + let plugin_input = PluginInput::Data(0, stream_data); let encoder = $encoder; let mut buffer: Vec = Vec::new(); @@ -442,7 +442,7 @@ macro_rules! generate_tests { .expect("eof"); match returned { - PluginInput::Stream(StreamMessage::Data(id, StreamData::List(list_data))) => { + PluginInput::Data(id, StreamData::List(list_data)) => { assert_eq!(0, id); assert_eq!(item, list_data); } @@ -455,7 +455,7 @@ macro_rules! generate_tests { let data = b"Hello world"; let stream_data = StreamData::Raw(Ok(data.to_vec())); - let plugin_input = PluginInput::Stream(StreamMessage::Data(1, stream_data)); + let plugin_input = PluginInput::Data(1, stream_data); let encoder = $encoder; let mut buffer: Vec = Vec::new(); @@ -468,7 +468,7 @@ macro_rules! generate_tests { .expect("eof"); match returned { - PluginInput::Stream(StreamMessage::Data(id, StreamData::Raw(bytes))) => { + PluginInput::Data(id, StreamData::Raw(bytes)) => { assert_eq!(1, id); match bytes { Ok(bytes) => assert_eq!(data, &bytes[..]), @@ -485,7 +485,7 @@ macro_rules! generate_tests { let item = Value::int(1, span); let stream_data = StreamData::List(item.clone()); - let plugin_output = PluginOutput::Stream(StreamMessage::Data(4, stream_data)); + let plugin_output = PluginOutput::Data(4, stream_data); let encoder = $encoder; let mut buffer: Vec = Vec::new(); @@ -498,7 +498,7 @@ macro_rules! generate_tests { .expect("eof"); match returned { - PluginOutput::Stream(StreamMessage::Data(id, StreamData::List(list_data))) => { + PluginOutput::Data(id, StreamData::List(list_data)) => { assert_eq!(4, id); assert_eq!(item, list_data); } @@ -511,7 +511,7 @@ macro_rules! generate_tests { let data = b"Hello world"; let stream_data = StreamData::Raw(Ok(data.to_vec())); - let plugin_output = PluginOutput::Stream(StreamMessage::Data(5, stream_data)); + let plugin_output = PluginOutput::Data(5, stream_data); let encoder = $encoder; let mut buffer: Vec = Vec::new(); @@ -524,7 +524,7 @@ macro_rules! generate_tests { .expect("eof"); match returned { - PluginOutput::Stream(StreamMessage::Data(id, StreamData::Raw(bytes))) => { + PluginOutput::Data(id, StreamData::Raw(bytes)) => { assert_eq!(5, id); match bytes { Ok(bytes) => assert_eq!(data, &bytes[..]),