Make pipeline metadata available to plugins (#13495)

# Description
Fixes an issue with pipeline metadata not being passed to plugins.
This commit is contained in:
Jack Wright
2024-08-02 11:01:20 -07:00
committed by GitHub
parent ca8eb856e8
commit d081e3386f
10 changed files with 208 additions and 130 deletions

View File

@ -177,16 +177,19 @@ pub trait InterfaceManager {
) -> Result<PipelineData, ShellError> {
self.prepare_pipeline_data(match header {
PipelineDataHeader::Empty => PipelineData::Empty,
PipelineDataHeader::Value(value) => PipelineData::Value(value, None),
PipelineDataHeader::Value(value, metadata) => PipelineData::Value(value, metadata),
PipelineDataHeader::ListStream(info) => {
let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(info.id, self.get_interface())?;
ListStream::new(reader, info.span, signals.clone()).into()
let ls = ListStream::new(reader, info.span, signals.clone());
PipelineData::ListStream(ls, info.metadata)
}
PipelineDataHeader::ByteStream(info) => {
let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(info.id, self.get_interface())?;
ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_).into()
let bs =
ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_);
PipelineData::ByteStream(bs, info.metadata)
}
})
}
@ -248,26 +251,33 @@ pub trait Interface: Clone + Send {
Ok::<_, ShellError>((id, writer))
};
match self.prepare_pipeline_data(data, context)? {
PipelineData::Value(value, ..) => {
Ok((PipelineDataHeader::Value(value), PipelineDataWriter::None))
}
PipelineData::Value(value, metadata) => Ok((
PipelineDataHeader::Value(value, metadata),
PipelineDataWriter::None,
)),
PipelineData::Empty => Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)),
PipelineData::ListStream(stream, ..) => {
PipelineData::ListStream(stream, metadata) => {
let (id, writer) = new_stream(LIST_STREAM_HIGH_PRESSURE)?;
Ok((
PipelineDataHeader::ListStream(ListStreamInfo {
id,
span: stream.span(),
metadata,
}),
PipelineDataWriter::ListStream(writer, stream),
))
}
PipelineData::ByteStream(stream, ..) => {
PipelineData::ByteStream(stream, metadata) => {
let span = stream.span();
let type_ = stream.type_();
if let Some(reader) = stream.reader() {
let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?;
let header = PipelineDataHeader::ByteStream(ByteStreamInfo { id, span, type_ });
let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
id,
span,
type_,
metadata,
});
Ok((header, PipelineDataWriter::ByteStream(writer, reader)))
} else {
Ok((PipelineDataHeader::Empty, PipelineDataWriter::None))

View File

@ -137,10 +137,16 @@ fn read_pipeline_data_empty() -> Result<(), ShellError> {
fn read_pipeline_data_value() -> Result<(), ShellError> {
let manager = TestInterfaceManager::new(&TestCase::new());
let value = Value::test_int(4);
let header = PipelineDataHeader::Value(value.clone());
let metadata = Some(PipelineMetadata {
data_source: DataSource::FilePath("/test/path".into()),
content_type: None,
});
let header = PipelineDataHeader::Value(value.clone(), metadata.clone());
match manager.read_pipeline_data(header, &Signals::empty())? {
PipelineData::Value(read_value, ..) => assert_eq!(value, read_value),
PipelineData::Value(read_value, read_metadata) => {
assert_eq!(value, read_value);
assert_eq!(metadata, read_metadata);
}
PipelineData::ListStream(..) => panic!("unexpected ListStream"),
PipelineData::ByteStream(..) => panic!("unexpected ByteStream"),
PipelineData::Empty => panic!("unexpected Empty"),
@ -161,9 +167,15 @@ fn read_pipeline_data_list_stream() -> Result<(), ShellError> {
}
test.add(StreamMessage::End(7));
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foobar".into()),
});
let header = PipelineDataHeader::ListStream(ListStreamInfo {
id: 7,
span: Span::test_data(),
metadata,
});
let pipe = manager.read_pipeline_data(header, &Signals::empty())?;
@ -204,10 +216,17 @@ fn read_pipeline_data_byte_stream() -> Result<(), ShellError> {
test.add(StreamMessage::End(12));
let test_span = Span::new(10, 13);
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foobar".into()),
});
let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
id: 12,
span: test_span,
type_: ByteStreamType::Unknown,
metadata,
});
let pipe = manager.read_pipeline_data(header, &Signals::empty())?;
@ -251,9 +270,15 @@ fn read_pipeline_data_byte_stream() -> Result<(), ShellError> {
#[test]
fn read_pipeline_data_prepared_properly() -> Result<(), ShellError> {
let manager = TestInterfaceManager::new(&TestCase::new());
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foobar".into()),
});
let header = PipelineDataHeader::ListStream(ListStreamInfo {
id: 0,
span: Span::test_data(),
metadata,
});
match manager.read_pipeline_data(header, &Signals::empty())? {
PipelineData::ListStream(_, meta) => match meta {
@ -301,7 +326,7 @@ fn write_pipeline_data_value() -> Result<(), ShellError> {
interface.init_write_pipeline_data(PipelineData::Value(value.clone(), None), &())?;
match header {
PipelineDataHeader::Value(read_value) => assert_eq!(value, read_value),
PipelineDataHeader::Value(read_value, _) => assert_eq!(value, read_value),
_ => panic!("unexpected header: {header:?}"),
}

View File

@ -6,7 +6,8 @@ macro_rules! generate_tests {
StreamData,
};
use nu_protocol::{
LabeledError, PluginSignature, Signature, Span, Spanned, SyntaxShape, Value,
DataSource, LabeledError, PipelineMetadata, PluginSignature, Signature, Span, Spanned,
SyntaxShape, Value,
};
#[test]
@ -123,10 +124,15 @@ macro_rules! generate_tests {
)],
};
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foobar".into()),
});
let plugin_call = PluginCall::Run(CallInfo {
name: name.clone(),
call: call.clone(),
input: PipelineDataHeader::Value(input.clone()),
input: PipelineDataHeader::Value(input.clone(), metadata.clone()),
});
let plugin_input = PluginInput::Call(1, plugin_call);
@ -144,7 +150,7 @@ macro_rules! generate_tests {
match returned {
PluginInput::Call(1, PluginCall::Run(call_info)) => {
assert_eq!(name, call_info.name);
assert_eq!(PipelineDataHeader::Value(input), call_info.input);
assert_eq!(PipelineDataHeader::Value(input, metadata), call_info.input);
assert_eq!(call.head, call_info.call.head);
assert_eq!(call.positional.len(), call_info.call.positional.len());
@ -305,7 +311,7 @@ macro_rules! generate_tests {
match returned {
PluginOutput::CallResponse(
4,
PluginCallResponse::PipelineData(PipelineDataHeader::Value(returned_value)),
PluginCallResponse::PipelineData(PipelineDataHeader::Value(returned_value, _)),
) => {
assert_eq!(value, returned_value)
}
@ -325,7 +331,7 @@ macro_rules! generate_tests {
span,
);
let response = PluginCallResponse::PipelineData(PipelineDataHeader::Value(value));
let response = PluginCallResponse::PipelineData(PipelineDataHeader::value(value));
let output = PluginOutput::CallResponse(5, response);
let encoder = $encoder;
@ -341,7 +347,7 @@ macro_rules! generate_tests {
match returned {
PluginOutput::CallResponse(
5,
PluginCallResponse::PipelineData(PipelineDataHeader::Value(returned_value)),
PluginCallResponse::PipelineData(PipelineDataHeader::Value(returned_value, _)),
) => {
assert_eq!(span, returned_value.span());