Add string stream and binary stream, add text decoding (#570)

* WIP

* Add binary/string streams and text decoding

* Make string collection fallible

* Oops, forgot pretty hex

* Oops, forgot pretty hex

* clippy
This commit is contained in:
JT
2021-12-24 18:22:11 +11:00
committed by GitHub
parent 7f0921a14b
commit 3522bead97
50 changed files with 1633 additions and 119 deletions

View File

@ -1,6 +1,8 @@
use std::sync::{atomic::AtomicBool, Arc};
use crate::{ast::PathMember, Config, ShellError, Span, Value, ValueStream};
use crate::{
ast::PathMember, ByteStream, Config, ShellError, Span, StringStream, Value, ValueStream,
};
/// The foundational abstraction for input and output to commands
///
@ -34,7 +36,9 @@ use crate::{ast::PathMember, Config, ShellError, Span, Value, ValueStream};
#[derive(Debug)]
pub enum PipelineData {
Value(Value, Option<PipelineMetadata>),
Stream(ValueStream, Option<PipelineMetadata>),
ListStream(ValueStream, Option<PipelineMetadata>),
StringStream(StringStream, Span, Option<PipelineMetadata>),
ByteStream(ByteStream, Span, Option<PipelineMetadata>),
}
#[derive(Debug, Clone)]
@ -54,7 +58,9 @@ impl PipelineData {
pub fn metadata(&self) -> Option<PipelineMetadata> {
match self {
PipelineData::Stream(_, x) => x.clone(),
PipelineData::ListStream(_, x) => x.clone(),
PipelineData::ByteStream(_, _, x) => x.clone(),
PipelineData::StringStream(_, _, x) => x.clone(),
PipelineData::Value(_, x) => x.clone(),
}
}
@ -63,27 +69,49 @@ impl PipelineData {
match self {
PipelineData::Value(Value::Nothing { .. }, ..) => Value::nothing(span),
PipelineData::Value(v, ..) => v,
PipelineData::Stream(s, ..) => Value::List {
PipelineData::ListStream(s, ..) => Value::List {
vals: s.collect(),
span, // FIXME?
},
PipelineData::StringStream(s, ..) => {
let mut output = String::new();
for item in s {
match item {
Ok(s) => output.push_str(&s),
Err(err) => return Value::Error { error: err },
}
}
Value::String {
val: output,
span, // FIXME?
}
}
PipelineData::ByteStream(s, ..) => Value::Binary {
val: s.flatten().collect(),
span, // FIXME?
},
}
}
pub fn into_interruptible_iter(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineIterator {
let mut iter = self.into_iter();
if let PipelineIterator(PipelineData::Stream(s, ..)) = &mut iter {
if let PipelineIterator(PipelineData::ListStream(s, ..)) = &mut iter {
s.ctrlc = ctrlc;
}
iter
}
pub fn collect_string(self, separator: &str, config: &Config) -> String {
pub fn collect_string(self, separator: &str, config: &Config) -> Result<String, ShellError> {
match self {
PipelineData::Value(v, ..) => v.into_string(separator, config),
PipelineData::Stream(s, ..) => s.into_string(separator, config),
PipelineData::Value(v, ..) => Ok(v.into_string(separator, config)),
PipelineData::ListStream(s, ..) => Ok(s.into_string(separator, config)),
PipelineData::StringStream(s, ..) => s.into_string(separator),
PipelineData::ByteStream(s, ..) => {
Ok(String::from_utf8_lossy(&s.flatten().collect::<Vec<_>>()).to_string())
}
}
}
@ -94,12 +122,13 @@ impl PipelineData {
) -> Result<Value, ShellError> {
match self {
// FIXME: there are probably better ways of doing this
PipelineData::Stream(stream, ..) => Value::List {
PipelineData::ListStream(stream, ..) => Value::List {
vals: stream.collect(),
span: head,
}
.follow_cell_path(cell_path),
PipelineData::Value(v, ..) => v.follow_cell_path(cell_path),
_ => Err(ShellError::IOError("can't follow stream paths".into())),
}
}
@ -111,12 +140,13 @@ impl PipelineData {
) -> Result<(), ShellError> {
match self {
// FIXME: there are probably better ways of doing this
PipelineData::Stream(stream, ..) => Value::List {
PipelineData::ListStream(stream, ..) => Value::List {
vals: stream.collect(),
span: head,
}
.update_cell_path(cell_path, callback),
PipelineData::Value(v, ..) => v.update_cell_path(cell_path, callback),
_ => Ok(()),
}
}
@ -134,7 +164,14 @@ impl PipelineData {
PipelineData::Value(Value::List { vals, .. }, ..) => {
Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc))
}
PipelineData::Stream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
PipelineData::StringStream(stream, span, ..) => Ok(stream
.map(move |x| match x {
Ok(s) => f(Value::String { val: s, span }),
Err(err) => Value::Error { error: err },
})
.into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }, ..) => {
Ok(val.into_range_iter()?.map(f).into_pipeline_data(ctrlc))
}
@ -142,6 +179,11 @@ impl PipelineData {
Value::Error { error } => Err(error),
v => Ok(v.into_pipeline_data()),
},
PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput(
"Binary output from this command may need to be decoded using the 'decode' command"
.into(),
span,
)),
}
}
@ -161,14 +203,27 @@ impl PipelineData {
PipelineData::Value(Value::List { vals, .. }, ..) => {
Ok(vals.into_iter().map(f).flatten().into_pipeline_data(ctrlc))
}
PipelineData::Stream(stream, ..) => {
PipelineData::ListStream(stream, ..) => {
Ok(stream.map(f).flatten().into_pipeline_data(ctrlc))
}
PipelineData::StringStream(stream, span, ..) => Ok(stream
.map(move |x| match x {
Ok(s) => Value::String { val: s, span },
Err(err) => Value::Error { error: err },
})
.map(f)
.flatten()
.into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }, ..) => match val.into_range_iter() {
Ok(iter) => Ok(iter.map(f).flatten().into_pipeline_data(ctrlc)),
Err(error) => Err(error),
},
PipelineData::Value(v, ..) => Ok(f(v).into_iter().into_pipeline_data(ctrlc)),
PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput(
"Binary output from this command may need to be decoded using the 'decode' command"
.into(),
span,
)),
}
}
@ -185,7 +240,15 @@ impl PipelineData {
PipelineData::Value(Value::List { vals, .. }, ..) => {
Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc))
}
PipelineData::Stream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
PipelineData::StringStream(stream, span, ..) => Ok(stream
.map(move |x| match x {
Ok(s) => Value::String { val: s, span },
Err(err) => Value::Error { error: err },
})
.filter(f)
.into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }, ..) => {
Ok(val.into_range_iter()?.filter(f).into_pipeline_data(ctrlc))
}
@ -196,16 +259,15 @@ impl PipelineData {
Ok(Value::Nothing { span: v.span()? }.into_pipeline_data())
}
}
PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput(
"Binary output from this command may need to be decoded using the 'decode' command"
.into(),
span,
)),
}
}
}
// impl Default for PipelineData {
// fn default() -> Self {
// PipelineData::new()
// }
// }
pub struct PipelineIterator(PipelineData);
impl IntoIterator for PipelineData {
@ -216,7 +278,7 @@ impl IntoIterator for PipelineData {
fn into_iter(self) -> Self::IntoIter {
match self {
PipelineData::Value(Value::List { vals, .. }, metadata) => {
PipelineIterator(PipelineData::Stream(
PipelineIterator(PipelineData::ListStream(
ValueStream {
stream: Box::new(vals.into_iter()),
ctrlc: None,
@ -226,14 +288,14 @@ impl IntoIterator for PipelineData {
}
PipelineData::Value(Value::Range { val, .. }, metadata) => {
match val.into_range_iter() {
Ok(iter) => PipelineIterator(PipelineData::Stream(
Ok(iter) => PipelineIterator(PipelineData::ListStream(
ValueStream {
stream: Box::new(iter),
ctrlc: None,
},
metadata,
)),
Err(error) => PipelineIterator(PipelineData::Stream(
Err(error) => PipelineIterator(PipelineData::ListStream(
ValueStream {
stream: Box::new(std::iter::once(Value::Error { error })),
ctrlc: None,
@ -254,7 +316,18 @@ impl Iterator for PipelineIterator {
match &mut self.0 {
PipelineData::Value(Value::Nothing { .. }, ..) => None,
PipelineData::Value(v, ..) => Some(std::mem::take(v)),
PipelineData::Stream(stream, ..) => stream.next(),
PipelineData::ListStream(stream, ..) => stream.next(),
PipelineData::StringStream(stream, span, ..) => stream.next().map(|x| match x {
Ok(x) => Value::String {
val: x,
span: *span,
},
Err(err) => Value::Error { error: err },
}),
PipelineData::ByteStream(stream, span, ..) => stream.next().map(|x| Value::Binary {
val: x,
span: *span,
}),
}
}
}
@ -288,7 +361,7 @@ where
<I::IntoIter as Iterator>::Item: Into<Value>,
{
fn into_pipeline_data(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData {
PipelineData::Stream(
PipelineData::ListStream(
ValueStream {
stream: Box::new(self.into_iter().map(Into::into)),
ctrlc,
@ -302,7 +375,7 @@ where
metadata: PipelineMetadata,
ctrlc: Option<Arc<AtomicBool>>,
) -> PipelineData {
PipelineData::Stream(
PipelineData::ListStream(
ValueStream {
stream: Box::new(self.into_iter().map(Into::into)),
ctrlc,

View File

@ -7,6 +7,94 @@ use std::{
},
};
/// A single buffer of binary data streamed over multiple parts. Optionally contains ctrl-c that can be used
/// to break the stream.
pub struct ByteStream {
pub stream: Box<dyn Iterator<Item = Vec<u8>> + Send + 'static>,
pub ctrlc: Option<Arc<AtomicBool>>,
}
impl ByteStream {
pub fn into_vec(self) -> Vec<u8> {
self.flatten().collect::<Vec<u8>>()
}
}
impl Debug for ByteStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ByteStream").finish()
}
}
impl Iterator for ByteStream {
type Item = Vec<u8>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(ctrlc) = &self.ctrlc {
if ctrlc.load(Ordering::SeqCst) {
None
} else {
self.stream.next()
}
} else {
self.stream.next()
}
}
}
/// A single string streamed over multiple parts. Optionally contains ctrl-c that can be used
/// to break the stream.
pub struct StringStream {
pub stream: Box<dyn Iterator<Item = Result<String, ShellError>> + Send + 'static>,
pub ctrlc: Option<Arc<AtomicBool>>,
}
impl StringStream {
pub fn into_string(self, separator: &str) -> Result<String, ShellError> {
let mut output = String::new();
let mut first = true;
for s in self.stream {
output.push_str(&s?);
if !first {
output.push_str(separator);
} else {
first = false;
}
}
Ok(output)
}
pub fn from_stream(
input: impl Iterator<Item = Result<String, ShellError>> + Send + 'static,
ctrlc: Option<Arc<AtomicBool>>,
) -> StringStream {
StringStream {
stream: Box::new(input),
ctrlc,
}
}
}
impl Debug for StringStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StringStream").finish()
}
}
impl Iterator for StringStream {
type Item = Result<String, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(ctrlc) = &self.ctrlc {
if ctrlc.load(Ordering::SeqCst) {
None
} else {
self.stream.next()
}
} else {
self.stream.next()
}
}
}
/// A potentially infinite stream of values, optinally with a mean to send a Ctrl-C signal to stop
/// the stream from continuing.
///