diff --git a/crates/nu-plugin/src/plugin/interface/engine.rs b/crates/nu-plugin/src/plugin/interface/engine.rs index b49f145bd68..e98eed06fbf 100644 --- a/crates/nu-plugin/src/plugin/interface/engine.rs +++ b/crates/nu-plugin/src/plugin/interface/engine.rs @@ -250,7 +250,17 @@ impl InterfaceManager for EngineInterfaceManager { .into(), }) } - PluginInput::Stream(message) => self.consume_stream_message(message), + // Stream messages + PluginInput::Data(..) + | PluginInput::End(..) + | PluginInput::Drop(..) + | PluginInput::Ack(..) => { + self.consume_stream_message(input.try_into().map_err(|msg| { + ShellError::NushellFailed { + msg: format!("Failed to convert message {msg:?} to StreamMessage"), + } + })?) + } PluginInput::Call(id, call) => { let interface = self.interface_for_context(id); // Read streams in the input diff --git a/crates/nu-plugin/src/plugin/interface/engine/tests.rs b/crates/nu-plugin/src/plugin/interface/engine/tests.rs index 8d29a6d8e63..4693b1774b3 100644 --- a/crates/nu-plugin/src/plugin/interface/engine/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/engine/tests.rs @@ -5,7 +5,7 @@ use crate::{ test_util::{expected_test_custom_value, test_plugin_custom_value, TestCustomValue}, CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, ExternalStreamInfo, ListStreamInfo, PipelineDataHeader, PluginCall, PluginCustomValue, PluginInput, Protocol, - ProtocolInfo, RawStreamInfo, StreamData, StreamMessage, + ProtocolInfo, RawStreamInfo, StreamData, }, EvaluatedCall, PluginCallResponse, PluginOutput, }; @@ -278,7 +278,7 @@ fn manager_consume_errors_on_sending_other_messages_before_hello() -> Result<(), assert!(manager.protocol_info.is_none()); let error = manager - .consume(PluginInput::Stream(StreamMessage::Drop(0))) + .consume(PluginInput::Drop(0)) .expect_err("consume before Hello should cause an error"); assert!(format!("{error:?}").contains("Hello")); @@ -381,13 +381,10 @@ fn manager_consume_call_run_forwards_to_receiver_with_pipeline_data() -> Result< ))?; for i in 0..10 { - manager.consume(PluginInput::Stream(StreamMessage::Data( - 6, - Value::test_int(i).into(), - )))?; + manager.consume(PluginInput::Data(6, Value::test_int(i).into()))?; } - manager.consume(PluginInput::Stream(StreamMessage::End(6)))?; + manager.consume(PluginInput::End(6))?; // Make sure the streams end and we don't deadlock drop(manager); @@ -522,13 +519,10 @@ fn manager_consume_engine_call_response_forwards_to_subscriber_with_pipeline_dat ))?; for i in 0..2 { - manager.consume(PluginInput::Stream(StreamMessage::Data( - 0, - Value::test_int(i).into(), - )))?; + manager.consume(PluginInput::Data(0, Value::test_int(i).into()))?; } - manager.consume(PluginInput::Stream(StreamMessage::End(0)))?; + manager.consume(PluginInput::End(0))?; // Make sure the streams end and we don't deadlock drop(manager); @@ -710,20 +704,20 @@ fn interface_write_response_with_stream() -> Result<(), ShellError> { for number in [3, 4, 5] { match test.next_written().expect("missing stream Data message") { - PluginOutput::Stream(StreamMessage::Data(id, data)) => { + PluginOutput::Data(id, data) => { assert_eq!(info.id, id, "Data id"); match data { StreamData::List(val) => assert_eq!(number, val.as_int()?), _ => panic!("expected List data: {data:?}"), } } - message => panic!("expected Stream(Data(..)): {message:?}"), + message => panic!("expected Data(..): {message:?}"), } } match test.next_written().expect("missing stream End message") { - PluginOutput::Stream(StreamMessage::End(id)) => assert_eq!(info.id, id, "End id"), - message => panic!("expected Stream(Data(..)): {message:?}"), + PluginOutput::End(id) => assert_eq!(info.id, id, "End id"), + message => panic!("expected Data(..): {message:?}"), } assert!(!test.has_unconsumed_write()); diff --git a/crates/nu-plugin/src/plugin/interface/plugin.rs b/crates/nu-plugin/src/plugin/interface/plugin.rs index e8afd3a38cc..93b04c8ef99 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin.rs @@ -480,7 +480,17 @@ impl InterfaceManager for PluginInterfaceManager { ), }) } - PluginOutput::Stream(message) => self.consume_stream_message(message), + // Stream messages + PluginOutput::Data(..) + | PluginOutput::End(..) + | PluginOutput::Drop(..) + | PluginOutput::Ack(..) => { + self.consume_stream_message(input.try_into().map_err(|msg| { + ShellError::NushellFailed { + msg: format!("Failed to convert message {msg:?} to StreamMessage"), + } + })?) + } PluginOutput::Option(option) => match option { PluginOption::GcDisabled(disabled) => { // Turn garbage collection off/on. diff --git a/crates/nu-plugin/src/plugin/interface/plugin/tests.rs b/crates/nu-plugin/src/plugin/interface/plugin/tests.rs index 5ca7209b163..290b04a3ce0 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin/tests.rs @@ -310,7 +310,7 @@ fn manager_consume_errors_on_sending_other_messages_before_hello() -> Result<(), assert!(manager.protocol_info.is_none()); let error = manager - .consume(PluginOutput::Stream(StreamMessage::Drop(0))) + .consume(PluginOutput::Drop(0)) .expect_err("consume before Hello should cause an error"); assert!(format!("{error:?}").contains("Hello")); @@ -331,13 +331,10 @@ fn manager_consume_call_response_forwards_to_subscriber_with_pipeline_data( ))?; for i in 0..2 { - manager.consume(PluginOutput::Stream(StreamMessage::Data( - 0, - Value::test_int(i).into(), - )))?; + manager.consume(PluginOutput::Data(0, Value::test_int(i).into()))?; } - manager.consume(PluginOutput::Stream(StreamMessage::End(0)))?; + manager.consume(PluginOutput::End(0))?; // Make sure the streams end and we don't deadlock drop(manager); @@ -454,12 +451,9 @@ fn manager_consume_engine_call_forwards_to_subscriber_with_pipeline_data() -> Re })?; for i in 0..2 { - manager.consume(PluginOutput::Stream(StreamMessage::Data( - 2, - Value::test_int(i).into(), - )))?; + manager.consume(PluginOutput::Data(2, Value::test_int(i).into()))?; } - manager.consume(PluginOutput::Stream(StreamMessage::End(2)))?; + manager.consume(PluginOutput::End(2))?; // Make sure the streams end and we don't deadlock drop(manager); @@ -889,7 +883,7 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel .next_written() .expect("failed to get Data stream message") { - PluginInput::Stream(StreamMessage::Data(id, data)) => { + PluginInput::Data(id, data) => { assert_eq!(info.id, id, "id"); match data { StreamData::List(data_value) => { @@ -906,10 +900,10 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel .next_written() .expect("failed to get End stream message") { - PluginInput::Stream(StreamMessage::End(id)) => { + PluginInput::End(id) => { assert_eq!(info.id, id, "id"); } - message => panic!("expected Stream(End(_)) message: {message:?}"), + message => panic!("expected End(_) message: {message:?}"), } Ok(()) diff --git a/crates/nu-plugin/src/plugin/interface/tests.rs b/crates/nu-plugin/src/plugin/interface/tests.rs index 706798b31f6..cbb974d1015 100644 --- a/crates/nu-plugin/src/plugin/interface/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/tests.rs @@ -66,7 +66,14 @@ impl InterfaceManager for TestInterfaceManager { fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> { match input { - PluginInput::Stream(msg) => self.consume_stream_message(msg), + PluginInput::Data(..) + | PluginInput::End(..) + | PluginInput::Drop(..) + | PluginInput::Ack(..) => self.consume_stream_message( + input + .try_into() + .expect("failed to convert message to StreamMessage"), + ), _ => unimplemented!(), } } @@ -414,7 +421,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> { // Now make sure the stream messages have been written for value in values { match test.next_written().expect("unexpected end of stream") { - PluginOutput::Stream(StreamMessage::Data(id, data)) => { + PluginOutput::Data(id, data) => { assert_eq!(info.id, id, "Data id"); match data { StreamData::List(read_value) => assert_eq!(value, read_value, "Data value"), @@ -426,7 +433,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> { } match test.next_written().expect("unexpected end of stream") { - PluginOutput::Stream(StreamMessage::End(id)) => { + PluginOutput::End(id) => { assert_eq!(info.id, id, "End id"); } other => panic!("unexpected output: {other:?}"), @@ -510,7 +517,7 @@ fn write_pipeline_data_external_stream() -> Result<(), ShellError> { // End must come after all Data for msg in test.written() { match msg { - PluginOutput::Stream(StreamMessage::Data(id, data)) => { + PluginOutput::Data(id, data) => { if id == stdout_info.id { let result: Result, ShellError> = data.try_into().expect("wrong data in stdout stream"); @@ -535,7 +542,7 @@ fn write_pipeline_data_external_stream() -> Result<(), ShellError> { panic!("unrecognized stream id: {id}"); } } - PluginOutput::Stream(StreamMessage::End(id)) => { + PluginOutput::End(id) => { if id == stdout_info.id { assert!(!stdout_ended, "double End of stdout"); assert!(stdout_iter.next().is_none(), "unexpected end of stdout"); diff --git a/crates/nu-plugin/src/protocol/mod.rs b/crates/nu-plugin/src/protocol/mod.rs index 84835c8b508..6f71efc4c38 100644 --- a/crates/nu-plugin/src/protocol/mod.rs +++ b/crates/nu-plugin/src/protocol/mod.rs @@ -208,11 +208,14 @@ pub enum PluginInput { /// Response to an [`EngineCall`]. The ID should be the same one sent with the engine call this /// is responding to EngineCallResponse(EngineCallId, EngineCallResponse), - /// Stream control or data message. Untagged to keep them as small as possible. - /// - /// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}` - #[serde(untagged)] - Stream(StreamMessage), + /// See [`StreamMessage::Data`]. + Data(StreamId, StreamData), + /// See [`StreamMessage::End`]. + End(StreamId), + /// See [`StreamMessage::Drop`]. + Drop(StreamId), + /// See [`StreamMessage::Ack`]. + Ack(StreamId), } impl TryFrom for StreamMessage { @@ -220,7 +223,10 @@ impl TryFrom for StreamMessage { fn try_from(msg: PluginInput) -> Result { match msg { - PluginInput::Stream(stream_msg) => Ok(stream_msg), + PluginInput::Data(id, data) => Ok(StreamMessage::Data(id, data)), + PluginInput::End(id) => Ok(StreamMessage::End(id)), + PluginInput::Drop(id) => Ok(StreamMessage::Drop(id)), + PluginInput::Ack(id) => Ok(StreamMessage::Ack(id)), _ => Err(msg), } } @@ -228,7 +234,12 @@ impl TryFrom for StreamMessage { impl From for PluginInput { fn from(stream_msg: StreamMessage) -> PluginInput { - PluginInput::Stream(stream_msg) + match stream_msg { + StreamMessage::Data(id, data) => PluginInput::Data(id, data), + StreamMessage::End(id) => PluginInput::End(id), + StreamMessage::Drop(id) => PluginInput::Drop(id), + StreamMessage::Ack(id) => PluginInput::Ack(id), + } } } @@ -420,11 +431,14 @@ pub enum PluginOutput { id: EngineCallId, call: EngineCall, }, - /// Stream control or data message. Untagged to keep them as small as possible. - /// - /// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}` - #[serde(untagged)] - Stream(StreamMessage), + /// See [`StreamMessage::Data`]. + Data(StreamId, StreamData), + /// See [`StreamMessage::End`]. + End(StreamId), + /// See [`StreamMessage::Drop`]. + Drop(StreamId), + /// See [`StreamMessage::Ack`]. + Ack(StreamId), } impl TryFrom for StreamMessage { @@ -432,7 +446,10 @@ impl TryFrom for StreamMessage { fn try_from(msg: PluginOutput) -> Result { match msg { - PluginOutput::Stream(stream_msg) => Ok(stream_msg), + PluginOutput::Data(id, data) => Ok(StreamMessage::Data(id, data)), + PluginOutput::End(id) => Ok(StreamMessage::End(id)), + PluginOutput::Drop(id) => Ok(StreamMessage::Drop(id)), + PluginOutput::Ack(id) => Ok(StreamMessage::Ack(id)), _ => Err(msg), } } @@ -440,7 +457,12 @@ impl TryFrom for StreamMessage { impl From for PluginOutput { fn from(stream_msg: StreamMessage) -> PluginOutput { - PluginOutput::Stream(stream_msg) + match stream_msg { + StreamMessage::Data(id, data) => PluginOutput::Data(id, data), + StreamMessage::End(id) => PluginOutput::End(id), + StreamMessage::Drop(id) => PluginOutput::Drop(id), + StreamMessage::Ack(id) => PluginOutput::Ack(id), + } } } diff --git a/crates/nu-plugin/src/serializers/json.rs b/crates/nu-plugin/src/serializers/json.rs index 25dca0ce146..fc66db39931 100644 --- a/crates/nu-plugin/src/serializers/json.rs +++ b/crates/nu-plugin/src/serializers/json.rs @@ -117,14 +117,14 @@ mod tests { fn json_has_no_other_newlines() { let mut out = vec![]; // use something deeply nested, to try to trigger any pretty printing - let output = PluginOutput::Stream(StreamMessage::Data( + let output = PluginOutput::Data( 0, StreamData::List(Value::test_list(vec![ Value::test_int(4), // in case escaping failed Value::test_string("newline\ncontaining\nstring"), ])), - )); + ); JsonSerializer {} .encode(&output, &mut out) .expect("serialization error"); diff --git a/crates/nu-plugin/src/serializers/tests.rs b/crates/nu-plugin/src/serializers/tests.rs index dc134a9bdf8..af965abad28 100644 --- a/crates/nu-plugin/src/serializers/tests.rs +++ b/crates/nu-plugin/src/serializers/tests.rs @@ -3,7 +3,7 @@ macro_rules! generate_tests { use crate::protocol::{ CallInfo, CustomValueOp, EvaluatedCall, PipelineDataHeader, PluginCall, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption, PluginOutput, - StreamData, StreamMessage, + StreamData, }; use nu_protocol::{ LabeledError, PluginSignature, Signature, Span, Spanned, SyntaxShape, Value, @@ -429,7 +429,7 @@ macro_rules! generate_tests { let item = Value::int(1, span); let stream_data = StreamData::List(item.clone()); - let plugin_input = PluginInput::Stream(StreamMessage::Data(0, stream_data)); + let plugin_input = PluginInput::Data(0, stream_data); let encoder = $encoder; let mut buffer: Vec = Vec::new(); @@ -442,7 +442,7 @@ macro_rules! generate_tests { .expect("eof"); match returned { - PluginInput::Stream(StreamMessage::Data(id, StreamData::List(list_data))) => { + PluginInput::Data(id, StreamData::List(list_data)) => { assert_eq!(0, id); assert_eq!(item, list_data); } @@ -455,7 +455,7 @@ macro_rules! generate_tests { let data = b"Hello world"; let stream_data = StreamData::Raw(Ok(data.to_vec())); - let plugin_input = PluginInput::Stream(StreamMessage::Data(1, stream_data)); + let plugin_input = PluginInput::Data(1, stream_data); let encoder = $encoder; let mut buffer: Vec = Vec::new(); @@ -468,7 +468,7 @@ macro_rules! generate_tests { .expect("eof"); match returned { - PluginInput::Stream(StreamMessage::Data(id, StreamData::Raw(bytes))) => { + PluginInput::Data(id, StreamData::Raw(bytes)) => { assert_eq!(1, id); match bytes { Ok(bytes) => assert_eq!(data, &bytes[..]), @@ -485,7 +485,7 @@ macro_rules! generate_tests { let item = Value::int(1, span); let stream_data = StreamData::List(item.clone()); - let plugin_output = PluginOutput::Stream(StreamMessage::Data(4, stream_data)); + let plugin_output = PluginOutput::Data(4, stream_data); let encoder = $encoder; let mut buffer: Vec = Vec::new(); @@ -498,7 +498,7 @@ macro_rules! generate_tests { .expect("eof"); match returned { - PluginOutput::Stream(StreamMessage::Data(id, StreamData::List(list_data))) => { + PluginOutput::Data(id, StreamData::List(list_data)) => { assert_eq!(4, id); assert_eq!(item, list_data); } @@ -511,7 +511,7 @@ macro_rules! generate_tests { let data = b"Hello world"; let stream_data = StreamData::Raw(Ok(data.to_vec())); - let plugin_output = PluginOutput::Stream(StreamMessage::Data(5, stream_data)); + let plugin_output = PluginOutput::Data(5, stream_data); let encoder = $encoder; let mut buffer: Vec = Vec::new(); @@ -524,7 +524,7 @@ macro_rules! generate_tests { .expect("eof"); match returned { - PluginOutput::Stream(StreamMessage::Data(id, StreamData::Raw(bytes))) => { + PluginOutput::Data(id, StreamData::Raw(bytes)) => { assert_eq!(5, id); match bytes { Ok(bytes) => assert_eq!(data, &bytes[..]),