ListStream touchup (#12524)

# Description

Does some misc changes to `ListStream`:
- Moves it into its own module/file separate from `RawStream`.
- `ListStream`s now have an associated `Span`.
- This required changes to `ListStreamInfo` in `nu-plugin`. Note sure if
this is a breaking change for the plugin protocol.
- Hides the internals of `ListStream` but also adds a few more methods.
- This includes two functions to more easily alter a stream (these take
a `ListStream` and return a `ListStream` instead of having to go through
the whole `into_pipeline_data(..)` route).
  -  `map`: takes a `FnMut(Value) -> Value`
  - `modify`: takes a function to modify the inner stream.
This commit is contained in:
Ian Manske
2024-05-05 16:00:59 +00:00
committed by GitHub
parent 3143ded374
commit e879d4ecaf
106 changed files with 957 additions and 874 deletions

View File

@ -4,8 +4,7 @@ use crate::{
ExternalArgument, ListItem, Math, Operator, RecordItem,
},
debugger::DebugContext,
Config, IntoInterruptiblePipelineData, Range, Record, ShellError, Span, Value, VarId,
ENV_VARIABLE_ID,
Config, Range, Record, ShellError, Span, Value, VarId, ENV_VARIABLE_ID,
};
use std::{borrow::Cow, collections::HashMap};
@ -278,18 +277,13 @@ pub trait Eval {
Self::eval_row_condition_or_closure(state, mut_state, *block_id, expr.span)
}
Expr::StringInterpolation(exprs) => {
let mut parts = vec![];
for expr in exprs {
parts.push(Self::eval::<D>(state, mut_state, expr)?);
}
let config = Self::get_config(state, mut_state);
let str = exprs
.iter()
.map(|expr| Self::eval::<D>(state, mut_state, expr).map(|v| v.to_expanded_string(", ", &config)))
.collect::<Result<String, _>>()?;
parts
.into_iter()
.into_pipeline_data(None)
.collect_string("", &config)
.map(|x| Value::string(x, expr.span))
Ok(Value::string(str, expr.span))
}
Expr::Overlay(_) => Self::eval_overlay(state, expr.span),
Expr::GlobPattern(pattern, quoted) => {

View File

@ -0,0 +1,156 @@
use crate::{Config, PipelineData, ShellError, Span, Value};
use std::{
fmt::Debug,
sync::{atomic::AtomicBool, Arc},
};
pub type ValueIterator = Box<dyn Iterator<Item = Value> + Send + 'static>;
/// A potentially infinite, interruptible stream of [`Value`]s.
///
/// In practice, a "stream" here means anything which can be iterated and produces Values.
/// Like other iterators in Rust, observing values from this stream will drain the items
/// as you view them and the stream cannot be replayed.
pub struct ListStream {
stream: ValueIterator,
span: Span,
}
impl ListStream {
/// Create a new [`ListStream`] from a [`Value`] `Iterator`.
pub fn new(
iter: impl Iterator<Item = Value> + Send + 'static,
span: Span,
interrupt: Option<Arc<AtomicBool>>,
) -> Self {
Self {
stream: Box::new(Interrupt::new(iter, interrupt)),
span,
}
}
/// Returns the [`Span`] associated with this [`ListStream`].
pub fn span(&self) -> Span {
self.span
}
/// Convert a [`ListStream`] into its inner [`Value`] `Iterator`.
pub fn into_inner(self) -> ValueIterator {
self.stream
}
/// Converts each value in a [`ListStream`] into a string and then joins the strings together
/// using the given separator.
pub fn into_string(self, separator: &str, config: &Config) -> String {
self.into_iter()
.map(|val| val.to_expanded_string(", ", config))
.collect::<Vec<String>>()
.join(separator)
}
/// Collect the values of a [`ListStream`] into a list [`Value`].
pub fn into_value(self) -> Value {
Value::list(self.stream.collect(), self.span)
}
/// Consume all values in the stream, returning an error if any of the values is a `Value::Error`.
pub fn drain(self) -> Result<(), ShellError> {
for next in self {
if let Value::Error { error, .. } = next {
return Err(*error);
}
}
Ok(())
}
/// Modify the inner iterator of a [`ListStream`] using a function.
///
/// This can be used to call any number of standard iterator functions on the [`ListStream`].
/// E.g., `take`, `filter`, `step_by`, and more.
///
/// ```
/// use nu_protocol::{ListStream, Span, Value};
///
/// let span = Span::unknown();
/// let stream = ListStream::new(std::iter::repeat(Value::int(0, span)), span, None);
/// let new_stream = stream.modify(|iter| iter.take(100));
/// ```
pub fn modify<I>(self, f: impl FnOnce(ValueIterator) -> I) -> Self
where
I: Iterator<Item = Value> + Send + 'static,
{
Self {
stream: Box::new(f(self.stream)),
span: self.span,
}
}
/// Create a new [`ListStream`] whose values are the results of applying the given function
/// to each of the values in the original [`ListStream`].
pub fn map(self, mapping: impl FnMut(Value) -> Value + Send + 'static) -> Self {
self.modify(|iter| iter.map(mapping))
}
}
impl Debug for ListStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ListStream").finish()
}
}
impl IntoIterator for ListStream {
type Item = Value;
type IntoIter = IntoIter;
fn into_iter(self) -> Self::IntoIter {
IntoIter {
stream: self.into_inner(),
}
}
}
impl From<ListStream> for PipelineData {
fn from(stream: ListStream) -> Self {
Self::ListStream(stream, None)
}
}
pub struct IntoIter {
stream: ValueIterator,
}
impl Iterator for IntoIter {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
self.stream.next()
}
}
struct Interrupt<I: Iterator> {
iter: I,
interrupt: Option<Arc<AtomicBool>>,
}
impl<I: Iterator> Interrupt<I> {
fn new(iter: I, interrupt: Option<Arc<AtomicBool>>) -> Self {
Self { iter, interrupt }
}
}
impl<I: Iterator> Iterator for Interrupt<I> {
type Item = <I as Iterator>::Item;
fn next(&mut self) -> Option<Self::Item> {
if nu_utils::ctrl_c::was_pressed(&self.interrupt) {
None
} else {
self.iter.next()
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}
}

View File

@ -1,10 +1,12 @@
pub mod list_stream;
mod metadata;
mod out_dest;
mod stream;
mod raw_stream;
pub use list_stream::{ListStream, ValueIterator};
pub use metadata::*;
pub use out_dest::*;
pub use stream::*;
pub use raw_stream::*;
use crate::{
ast::{Call, PathMember},
@ -76,8 +78,9 @@ impl PipelineData {
PipelineData::ExternalStream {
stdout: None,
stderr: None,
exit_code: Some(ListStream::from_stream(
exit_code: Some(ListStream::new(
[Value::int(exit_code, Span::unknown())].into_iter(),
Span::unknown(),
None,
)),
span: Span::unknown(),
@ -118,7 +121,7 @@ impl PipelineData {
/// PipelineData doesn't always have a Span, but we can try!
pub fn span(&self) -> Option<Span> {
match self {
PipelineData::ListStream(..) => None,
PipelineData::ListStream(stream, ..) => Some(stream.span()),
PipelineData::ExternalStream { span, .. } => Some(*span),
PipelineData::Value(v, _) => Some(v.span()),
PipelineData::Empty => None,
@ -131,7 +134,7 @@ impl PipelineData {
PipelineData::Value(Value::Nothing { .. }, ..) => Value::nothing(span),
PipelineData::Value(v, ..) => v.with_span(span),
PipelineData::ListStream(s, ..) => Value::list(
s.collect(),
s.into_iter().collect(),
span, // FIXME?
),
PipelineData::ExternalStream {
@ -403,51 +406,54 @@ impl PipelineData {
///
/// It returns Err if the `self` cannot be converted to an iterator.
pub fn into_iter_strict(self, span: Span) -> Result<PipelineIterator, ShellError> {
match self {
PipelineData::Value(value, metadata) => match value {
Value::List { vals, .. } => Ok(PipelineIterator(PipelineData::ListStream(
ListStream::from_stream(vals.into_iter(), None),
metadata,
))),
Value::Binary { val, .. } => Ok(PipelineIterator(PipelineData::ListStream(
ListStream::from_stream(
val.into_iter().map(move |x| Value::int(x as i64, span)),
None,
Ok(PipelineIterator(match self {
PipelineData::Value(value, ..) => {
let val_span = value.span();
match value {
Value::List { vals, .. } => PipelineIteratorInner::ListStream(
ListStream::new(vals.into_iter(), val_span, None).into_iter(),
),
metadata,
))),
Value::Range { val, .. } => Ok(PipelineIterator(PipelineData::ListStream(
ListStream::from_stream(val.into_range_iter(value.span(), None), None),
metadata,
)))
,
// Propagate errors by explicitly matching them before the final case.
Value::Error { error, .. } => Err(*error),
other => Err(ShellError::OnlySupportsThisInputType {
Value::Binary { val, .. } => PipelineIteratorInner::ListStream(
ListStream::new(
val.into_iter().map(move |x| Value::int(x as i64, val_span)),
val_span,
None,
)
.into_iter(),
),
Value::Range { val, .. } => PipelineIteratorInner::ListStream(
ListStream::new(val.into_range_iter(value.span(), None), val_span, None)
.into_iter(),
),
// Propagate errors by explicitly matching them before the final case.
Value::Error { error, .. } => return Err(*error),
other => {
return Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary, raw data or range".into(),
wrong_type: other.get_type().to_string(),
dst_span: span,
src_span: val_span,
})
}
}
}
PipelineData::ListStream(stream, ..) => {
PipelineIteratorInner::ListStream(stream.into_iter())
}
PipelineData::Empty => {
return Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary, raw data or range".into(),
wrong_type: other.get_type().to_string(),
wrong_type: "null".into(),
dst_span: span,
src_span: other.span(),
}),
},
PipelineData::Empty => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list, binary, raw data or range".into(),
wrong_type: "null".into(),
dst_span: span,
src_span: span,
}),
other => Ok(PipelineIterator(other)),
}
}
pub fn into_interruptible_iter(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineIterator {
let mut iter = self.into_iter();
if let PipelineIterator(PipelineData::ListStream(s, ..)) = &mut iter {
s.ctrlc = ctrlc;
}
iter
src_span: span,
})
}
PipelineData::ExternalStream {
stdout: Some(stdout),
..
} => PipelineIteratorInner::ExternalStream(stdout),
PipelineData::ExternalStream { stdout: None, .. } => PipelineIteratorInner::Empty,
}))
}
pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
@ -516,9 +522,8 @@ impl PipelineData {
) -> Result<Value, ShellError> {
match self {
// FIXME: there are probably better ways of doing this
PipelineData::ListStream(stream, ..) => {
Value::list(stream.collect(), head).follow_cell_path(cell_path, insensitive)
}
PipelineData::ListStream(stream, ..) => Value::list(stream.into_iter().collect(), head)
.follow_cell_path(cell_path, insensitive),
PipelineData::Value(v, ..) => v.follow_cell_path(cell_path, insensitive),
PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
type_name: "empty pipeline".to_string(),
@ -531,22 +536,6 @@ impl PipelineData {
}
}
pub fn upsert_cell_path(
&mut self,
cell_path: &[PathMember],
callback: Box<dyn FnOnce(&Value) -> Value>,
head: Span,
) -> Result<(), ShellError> {
match self {
// FIXME: there are probably better ways of doing this
PipelineData::ListStream(stream, ..) => {
Value::list(stream.collect(), head).upsert_cell_path(cell_path, callback)
}
PipelineData::Value(v, ..) => v.upsert_cell_path(cell_path, callback),
_ => Ok(()),
}
}
/// Simplified mapper to help with simple values also. For full iterator support use `.into_iter()` instead
pub fn map<F>(
self,
@ -562,12 +551,12 @@ impl PipelineData {
let span = value.span();
match value {
Value::List { vals, .. } => {
Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc))
Ok(vals.into_iter().map(f).into_pipeline_data(span, ctrlc))
}
Value::Range { val, .. } => Ok(val
.into_range_iter(span, ctrlc.clone())
.map(f)
.into_pipeline_data(ctrlc)),
.into_pipeline_data(span, ctrlc)),
value => match f(value) {
Value::Error { error, .. } => Err(*error),
v => Ok(v.into_pipeline_data()),
@ -575,7 +564,9 @@ impl PipelineData {
}
}
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
PipelineData::ListStream(stream, ..) => {
Ok(PipelineData::ListStream(stream.map(f), None))
}
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()),
PipelineData::ExternalStream {
stdout: Some(stream),
@ -614,21 +605,22 @@ impl PipelineData {
let span = value.span();
match value {
Value::List { vals, .. } => {
Ok(vals.into_iter().flat_map(f).into_pipeline_data(ctrlc))
Ok(vals.into_iter().flat_map(f).into_pipeline_data(span, ctrlc))
}
Value::Range { val, .. } => Ok(val
.into_range_iter(span, ctrlc.clone())
.flat_map(f)
.into_pipeline_data(ctrlc)),
value => Ok(f(value).into_iter().into_pipeline_data(ctrlc)),
.into_pipeline_data(span, ctrlc)),
value => Ok(f(value).into_iter().into_pipeline_data(span, ctrlc)),
}
}
PipelineData::ListStream(stream, ..) => {
Ok(stream.flat_map(f).into_pipeline_data(ctrlc))
Ok(stream.modify(|iter| iter.flat_map(f)).into())
}
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::Empty),
PipelineData::ExternalStream {
stdout: Some(stream),
span,
trim_end_newline,
..
} => {
@ -640,11 +632,11 @@ impl PipelineData {
}
Ok(f(Value::string(st, collected.span))
.into_iter()
.into_pipeline_data(ctrlc))
.into_pipeline_data(span, ctrlc))
} else {
Ok(f(Value::binary(collected.item, collected.span))
.into_iter()
.into_pipeline_data(ctrlc))
.into_pipeline_data(span, ctrlc))
}
}
}
@ -665,12 +657,12 @@ impl PipelineData {
let span = value.span();
match value {
Value::List { vals, .. } => {
Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc))
Ok(vals.into_iter().filter(f).into_pipeline_data(span, ctrlc))
}
Value::Range { val, .. } => Ok(val
.into_range_iter(span, ctrlc.clone())
.filter(f)
.into_pipeline_data(ctrlc)),
.into_pipeline_data(span, ctrlc)),
value => {
if f(&value) {
Ok(value.into_pipeline_data())
@ -680,7 +672,7 @@ impl PipelineData {
}
}
}
PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
PipelineData::ListStream(stream, ..) => Ok(stream.modify(|iter| iter.filter(f)).into()),
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::Empty),
PipelineData::ExternalStream {
stdout: Some(stream),
@ -764,7 +756,6 @@ impl PipelineData {
match exit_code {
Some(exit_code_stream) => {
let ctrlc = exit_code_stream.ctrlc.clone();
let exit_code: Vec<Value> = exit_code_stream.into_iter().collect();
if let Some(Value::Int { val: code, .. }) = exit_code.last() {
// if exit_code is not 0, it indicates error occurred, return back Err.
@ -776,7 +767,7 @@ impl PipelineData {
PipelineData::ExternalStream {
stdout: None,
stderr,
exit_code: Some(ListStream::from_stream(exit_code.into_iter(), ctrlc)),
exit_code: Some(ListStream::new(exit_code.into_iter(), span, None)),
span,
metadata,
trim_end_newline,
@ -947,7 +938,14 @@ impl PipelineData {
}
}
pub struct PipelineIterator(PipelineData);
enum PipelineIteratorInner {
Empty,
Value(Value),
ListStream(list_stream::IntoIter),
ExternalStream(RawStream),
}
pub struct PipelineIterator(PipelineIteratorInner);
impl IntoIterator for PipelineData {
type Item = Value;
@ -955,23 +953,29 @@ impl IntoIterator for PipelineData {
type IntoIter = PipelineIterator;
fn into_iter(self) -> Self::IntoIter {
match self {
PipelineData::Value(value, metadata) => {
PipelineIterator(match self {
PipelineData::Value(value, ..) => {
let span = value.span();
match value {
Value::List { vals, .. } => PipelineIterator(PipelineData::ListStream(
ListStream::from_stream(vals.into_iter(), None),
metadata,
)),
Value::Range { val, .. } => PipelineIterator(PipelineData::ListStream(
ListStream::from_stream(val.into_range_iter(span, None), None),
metadata,
)),
x => PipelineIterator(PipelineData::Value(x, metadata)),
Value::List { vals, .. } => PipelineIteratorInner::ListStream(
ListStream::new(vals.into_iter(), span, None).into_iter(),
),
Value::Range { val, .. } => PipelineIteratorInner::ListStream(
ListStream::new(val.into_range_iter(span, None), span, None).into_iter(),
),
x => PipelineIteratorInner::Value(x),
}
}
x => PipelineIterator(x),
}
PipelineData::ListStream(stream, ..) => {
PipelineIteratorInner::ListStream(stream.into_iter())
}
PipelineData::ExternalStream {
stdout: Some(stdout),
..
} => PipelineIteratorInner::ExternalStream(stdout),
PipelineData::ExternalStream { stdout: None, .. } => PipelineIteratorInner::Empty,
PipelineData::Empty => PipelineIteratorInner::Empty,
})
}
}
@ -1075,15 +1079,11 @@ impl Iterator for PipelineIterator {
fn next(&mut self) -> Option<Self::Item> {
match &mut self.0 {
PipelineData::Empty => None,
PipelineData::Value(Value::Nothing { .. }, ..) => None,
PipelineData::Value(v, ..) => Some(std::mem::take(v)),
PipelineData::ListStream(stream, ..) => stream.next(),
PipelineData::ExternalStream { stdout: None, .. } => None,
PipelineData::ExternalStream {
stdout: Some(stream),
..
} => stream.next().map(|x| match x {
PipelineIteratorInner::Empty => None,
PipelineIteratorInner::Value(Value::Nothing { .. }, ..) => None,
PipelineIteratorInner::Value(v, ..) => Some(std::mem::take(v)),
PipelineIteratorInner::ListStream(stream, ..) => stream.next(),
PipelineIteratorInner::ExternalStream(stream) => stream.next().map(|x| match x {
Ok(x) => x,
Err(err) => Value::error(
err,
@ -1120,11 +1120,12 @@ where
}
pub trait IntoInterruptiblePipelineData {
fn into_pipeline_data(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData;
fn into_pipeline_data(self, span: Span, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData;
fn into_pipeline_data_with_metadata(
self,
metadata: impl Into<Option<PipelineMetadata>>,
span: Span,
ctrlc: Option<Arc<AtomicBool>>,
metadata: impl Into<Option<PipelineMetadata>>,
) -> PipelineData;
}
@ -1134,20 +1135,18 @@ where
I::IntoIter: Send + 'static,
<I::IntoIter as Iterator>::Item: Into<Value>,
{
fn into_pipeline_data(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData {
PipelineData::ListStream(
ListStream::from_stream(self.into_iter().map(Into::into), ctrlc),
None,
)
fn into_pipeline_data(self, span: Span, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData {
ListStream::new(self.into_iter().map(Into::into), span, ctrlc).into()
}
fn into_pipeline_data_with_metadata(
self,
metadata: impl Into<Option<PipelineMetadata>>,
span: Span,
ctrlc: Option<Arc<AtomicBool>>,
metadata: impl Into<Option<PipelineMetadata>>,
) -> PipelineData {
PipelineData::ListStream(
ListStream::from_stream(self.into_iter().map(Into::into), ctrlc),
ListStream::new(self.into_iter().map(Into::into), span, ctrlc),
metadata.into(),
)
}

View File

@ -174,72 +174,3 @@ impl Iterator for RawStream {
}
}
}
/// A potentially infinite stream of values, optionally with a mean to send a Ctrl-C signal to stop
/// the stream from continuing.
///
/// In practice, a "stream" here means anything which can be iterated and produce Values as it iterates.
/// Like other iterators in Rust, observing values from this stream will drain the items as you view them
/// and the stream cannot be replayed.
pub struct ListStream {
pub stream: Box<dyn Iterator<Item = Value> + Send + 'static>,
pub ctrlc: Option<Arc<AtomicBool>>,
first_guard: bool,
}
impl ListStream {
pub fn into_string(self, separator: &str, config: &Config) -> String {
self.map(|x: Value| x.to_expanded_string(", ", config))
.collect::<Vec<String>>()
.join(separator)
}
pub fn drain(self) -> Result<(), ShellError> {
for next in self {
if let Value::Error { error, .. } = next {
return Err(*error);
}
}
Ok(())
}
pub fn from_stream(
input: impl Iterator<Item = Value> + Send + 'static,
ctrlc: Option<Arc<AtomicBool>>,
) -> ListStream {
ListStream {
stream: Box::new(input),
ctrlc,
first_guard: true,
}
}
}
impl Debug for ListStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ListStream").finish()
}
}
impl Iterator for ListStream {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {
// We need to check `first_guard` to guarantee that it always have something to return in
// underlying stream.
//
// A realworld example is running an external commands, which have an `exit_code`
// ListStream.
// When we press ctrl-c, the external command receives the signal too, if we don't have
// `first_guard`, the `exit_code` ListStream will return Nothing, which is not expected
if self.first_guard {
self.first_guard = false;
return self.stream.next();
}
if nu_utils::ctrl_c::was_pressed(&self.ctrlc) {
None
} else {
self.stream.next()
}
}
}