Refactor: introduce 2 associated functions to PipelineData (#16233)

# Description
As title: this pr is try to introduce 2 functions to `PipelineData`:
1. PipelineData::list_stream --> create a PipelineData::ListStream
2. PipelineData::byte_stream -> create a PipelineData::ByteStream
And use these functions everywhere.

### Reason behind this change
I tried to implement `pipefail` feature, but this would required to
change `PipelineData` from enum to struct. So use these functions can
reduce diff if I finally change to struct. [Discord message
here](https://discord.com/channels/601130461678272522/615962413203718156/1396999539000479784)
is my plan.

# User-Facing Changes
NaN

# Tests + Formatting
NaN

# After Submitting
NaN
This commit is contained in:
Wind
2025-08-02 09:30:30 +08:00
committed by GitHub
parent ee5b5bd39e
commit eb8d2d3206
126 changed files with 299 additions and 304 deletions

View File

@ -417,7 +417,7 @@ pub fn eval_constant_with_input(
let block = working_set.get_block(*block_id);
eval_const_subexpression(working_set, block, input, expr.span(&working_set))
}
_ => eval_constant(working_set, expr).map(|v| PipelineData::Value(v, None)),
_ => eval_constant(working_set, expr).map(|v| PipelineData::value(v, None)),
}
}

View File

@ -737,7 +737,7 @@ impl ByteStream {
impl From<ByteStream> for PipelineData {
fn from(stream: ByteStream) -> Self {
Self::ByteStream(stream, None)
Self::byte_stream(stream, None)
}
}

View File

@ -139,7 +139,7 @@ impl IntoIterator for ListStream {
impl From<ListStream> for PipelineData {
fn from(stream: ListStream) -> Self {
Self::ListStream(stream, None)
Self::list_stream(stream, None)
}
}

View File

@ -48,10 +48,22 @@ pub enum PipelineData {
}
impl PipelineData {
pub fn empty() -> PipelineData {
pub const fn empty() -> PipelineData {
PipelineData::Empty
}
pub fn value(val: Value, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
PipelineData::Value(val, metadata.into())
}
pub fn list_stream(stream: ListStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
PipelineData::ListStream(stream, metadata.into())
}
pub fn byte_stream(stream: ByteStream, metadata: impl Into<Option<PipelineMetadata>>) -> Self {
PipelineData::ByteStream(stream, metadata.into())
}
pub fn metadata(&self) -> Option<PipelineMetadata> {
match self {
PipelineData::Empty => None,
@ -88,18 +100,18 @@ impl PipelineData {
/// Change the span of the [`PipelineData`].
///
/// Returns `Value(Nothing)` with the given span if it was [`PipelineData::Empty`].
/// Returns `Value(Nothing)` with the given span if it was [`PipelineData::empty()`].
pub fn with_span(self, span: Span) -> Self {
match self {
PipelineData::Empty => PipelineData::Value(Value::nothing(span), None),
PipelineData::Empty => PipelineData::value(Value::nothing(span), None),
PipelineData::Value(value, metadata) => {
PipelineData::Value(value.with_span(span), metadata)
PipelineData::value(value.with_span(span), metadata)
}
PipelineData::ListStream(stream, metadata) => {
PipelineData::ListStream(stream.with_span(span), metadata)
PipelineData::list_stream(stream.with_span(span), metadata)
}
PipelineData::ByteStream(stream, metadata) => {
PipelineData::ByteStream(stream.with_span(span), metadata)
PipelineData::byte_stream(stream.with_span(span), metadata)
}
}
}
@ -191,19 +203,19 @@ impl PipelineData {
PipelineData::ListStream(..) | PipelineData::ByteStream(..) => Ok(self),
PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
let metadata = metadata.clone();
Ok(PipelineData::ListStream(
Ok(PipelineData::list_stream(
ListStream::new(self.into_iter(), span, engine_state.signals().clone()),
metadata,
))
}
PipelineData::Value(Value::String { val, .. }, metadata) => {
Ok(PipelineData::ByteStream(
Ok(PipelineData::byte_stream(
ByteStream::read_string(val, span, engine_state.signals().clone()),
metadata,
))
}
PipelineData::Value(Value::Binary { val, .. }, metadata) => {
Ok(PipelineData::ByteStream(
Ok(PipelineData::byte_stream(
ByteStream::read_binary(val, span, engine_state.signals().clone()),
metadata,
))
@ -454,9 +466,9 @@ impl PipelineData {
};
Ok(pipeline.set_metadata(metadata))
}
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Empty => Ok(PipelineData::empty()),
PipelineData::ListStream(stream, metadata) => {
Ok(PipelineData::ListStream(stream.map(f), metadata))
Ok(PipelineData::list_stream(stream.map(f), metadata))
}
PipelineData::ByteStream(stream, metadata) => {
Ok(f(stream.into_value()?).into_pipeline_data_with_metadata(metadata))
@ -473,7 +485,7 @@ impl PipelineData {
F: FnMut(Value) -> U + 'static + Send,
{
match self {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Empty => Ok(PipelineData::empty()),
PipelineData::Value(value, metadata) => {
let span = value.span();
let pipeline = match value {
@ -491,7 +503,7 @@ impl PipelineData {
};
Ok(pipeline.set_metadata(metadata))
}
PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
stream.modify(|iter| iter.flat_map(f)),
metadata,
)),
@ -520,7 +532,7 @@ impl PipelineData {
F: FnMut(&Value) -> bool + 'static + Send,
{
match self {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Empty => Ok(PipelineData::empty()),
PipelineData::Value(value, metadata) => {
let span = value.span();
let pipeline = match value {
@ -542,7 +554,7 @@ impl PipelineData {
};
Ok(pipeline.set_metadata(metadata))
}
PipelineData::ListStream(stream, metadata) => Ok(PipelineData::ListStream(
PipelineData::ListStream(stream, metadata) => Ok(PipelineData::list_stream(
stream.modify(|iter| iter.filter(f)),
metadata,
)),
@ -602,9 +614,9 @@ impl PipelineData {
}
let range_values: Vec<Value> =
val.into_range_iter(span, Signals::empty()).collect();
Ok(PipelineData::Value(Value::list(range_values, span), None))
Ok(PipelineData::value(Value::list(range_values, span), None))
}
x => Ok(PipelineData::Value(x, metadata)),
x => Ok(PipelineData::value(x, metadata)),
}
}
_ => Ok(self),
@ -874,14 +886,14 @@ where
V: Into<Value>,
{
fn into_pipeline_data(self) -> PipelineData {
PipelineData::Value(self.into(), None)
PipelineData::value(self.into(), None)
}
fn into_pipeline_data_with_metadata(
self,
metadata: impl Into<Option<PipelineMetadata>>,
) -> PipelineData {
PipelineData::Value(self.into(), metadata.into())
PipelineData::value(self.into(), metadata.into())
}
}
@ -911,7 +923,7 @@ where
signals: Signals,
metadata: impl Into<Option<PipelineMetadata>>,
) -> PipelineData {
PipelineData::ListStream(
PipelineData::list_stream(
ListStream::new(self.into_iter().map(Into::into), span, signals),
metadata.into(),
)