Introduce metadata into the pipeline (#397)

This commit is contained in:
JT
2021-12-02 18:59:10 +13:00
committed by GitHub
parent 56307553ae
commit 45eba8b922
28 changed files with 329 additions and 199 deletions

View File

@ -363,8 +363,8 @@ impl EngineState {
let decl = self.get_decl(decl_id);
match input {
PipelineData::Stream(_) => decl,
PipelineData::Value(value) => match value {
PipelineData::Stream(..) => decl,
PipelineData::Value(value, ..) => match value {
Value::CustomValue { val, .. } => {
// This filter works because the custom definitions were declared
// before the default nushell declarations. This means that the custom

View File

@ -33,19 +33,29 @@ use crate::{ast::PathMember, Config, ShellError, Span, Value, ValueStream};
/// Nushell.
#[derive(Debug)]
pub enum PipelineData {
Value(Value),
Stream(ValueStream),
Value(Value, Option<PipelineMetadata>),
Stream(ValueStream, Option<PipelineMetadata>),
}
#[derive(Debug)]
pub struct PipelineMetadata {
pub data_source: DataSource,
}
#[derive(Debug)]
pub enum DataSource {
Ls,
}
impl PipelineData {
pub fn new(span: Span) -> PipelineData {
PipelineData::Value(Value::Nothing { span })
PipelineData::Value(Value::Nothing { span }, None)
}
pub fn into_value(self, span: Span) -> Value {
match self {
PipelineData::Value(v) => v,
PipelineData::Stream(s) => Value::List {
PipelineData::Value(v, ..) => v,
PipelineData::Stream(s, ..) => Value::List {
vals: s.collect(),
span, // FIXME?
},
@ -55,7 +65,7 @@ impl PipelineData {
pub fn into_interruptible_iter(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineIterator {
let mut iter = self.into_iter();
if let PipelineIterator(PipelineData::Stream(s)) = &mut iter {
if let PipelineIterator(PipelineData::Stream(s, ..)) = &mut iter {
s.ctrlc = ctrlc;
}
@ -64,20 +74,20 @@ impl PipelineData {
pub fn collect_string(self, separator: &str, config: &Config) -> String {
match self {
PipelineData::Value(v) => v.into_string(separator, config),
PipelineData::Stream(s) => s.into_string(separator, config),
PipelineData::Value(v, ..) => v.into_string(separator, config),
PipelineData::Stream(s, ..) => s.into_string(separator, config),
}
}
pub fn follow_cell_path(self, cell_path: &[PathMember]) -> Result<Value, ShellError> {
match self {
// FIXME: there are probably better ways of doing this
PipelineData::Stream(stream) => Value::List {
PipelineData::Stream(stream, ..) => Value::List {
vals: stream.collect(),
span: Span::unknown(),
}
.follow_cell_path(cell_path),
PipelineData::Value(v) => v.follow_cell_path(cell_path),
PipelineData::Value(v, ..) => v.follow_cell_path(cell_path),
}
}
@ -88,12 +98,12 @@ impl PipelineData {
) -> Result<(), ShellError> {
match self {
// FIXME: there are probably better ways of doing this
PipelineData::Stream(stream) => Value::List {
PipelineData::Stream(stream, ..) => Value::List {
vals: stream.collect(),
span: Span::unknown(),
}
.update_cell_path(cell_path, callback),
PipelineData::Value(v) => v.update_cell_path(cell_path, callback),
PipelineData::Value(v, ..) => v.update_cell_path(cell_path, callback),
}
}
@ -108,14 +118,14 @@ impl PipelineData {
F: FnMut(Value) -> Value + 'static + Send,
{
match self {
PipelineData::Value(Value::List { vals, .. }) => {
PipelineData::Value(Value::List { vals, .. }, ..) => {
Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc))
}
PipelineData::Stream(stream) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }) => {
PipelineData::Stream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }, ..) => {
Ok(val.into_range_iter()?.map(f).into_pipeline_data(ctrlc))
}
PipelineData::Value(v) => match f(v) {
PipelineData::Value(v, ..) => match f(v) {
Value::Error { error } => Err(error),
v => Ok(v.into_pipeline_data()),
},
@ -135,15 +145,17 @@ impl PipelineData {
F: FnMut(Value) -> U + 'static + Send,
{
match self {
PipelineData::Value(Value::List { vals, .. }) => {
PipelineData::Value(Value::List { vals, .. }, ..) => {
Ok(vals.into_iter().map(f).flatten().into_pipeline_data(ctrlc))
}
PipelineData::Stream(stream) => Ok(stream.map(f).flatten().into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }) => match val.into_range_iter() {
PipelineData::Stream(stream, ..) => {
Ok(stream.map(f).flatten().into_pipeline_data(ctrlc))
}
PipelineData::Value(Value::Range { val, .. }, ..) => match val.into_range_iter() {
Ok(iter) => Ok(iter.map(f).flatten().into_pipeline_data(ctrlc)),
Err(error) => Err(error),
},
PipelineData::Value(v) => Ok(f(v).into_iter().into_pipeline_data(ctrlc)),
PipelineData::Value(v, ..) => Ok(f(v).into_iter().into_pipeline_data(ctrlc)),
}
}
@ -157,14 +169,14 @@ impl PipelineData {
F: FnMut(&Value) -> bool + 'static + Send,
{
match self {
PipelineData::Value(Value::List { vals, .. }) => {
PipelineData::Value(Value::List { vals, .. }, ..) => {
Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc))
}
PipelineData::Stream(stream) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }) => {
PipelineData::Stream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }, ..) => {
Ok(val.into_range_iter()?.filter(f).into_pipeline_data(ctrlc))
}
PipelineData::Value(v) => {
PipelineData::Value(v, ..) => {
if f(&v) {
Ok(v.into_pipeline_data())
} else {
@ -190,22 +202,33 @@ impl IntoIterator for PipelineData {
fn into_iter(self) -> Self::IntoIter {
match self {
PipelineData::Value(Value::List { vals, .. }) => {
PipelineIterator(PipelineData::Stream(ValueStream {
stream: Box::new(vals.into_iter()),
ctrlc: None,
}))
PipelineData::Value(Value::List { vals, .. }, metadata) => {
PipelineIterator(PipelineData::Stream(
ValueStream {
stream: Box::new(vals.into_iter()),
ctrlc: None,
},
metadata,
))
}
PipelineData::Value(Value::Range { val, .. }, metadata) => {
match val.into_range_iter() {
Ok(iter) => PipelineIterator(PipelineData::Stream(
ValueStream {
stream: Box::new(iter),
ctrlc: None,
},
metadata,
)),
Err(error) => PipelineIterator(PipelineData::Stream(
ValueStream {
stream: Box::new(std::iter::once(Value::Error { error })),
ctrlc: None,
},
metadata,
)),
}
}
PipelineData::Value(Value::Range { val, .. }) => match val.into_range_iter() {
Ok(iter) => PipelineIterator(PipelineData::Stream(ValueStream {
stream: Box::new(iter),
ctrlc: None,
})),
Err(error) => PipelineIterator(PipelineData::Stream(ValueStream {
stream: Box::new(std::iter::once(Value::Error { error })),
ctrlc: None,
})),
},
x => PipelineIterator(x),
}
}
@ -216,9 +239,9 @@ impl Iterator for PipelineIterator {
fn next(&mut self) -> Option<Self::Item> {
match &mut self.0 {
PipelineData::Value(Value::Nothing { .. }) => None,
PipelineData::Value(v) => Some(std::mem::take(v)),
PipelineData::Stream(stream) => stream.next(),
PipelineData::Value(Value::Nothing { .. }, ..) => None,
PipelineData::Value(v, ..) => Some(std::mem::take(v)),
PipelineData::Stream(stream, ..) => stream.next(),
}
}
}
@ -232,12 +255,17 @@ where
V: Into<Value>,
{
fn into_pipeline_data(self) -> PipelineData {
PipelineData::Value(self.into())
PipelineData::Value(self.into(), None)
}
}
pub trait IntoInterruptiblePipelineData {
fn into_pipeline_data(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData;
fn into_pipeline_data_with_metadata(
self,
metadata: PipelineMetadata,
ctrlc: Option<Arc<AtomicBool>>,
) -> PipelineData;
}
impl<I> IntoInterruptiblePipelineData for I
@ -247,9 +275,26 @@ where
<I::IntoIter as Iterator>::Item: Into<Value>,
{
fn into_pipeline_data(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData {
PipelineData::Stream(ValueStream {
stream: Box::new(self.into_iter().map(Into::into)),
ctrlc,
})
PipelineData::Stream(
ValueStream {
stream: Box::new(self.into_iter().map(Into::into)),
ctrlc,
},
None,
)
}
fn into_pipeline_data_with_metadata(
self,
metadata: PipelineMetadata,
ctrlc: Option<Arc<AtomicBool>>,
) -> PipelineData {
PipelineData::Stream(
ValueStream {
stream: Box::new(self.into_iter().map(Into::into)),
ctrlc,
},
Some(metadata),
)
}
}