diff --git a/crates/nu-engine/src/evaluate/internal.rs b/crates/nu-engine/src/evaluate/internal.rs index 02c387e00b..4aa2836348 100644 --- a/crates/nu-engine/src/evaluate/internal.rs +++ b/crates/nu-engine/src/evaluate/internal.rs @@ -63,7 +63,7 @@ impl Iterator for InternalIteratorSimple { pub struct InternalIterator { pub context: EvaluationContext, - pub leftovers: Vec, + pub leftovers: InputStream, pub input: ActionStream, } @@ -71,8 +71,7 @@ impl Iterator for InternalIterator { type Item = Value; fn next(&mut self) -> Option { - if !self.leftovers.is_empty() { - let output = self.leftovers.remove(0); + if let Some(output) = self.leftovers.next() { return Some(output); } @@ -114,45 +113,24 @@ impl Iterator for InternalIterator { }, scope: self.context.scope.clone(), }; - let result = converter - .run_with_actions(new_args.with_input(vec![tagged_contents])); + let result = converter.run(new_args.with_input(vec![tagged_contents])); match result { Ok(mut result) => { - let result_vec: Vec> = - result.drain_vec(); - - let mut output = vec![]; - for res in result_vec { - match res { - Ok(ReturnSuccess::Value(Value { - value: UntaggedValue::Table(list), - .. - })) => { - for l in list { - output.push(l); - } - } - Ok(ReturnSuccess::Value(Value { value, .. })) => { - output.push(value.into_value(contents_tag.clone())); - } - Err(e) => output.push( - UntaggedValue::Error(e).into_untagged_value(), - ), - _ => {} - } - } - - let mut output = output.into_iter(); - - if let Some(x) = output.next() { - self.leftovers = output.collect(); - + if let Some(x) = result.next() { + self.leftovers = + InputStream::from_stream(result.map(move |x| Value { + value: x.value, + tag: contents_tag.clone(), + })); return Some(x); + } else { + return None; } } Err(err) => { - self.context.error(err); + self.leftovers = InputStream::empty(); + return Some(Value::error(err)); } } } else { diff --git a/crates/nu-engine/src/whole_stream_command.rs b/crates/nu-engine/src/whole_stream_command.rs index b37a8b65cf..c80bff856f 100644 --- a/crates/nu-engine/src/whole_stream_command.rs +++ b/crates/nu-engine/src/whole_stream_command.rs @@ -38,7 +38,7 @@ pub trait WholeStreamCommand: Send + Sync { Ok(Box::new(crate::evaluate::internal::InternalIterator { context, input: stream, - leftovers: vec![], + leftovers: InputStream::empty(), }) .to_output_stream()) } diff --git a/crates/nu-stream/src/output.rs b/crates/nu-stream/src/output.rs index fefd903ae2..778fb47c14 100644 --- a/crates/nu-stream/src/output.rs +++ b/crates/nu-stream/src/output.rs @@ -44,9 +44,11 @@ impl ActionStream { pub fn drain_vec(&mut self) -> Vec { let mut output = vec![]; + while let Some(x) = self.values.next() { output.push(x); } + output } }