"maybe text codec" version 2 (#871)

* Add a RawStream that can be binary or string

* Finish up updating the into's
This commit is contained in:
JT 2022-01-28 13:32:33 -05:00 committed by GitHub
parent 3f9fa28ae3
commit 020ad24b25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 326 additions and 433 deletions

View File

@ -2,7 +2,8 @@ use nu_engine::CallExt;
use nu_protocol::{
ast::{Call, CellPath},
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Span, SyntaxShape,
Value,
};
#[derive(Clone)]
@ -98,7 +99,15 @@ fn into_binary(
let column_paths: Vec<CellPath> = call.rest(engine_state, stack, 0)?;
match input {
PipelineData::ByteStream(..) => Ok(input),
PipelineData::RawStream(stream, ..) => {
// TODO: in the future, we may want this to stream out, converting each to bytes
let output = stream.into_bytes()?;
Ok(Value::Binary {
val: output,
span: head,
}
.into_pipeline_data())
}
_ => input.map(
move |v| {
if column_paths.is_empty() {

View File

@ -2,7 +2,8 @@ use nu_engine::CallExt;
use nu_protocol::{
ast::{Call, CellPath},
engine::{Command, EngineState, Stack},
Category, Config, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
Category, Config, Example, IntoPipelineData, PipelineData, ShellError, Signature, Span,
SyntaxShape, Value,
};
// TODO num_format::SystemLocale once platform-specific dependencies are stable (see Cargo.toml)
@ -148,30 +149,41 @@ fn string_helper(
}
}
input.map(
move |v| {
if column_paths.is_empty() {
action(&v, head, decimals, decimals_value, false, &config)
} else {
let mut ret = v;
for path in &column_paths {
let config = config.clone();
let r = ret.update_cell_path(
&path.members,
Box::new(move |old| {
action(old, head, decimals, decimals_value, false, &config)
}),
);
if let Err(error) = r {
return Value::Error { error };
}
}
ret
match input {
PipelineData::RawStream(stream, ..) => {
// TODO: in the future, we may want this to stream out, converting each to bytes
let output = stream.into_string()?;
Ok(Value::String {
val: output,
span: head,
}
},
engine_state.ctrlc.clone(),
)
.into_pipeline_data())
}
_ => input.map(
move |v| {
if column_paths.is_empty() {
action(&v, head, decimals, decimals_value, false, &config)
} else {
let mut ret = v;
for path in &column_paths {
let config = config.clone();
let r = ret.update_cell_path(
&path.members,
Box::new(move |old| {
action(old, head, decimals, decimals_value, false, &config)
}),
);
if let Err(error) = r {
return Value::Error { error };
}
}
ret
}
},
engine_state.ctrlc.clone(),
),
}
}
pub fn action(

View File

@ -26,9 +26,9 @@ impl Command for Describe {
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
if matches!(input, PipelineData::ByteStream(..)) {
if matches!(input, PipelineData::RawStream(..)) {
Ok(PipelineData::Value(
Value::string("binary", call.head),
Value::string("raw input", call.head),
None,
))
} else {

View File

@ -2,7 +2,7 @@ use nu_engine::CallExt;
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, SyntaxShape, Value, ValueStream,
Category, Example, ListStream, PipelineData, ShellError, Signature, SyntaxShape, Value,
};
#[derive(Clone)]
@ -35,7 +35,7 @@ impl Command for Echo {
match n.cmp(&1usize) {
// More than one value is converted in a stream of values
std::cmp::Ordering::Greater => PipelineData::ListStream(
ValueStream::from_stream(to_be_echoed.into_iter(), engine_state.ctrlc.clone()),
ListStream::from_stream(to_be_echoed.into_iter(), engine_state.ctrlc.clone()),
None,
),

View File

@ -2,7 +2,7 @@ use nu_engine::{get_full_help, CallExt};
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
ByteStream, Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Spanned,
Category, Example, IntoPipelineData, PipelineData, RawStream, ShellError, Signature, Spanned,
SyntaxShape, Value,
};
use std::io::{BufRead, BufReader, Read};
@ -120,11 +120,8 @@ impl Command for Open {
let buf_reader = BufReader::new(file);
let output = PipelineData::ByteStream(
ByteStream {
stream: Box::new(BufferedReader { input: buf_reader }),
ctrlc,
},
let output = PipelineData::RawStream(
RawStream::new(Box::new(BufferedReader { input: buf_reader }), ctrlc),
call_span,
None,
);

View File

@ -82,7 +82,7 @@ fn getcol(
.map(move |x| Value::String { val: x, span })
.into_pipeline_data(engine_state.ctrlc.clone()))
}
PipelineData::Value(..) | PipelineData::StringStream(..) | PipelineData::ByteStream(..) => {
PipelineData::Value(..) | PipelineData::RawStream(..) => {
let cols = vec![];
let vals = vec![];
Ok(Value::Record { cols, vals, span }.into_pipeline_data())

View File

@ -111,54 +111,14 @@ impl Command for Each {
}
})
.into_pipeline_data(ctrlc)),
PipelineData::ByteStream(stream, ..) => Ok(stream
PipelineData::RawStream(stream, ..) => Ok(stream
.into_iter()
.enumerate()
.map(move |(idx, x)| {
stack.with_env(&orig_env_vars, &orig_env_hidden);
let x = match x {
Ok(x) => Value::Binary { val: x, span },
Err(err) => return Value::Error { error: err },
};
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
if numbered {
stack.add_var(
*var_id,
Value::Record {
cols: vec!["index".into(), "item".into()],
vals: vec![
Value::Int {
val: idx as i64,
span,
},
x,
],
span,
},
);
} else {
stack.add_var(*var_id, x);
}
}
}
match eval_block(&engine_state, &mut stack, &block, PipelineData::new(span)) {
Ok(v) => v.into_value(span),
Err(error) => Value::Error { error },
}
})
.into_pipeline_data(ctrlc)),
PipelineData::StringStream(stream, ..) => Ok(stream
.into_iter()
.enumerate()
.map(move |(idx, x)| {
stack.with_env(&orig_env_vars, &orig_env_hidden);
let x = match x {
Ok(x) => Value::String { val: x, span },
Ok(x) => x,
Err(err) => return Value::Error { error: err },
};

View File

@ -96,7 +96,7 @@ fn getcol(
.map(move |x| Value::String { val: x, span })
.into_pipeline_data(engine_state.ctrlc.clone()))
}
PipelineData::Value(..) | PipelineData::StringStream(..) | PipelineData::ByteStream(..) => {
PipelineData::Value(..) | PipelineData::RawStream(..) => {
let cols = vec![];
let vals = vec![];
Ok(Value::Record { cols, vals, span }.into_pipeline_data())

View File

@ -88,41 +88,11 @@ impl Command for Lines {
Ok(iter.into_pipeline_data(engine_state.ctrlc.clone()))
}
PipelineData::StringStream(stream, span, ..) => {
let mut split_char = "\n";
let iter = stream
.into_iter()
.map(move |value| match value {
Ok(value) => {
if split_char != "\r\n" && value.contains("\r\n") {
split_char = "\r\n";
}
value
.split(split_char)
.filter_map(|s| {
if !s.is_empty() {
Some(Value::String {
val: s.into(),
span,
})
} else {
None
}
})
.collect::<Vec<Value>>()
}
Err(err) => vec![Value::Error { error: err }],
})
.flatten();
Ok(iter.into_pipeline_data(engine_state.ctrlc.clone()))
}
PipelineData::Value(val, ..) => Err(ShellError::UnsupportedInput(
format!("Not supported input: {}", val.as_string()?),
call.head,
)),
PipelineData::ByteStream(..) => {
PipelineData::RawStream(..) => {
let config = stack.get_config()?;
//FIXME: Make sure this can fail in the future to let the user

View File

@ -177,56 +177,12 @@ impl Command for ParEach {
.into_iter()
.flatten()
.into_pipeline_data(ctrlc)),
PipelineData::StringStream(stream, ..) => Ok(stream
PipelineData::RawStream(stream, ..) => Ok(stream
.enumerate()
.par_bridge()
.map(move |(idx, x)| {
let x = match x {
Ok(x) => Value::String { val: x, span },
Err(err) => return Value::Error { error: err }.into_pipeline_data(),
};
let block = engine_state.get_block(block_id);
let mut stack = stack.clone();
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
if numbered {
stack.add_var(
*var_id,
Value::Record {
cols: vec!["index".into(), "item".into()],
vals: vec![
Value::Int {
val: idx as i64,
span,
},
x,
],
span,
},
);
} else {
stack.add_var(*var_id, x);
}
}
}
match eval_block(&engine_state, &mut stack, block, PipelineData::new(span)) {
Ok(v) => v,
Err(error) => Value::Error { error }.into_pipeline_data(),
}
})
.collect::<Vec<_>>()
.into_iter()
.flatten()
.into_pipeline_data(ctrlc)),
PipelineData::ByteStream(stream, ..) => Ok(stream
.enumerate()
.par_bridge()
.map(move |(idx, x)| {
let x = match x {
Ok(x) => Value::Binary { val: x, span },
Ok(x) => x,
Err(err) => return Value::Error { error: err }.into_pipeline_data(),
};

View File

@ -50,13 +50,9 @@ impl Command for Wrap {
span,
})
.into_pipeline_data(engine_state.ctrlc.clone())),
PipelineData::StringStream(stream, ..) => Ok(Value::String {
val: stream.into_string("")?,
span,
}
.into_pipeline_data()),
PipelineData::ByteStream(stream, ..) => Ok(Value::Binary {
val: stream.into_vec()?,
PipelineData::RawStream(..) => Ok(Value::Record {
cols: vec![name],
vals: vec![input.into_value(call.head)],
span,
}
.into_pipeline_data()),

View File

@ -2,7 +2,7 @@ use base64::encode;
use nu_engine::CallExt;
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::ByteStream;
use nu_protocol::RawStream;
use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
@ -356,13 +356,13 @@ fn response_to_buffer(
) -> nu_protocol::PipelineData {
let buffered_input = BufReader::new(response);
PipelineData::ByteStream(
ByteStream {
stream: Box::new(BufferedReader {
PipelineData::RawStream(
RawStream::new(
Box::new(BufferedReader {
input: buffered_input,
}),
ctrlc: engine_state.ctrlc.clone(),
},
engine_state.ctrlc.clone(),
),
span,
None,
)

View File

@ -5,8 +5,8 @@ use std::{
use nu_engine::CallExt;
use nu_protocol::{
engine::Command, Example, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape,
Value, ValueStream,
engine::Command, Example, ListStream, PipelineData, ShellError, Signature, Span, Spanned,
SyntaxShape, Value,
};
use super::PathSubcommandArguments;
@ -68,7 +68,7 @@ the output of 'path parse' and 'path split' subcommands."#
Ok(PipelineData::Value(handle_value(val, &args, head), md))
}
PipelineData::ListStream(stream, md) => Ok(PipelineData::ListStream(
ValueStream::from_stream(
ListStream::from_stream(
stream.map(move |val| handle_value(val, &args, head)),
engine_state.ctrlc.clone(),
),

View File

@ -2,7 +2,7 @@ use nu_engine::CallExt;
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, SyntaxShape, Value, ValueStream,
Category, Example, ListStream, PipelineData, ShellError, Signature, SyntaxShape, Value,
};
use rand::prelude::{thread_rng, Rng};
@ -80,7 +80,7 @@ fn dice(
});
Ok(PipelineData::ListStream(
ValueStream::from_stream(iter, engine_state.ctrlc.clone()),
ListStream::from_stream(iter, engine_state.ctrlc.clone()),
None,
))
}

View File

@ -44,8 +44,8 @@ impl Command for Decode {
let encoding: Spanned<String> = call.req(engine_state, stack, 0)?;
match input {
PipelineData::ByteStream(stream, ..) => {
let bytes: Vec<u8> = stream.into_vec()?;
PipelineData::RawStream(stream, ..) => {
let bytes: Vec<u8> = stream.into_bytes()?;
let encoding = match Encoding::for_label(encoding.item.as_bytes()) {
None => Err(ShellError::SpannedLabeledError(

View File

@ -2,7 +2,7 @@ use nu_engine::CallExt;
use nu_protocol::ast::{Call, PathMember};
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value, ValueStream,
Category, Example, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
#[derive(Clone)]
@ -152,7 +152,7 @@ fn format(
}
Ok(PipelineData::ListStream(
ValueStream::from_stream(list.into_iter(), None),
ListStream::from_stream(list.into_iter(), None),
None,
))
}

View File

@ -2,8 +2,8 @@ use nu_engine::CallExt;
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape, Value,
ValueStream,
Category, Example, ListStream, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape,
Value,
};
use regex::Regex;
@ -127,7 +127,7 @@ fn operate(
}
Ok(PipelineData::ListStream(
ValueStream::from_stream(parsed.into_iter(), ctrlc),
ListStream::from_stream(parsed.into_iter(), ctrlc),
None,
))
}

View File

@ -39,6 +39,7 @@ impl Command for StrCollect {
let config = stack.get_config().unwrap_or_default();
// let output = input.collect_string(&separator.unwrap_or_default(), &config)?;
// Hmm, not sure what we actually want. If you don't use debug_string, Date comes out as human readable
// which feels funny
#[allow(clippy::needless_collect)]

View File

@ -8,7 +8,7 @@ use std::sync::mpsc;
use nu_engine::env_to_strings;
use nu_protocol::engine::{EngineState, Stack};
use nu_protocol::{ast::Call, engine::Command, ShellError, Signature, SyntaxShape, Value};
use nu_protocol::{ByteStream, Category, Config, PipelineData, Span, Spanned};
use nu_protocol::{Category, Config, PipelineData, RawStream, Span, Spanned};
use itertools::Itertools;
@ -242,11 +242,8 @@ impl ExternalCommand {
});
let receiver = ChannelReceiver::new(rx);
Ok(PipelineData::ByteStream(
ByteStream {
stream: Box::new(receiver),
ctrlc: output_ctrlc,
},
Ok(PipelineData::RawStream(
RawStream::new(Box::new(receiver), output_ctrlc),
head,
None,
))

View File

@ -5,8 +5,8 @@ use nu_engine::{env_to_string, CallExt};
use nu_protocol::ast::{Call, PathMember};
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, Config, DataSource, IntoPipelineData, PipelineData, PipelineMetadata, ShellError,
Signature, Span, StringStream, SyntaxShape, Value, ValueStream,
Category, Config, DataSource, IntoPipelineData, ListStream, PipelineData, PipelineMetadata,
RawStream, ShellError, Signature, Span, SyntaxShape, Value,
};
use nu_table::{StyledString, TextStyle, Theme};
use std::sync::atomic::{AtomicBool, Ordering};
@ -62,32 +62,15 @@ impl Command for Table {
};
match input {
PipelineData::ByteStream(stream, ..) => Ok(PipelineData::StringStream(
StringStream::from_stream(
stream.map(move |x| {
Ok(if x.iter().all(|x| x.is_ascii()) {
format!("{}", String::from_utf8_lossy(&x?))
} else {
format!("{}\n", nu_pretty_hex::pretty_hex(&x?))
})
}),
ctrlc,
),
head,
None,
)),
PipelineData::Value(Value::Binary { val, .. }, ..) => Ok(PipelineData::StringStream(
StringStream::from_stream(
vec![Ok(
if val.iter().all(|x| {
*x < 128 && (*x >= b' ' || *x == b'\t' || *x == b'\r' || *x == b'\n')
}) {
format!("{}", String::from_utf8_lossy(&val))
} else {
format!("{}\n", nu_pretty_hex::pretty_hex(&val))
},
)]
.into_iter(),
PipelineData::RawStream(..) => Ok(input),
PipelineData::Value(Value::Binary { val, .. }, ..) => Ok(PipelineData::RawStream(
RawStream::new(
Box::new(
vec![Ok(format!("{}\n", nu_pretty_hex::pretty_hex(&val))
.as_bytes()
.to_vec())]
.into_iter(),
),
ctrlc,
),
head,
@ -127,7 +110,7 @@ impl Command for Table {
None => LsColors::default(),
};
ValueStream::from_stream(
ListStream::from_stream(
stream.map(move |mut x| match &mut x {
Value::Record { cols, vals, .. } => {
let mut idx = 0;
@ -194,15 +177,15 @@ impl Command for Table {
let head = call.head;
Ok(PipelineData::StringStream(
StringStream::from_stream(
PagingTableCreator {
Ok(PipelineData::RawStream(
RawStream::new(
Box::new(PagingTableCreator {
row_offset,
config,
ctrlc: ctrlc.clone(),
head,
stream,
},
}),
ctrlc,
),
head,
@ -381,14 +364,14 @@ fn convert_with_precision(val: &str, precision: usize) -> Result<String, ShellEr
struct PagingTableCreator {
head: Span,
stream: ValueStream,
stream: ListStream,
ctrlc: Option<Arc<AtomicBool>>,
config: Config,
row_offset: usize,
}
impl Iterator for PagingTableCreator {
type Item = Result<String, ShellError>;
type Item = Result<Vec<u8>, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
let mut batch = vec![];
@ -443,7 +426,7 @@ impl Iterator for PagingTableCreator {
Ok(Some(table)) => {
let result = nu_table::draw_table(&table, term_width, &color_hm, &self.config);
Some(Ok(result))
Some(Ok(result.as_bytes().to_vec()))
}
Err(err) => Some(Err(err)),
_ => None,

View File

@ -6,7 +6,7 @@ use std::io::BufReader;
use std::path::{Path, PathBuf};
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{ast::Call, Signature, Value};
use nu_protocol::{ast::Call, Signature};
use nu_protocol::{PipelineData, ShellError};
#[derive(Clone)]
@ -70,33 +70,7 @@ impl Command for PluginDeclaration {
)
})?;
let input = match input {
PipelineData::Value(value, ..) => value,
PipelineData::ListStream(stream, ..) => {
let values = stream.collect::<Vec<Value>>();
Value::List {
vals: values,
span: call.head,
}
}
PipelineData::StringStream(stream, ..) => {
let val = stream.into_string("")?;
Value::String {
val,
span: call.head,
}
}
PipelineData::ByteStream(stream, ..) => {
let val = stream.into_vec()?;
Value::Binary {
val,
span: call.head,
}
}
};
let input = input.into_value(call.head);
// Create message to plugin to indicate that signature is required and
// send call to plugin asking for signature

View File

@ -1,8 +1,6 @@
use std::sync::{atomic::AtomicBool, Arc};
use crate::{
ast::PathMember, ByteStream, Config, ShellError, Span, StringStream, Value, ValueStream,
};
use crate::{ast::PathMember, Config, ListStream, RawStream, ShellError, Span, Value};
/// The foundational abstraction for input and output to commands
///
@ -36,9 +34,8 @@ use crate::{
#[derive(Debug)]
pub enum PipelineData {
Value(Value, Option<PipelineMetadata>),
ListStream(ValueStream, Option<PipelineMetadata>),
StringStream(StringStream, Span, Option<PipelineMetadata>),
ByteStream(ByteStream, Span, Option<PipelineMetadata>),
ListStream(ListStream, Option<PipelineMetadata>),
RawStream(RawStream, Span, Option<PipelineMetadata>),
}
#[derive(Debug, Clone)]
@ -63,8 +60,7 @@ impl PipelineData {
pub fn metadata(&self) -> Option<PipelineMetadata> {
match self {
PipelineData::ListStream(_, x) => x.clone(),
PipelineData::ByteStream(_, _, x) => x.clone(),
PipelineData::StringStream(_, _, x) => x.clone(),
PipelineData::RawStream(_, _, x) => x.clone(),
PipelineData::Value(_, x) => x.clone(),
}
}
@ -72,8 +68,7 @@ impl PipelineData {
pub fn set_metadata(mut self, metadata: Option<PipelineMetadata>) -> Self {
match &mut self {
PipelineData::ListStream(_, x) => *x = metadata,
PipelineData::ByteStream(_, _, x) => *x = metadata,
PipelineData::StringStream(_, _, x) => *x = metadata,
PipelineData::RawStream(_, _, x) => *x = metadata,
PipelineData::Value(_, x) => *x = metadata,
}
@ -88,33 +83,51 @@ impl PipelineData {
vals: s.collect(),
span, // FIXME?
},
PipelineData::StringStream(s, ..) => {
let mut output = String::new();
PipelineData::RawStream(mut s, ..) => {
let mut items = vec![];
for item in s {
match item {
Ok(s) => output.push_str(&s),
Err(err) => return Value::Error { error: err },
}
}
Value::String {
val: output,
span, // FIXME?
}
}
PipelineData::ByteStream(s, ..) => {
let mut output = vec![];
for item in s {
match item {
Ok(s) => output.extend(&s),
Err(err) => return Value::Error { error: err },
for val in &mut s {
match val {
Ok(val) => {
items.push(val);
}
Err(e) => {
return Value::Error { error: e };
}
}
}
Value::Binary {
val: output,
span, // FIXME?
if s.is_binary {
let mut output = vec![];
for item in items {
match item.as_binary() {
Ok(item) => {
output.extend(item);
}
Err(err) => {
return Value::Error { error: err };
}
}
}
Value::Binary {
val: output,
span, // FIXME?
}
} else {
let mut output = String::new();
for item in items {
match item.as_string() {
Ok(s) => output.push_str(&s),
Err(err) => {
return Value::Error { error: err };
}
}
}
Value::String {
val: output,
span, // FIXME?
}
}
}
}
@ -134,9 +147,30 @@ impl PipelineData {
match self {
PipelineData::Value(v, ..) => Ok(v.into_string(separator, config)),
PipelineData::ListStream(s, ..) => Ok(s.into_string(separator, config)),
PipelineData::StringStream(s, ..) => s.into_string(separator),
PipelineData::ByteStream(s, ..) => {
Ok(String::from_utf8_lossy(&s.into_vec()?).to_string())
PipelineData::RawStream(s, ..) => {
let mut items = vec![];
for val in s {
match val {
Ok(val) => {
items.push(val);
}
Err(e) => {
return Err(e);
}
}
}
let mut output = String::new();
for item in items {
match item.as_string() {
Ok(s) => output.push_str(&s),
Err(err) => {
return Err(err);
}
}
}
Ok(output)
}
}
}
@ -191,9 +225,9 @@ impl PipelineData {
Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc))
}
PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)),
PipelineData::StringStream(stream, span, ..) => Ok(stream
PipelineData::RawStream(stream, ..) => Ok(stream
.map(move |x| match x {
Ok(s) => f(Value::String { val: s, span }),
Ok(v) => f(v),
Err(err) => Value::Error { error: err },
})
.into_pipeline_data(ctrlc)),
@ -205,11 +239,6 @@ impl PipelineData {
Value::Error { error } => Err(error),
v => Ok(v.into_pipeline_data()),
},
PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput(
"Binary output from this command may need to be decoded using the 'decode' command"
.into(),
span,
)),
}
}
@ -232,9 +261,9 @@ impl PipelineData {
PipelineData::ListStream(stream, ..) => {
Ok(stream.map(f).flatten().into_pipeline_data(ctrlc))
}
PipelineData::StringStream(stream, span, ..) => Ok(stream
PipelineData::RawStream(stream, ..) => Ok(stream
.map(move |x| match x {
Ok(s) => Value::String { val: s, span },
Ok(v) => v,
Err(err) => Value::Error { error: err },
})
.map(f)
@ -245,11 +274,6 @@ impl PipelineData {
Err(error) => Err(error),
},
PipelineData::Value(v, ..) => Ok(f(v).into_iter().into_pipeline_data(ctrlc)),
PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput(
"Binary output from this command may need to be decoded using the 'decode' command"
.into(),
span,
)),
}
}
@ -267,14 +291,13 @@ impl PipelineData {
Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc))
}
PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)),
PipelineData::StringStream(stream, span, ..) => Ok(stream
PipelineData::RawStream(stream, ..) => Ok(stream
.map(move |x| match x {
Ok(s) => Value::String { val: s, span },
Ok(v) => v,
Err(err) => Value::Error { error: err },
})
.filter(f)
.into_pipeline_data(ctrlc)),
PipelineData::Value(Value::Range { val, .. }, ..) => {
Ok(val.into_range_iter()?.filter(f).into_pipeline_data(ctrlc))
}
@ -285,11 +308,6 @@ impl PipelineData {
Ok(Value::Nothing { span: v.span()? }.into_pipeline_data())
}
}
PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput(
"Binary output from this command may need to be decoded using the 'decode' command"
.into(),
span,
)),
}
}
}
@ -305,7 +323,7 @@ impl IntoIterator for PipelineData {
match self {
PipelineData::Value(Value::List { vals, .. }, metadata) => {
PipelineIterator(PipelineData::ListStream(
ValueStream {
ListStream {
stream: Box::new(vals.into_iter()),
ctrlc: None,
},
@ -315,14 +333,14 @@ impl IntoIterator for PipelineData {
PipelineData::Value(Value::Range { val, .. }, metadata) => {
match val.into_range_iter() {
Ok(iter) => PipelineIterator(PipelineData::ListStream(
ValueStream {
ListStream {
stream: Box::new(iter),
ctrlc: None,
},
metadata,
)),
Err(error) => PipelineIterator(PipelineData::ListStream(
ValueStream {
ListStream {
stream: Box::new(std::iter::once(Value::Error { error })),
ctrlc: None,
},
@ -343,18 +361,8 @@ impl Iterator for PipelineIterator {
PipelineData::Value(Value::Nothing { .. }, ..) => None,
PipelineData::Value(v, ..) => Some(std::mem::take(v)),
PipelineData::ListStream(stream, ..) => stream.next(),
PipelineData::StringStream(stream, span, ..) => stream.next().map(|x| match x {
Ok(x) => Value::String {
val: x,
span: *span,
},
Err(err) => Value::Error { error: err },
}),
PipelineData::ByteStream(stream, span, ..) => stream.next().map(|x| match x {
Ok(x) => Value::Binary {
val: x,
span: *span,
},
PipelineData::RawStream(stream, ..) => stream.next().map(|x| match x {
Ok(x) => x,
Err(err) => Value::Error { error: err },
}),
}
@ -391,7 +399,7 @@ where
{
fn into_pipeline_data(self, ctrlc: Option<Arc<AtomicBool>>) -> PipelineData {
PipelineData::ListStream(
ValueStream {
ListStream {
stream: Box::new(self.into_iter().map(Into::into)),
ctrlc,
},
@ -405,7 +413,7 @@ where
ctrlc: Option<Arc<AtomicBool>>,
) -> PipelineData {
PipelineData::ListStream(
ValueStream {
ListStream {
stream: Box::new(self.into_iter().map(Into::into)),
ctrlc,
},

View File

@ -239,6 +239,18 @@ impl Value {
}
}
pub fn as_binary(&self) -> Result<&[u8], ShellError> {
match self {
Value::Binary { val, .. } => Ok(val),
Value::String { val, .. } => Ok(val.as_bytes()),
x => Err(ShellError::CantConvert(
"binary".into(),
x.get_type().to_string(),
self.span()?,
)),
}
}
pub fn as_record(&self) -> Result<(&[String], &[Value]), ShellError> {
match self {
Value::Record { cols, vals, .. } => Ok((cols, vals)),

View File

@ -7,95 +7,139 @@ use std::{
},
};
/// A single buffer of binary data streamed over multiple parts. Optionally contains ctrl-c that can be used
/// to break the stream.
pub struct ByteStream {
pub struct RawStream {
pub stream: Box<dyn Iterator<Item = Result<Vec<u8>, ShellError>> + Send + 'static>,
pub leftover: Vec<u8>,
pub ctrlc: Option<Arc<AtomicBool>>,
pub is_binary: bool,
pub span: Span,
}
impl ByteStream {
pub fn into_vec(self) -> Result<Vec<u8>, ShellError> {
impl RawStream {
pub fn new(
stream: Box<dyn Iterator<Item = Result<Vec<u8>, ShellError>> + Send + 'static>,
ctrlc: Option<Arc<AtomicBool>>,
) -> Self {
Self {
stream,
leftover: vec![],
ctrlc,
is_binary: false,
span: Span::new(0, 0),
}
}
pub fn into_bytes(self) -> Result<Vec<u8>, ShellError> {
let mut output = vec![];
for item in self.stream {
output.append(&mut item?);
output.extend(item?);
}
Ok(output)
}
}
impl Debug for ByteStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ByteStream").finish()
}
}
impl Iterator for ByteStream {
type Item = Result<Vec<u8>, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(ctrlc) = &self.ctrlc {
if ctrlc.load(Ordering::SeqCst) {
None
} else {
self.stream.next()
}
} else {
self.stream.next()
}
}
}
/// A single string streamed over multiple parts. Optionally contains ctrl-c that can be used
/// to break the stream.
pub struct StringStream {
pub stream: Box<dyn Iterator<Item = Result<String, ShellError>> + Send + 'static>,
pub ctrlc: Option<Arc<AtomicBool>>,
}
impl StringStream {
pub fn into_string(self, separator: &str) -> Result<String, ShellError> {
pub fn into_string(self) -> Result<String, ShellError> {
let mut output = String::new();
let mut first = true;
for s in self.stream {
output.push_str(&s?);
if !first {
output.push_str(separator);
} else {
first = false;
}
for item in self {
output.push_str(&item?.as_string()?);
}
Ok(output)
}
pub fn from_stream(
input: impl Iterator<Item = Result<String, ShellError>> + Send + 'static,
ctrlc: Option<Arc<AtomicBool>>,
) -> StringStream {
StringStream {
stream: Box::new(input),
ctrlc,
}
}
}
impl Debug for StringStream {
impl Debug for RawStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StringStream").finish()
f.debug_struct("RawStream").finish()
}
}
impl Iterator for StringStream {
type Item = Result<String, ShellError>;
impl Iterator for RawStream {
type Item = Result<Value, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(ctrlc) = &self.ctrlc {
if ctrlc.load(Ordering::SeqCst) {
None
} else {
self.stream.next()
// If we know we're already binary, just output that
if self.is_binary {
match self.stream.next() {
Some(buffer) => match buffer {
Ok(mut v) => {
while let Some(b) = self.leftover.pop() {
v.insert(0, b);
}
Some(Ok(Value::Binary {
val: v,
span: self.span,
}))
}
Err(e) => Some(Err(e)),
},
None => None,
}
} else {
self.stream.next()
// We *may* be text. We're only going to try utf-8. Other decodings
// needs to be taken as binary first, then passed through `decode`.
match self.stream.next() {
Some(buffer) => match buffer {
Ok(mut v) => {
while let Some(b) = self.leftover.pop() {
v.insert(0, b);
}
match String::from_utf8(v.clone()) {
Ok(s) => {
// Great, we have a complete string, let's output it
Some(Ok(Value::String {
val: s,
span: self.span,
}))
}
Err(err) => {
// Okay, we *might* have a string but we've also got some errors
if v.is_empty() {
// We can just end here
None
} else if v.len() > 3
&& (v.len() - err.utf8_error().valid_up_to() > 3)
{
// As UTF-8 characters are max 4 bytes, if we have more than that in error we know
// that it's not just a character spanning two frames.
// We now know we are definitely binary, so switch to binary and stay there.
self.is_binary = true;
Some(Ok(Value::Binary {
val: v,
span: self.span,
}))
} else {
// Okay, we have a tiny bit of error at the end of the buffer. This could very well be
// a character that spans two frames. Since this is the case, remove the error from
// the current frame an dput it in the leftover buffer.
self.leftover =
v[(err.utf8_error().valid_up_to() + 1)..].to_vec();
let buf = v[0..err.utf8_error().valid_up_to()].to_vec();
match String::from_utf8(buf) {
Ok(s) => Some(Ok(Value::String {
val: s,
span: self.span,
})),
Err(_) => {
// Something is definitely wrong. Switch to binary, and stay there
self.is_binary = true;
Some(Ok(Value::Binary {
val: v,
span: self.span,
}))
}
}
}
}
}
}
Err(e) => Some(Err(e)),
},
None => None,
}
}
}
}
@ -106,12 +150,12 @@ impl Iterator for StringStream {
/// In practice, a "stream" here means anything which can be iterated and produce Values as it iterates.
/// Like other iterators in Rust, observing values from this stream will drain the items as you view them
/// and the stream cannot be replayed.
pub struct ValueStream {
pub struct ListStream {
pub stream: Box<dyn Iterator<Item = Value> + Send + 'static>,
pub ctrlc: Option<Arc<AtomicBool>>,
}
impl ValueStream {
impl ListStream {
pub fn into_string(self, separator: &str, config: &Config) -> String {
self.map(|x: Value| x.into_string(", ", config))
.collect::<Vec<String>>()
@ -121,21 +165,21 @@ impl ValueStream {
pub fn from_stream(
input: impl Iterator<Item = Value> + Send + 'static,
ctrlc: Option<Arc<AtomicBool>>,
) -> ValueStream {
ValueStream {
) -> ListStream {
ListStream {
stream: Box::new(input),
ctrlc,
}
}
}
impl Debug for ValueStream {
impl Debug for ListStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ValueStream").finish()
}
}
impl Iterator for ValueStream {
impl Iterator for ListStream {
type Item = Value;
fn next(&mut self) -> Option<Self::Item> {

View File

@ -17,7 +17,7 @@ use nu_parser::parse;
use nu_protocol::{
ast::{Call, Expr, Expression, Pipeline, Statement},
engine::{Command, EngineState, Stack, StateWorkingSet},
ByteStream, Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Span,
Category, Example, IntoPipelineData, PipelineData, RawStream, ShellError, Signature, Span,
Spanned, SyntaxShape, Value, CONFIG_VARIABLE_ID,
};
use std::{
@ -119,11 +119,8 @@ fn main() -> Result<()> {
let stdin = std::io::stdin();
let buf_reader = BufReader::new(stdin);
PipelineData::ByteStream(
ByteStream {
stream: Box::new(BufferedReader::new(buf_reader)),
ctrlc: Some(ctrlc),
},
PipelineData::RawStream(
RawStream::new(Box::new(BufferedReader::new(buf_reader)), Some(ctrlc)),
redirect_stdin.span,
None,
)

View File

@ -198,36 +198,13 @@ fn print_pipeline_data(
let config = stack.get_config().unwrap_or_default();
match input {
PipelineData::StringStream(stream, _, _) => {
for s in stream {
print!("{}", s?);
let _ = std::io::stdout().flush();
}
return Ok(());
}
PipelineData::ByteStream(stream, _, _) => {
let mut address_offset = 0;
for v in stream {
let cfg = nu_pretty_hex::HexConfig {
title: false,
address_offset,
..Default::default()
};
let mut stdout = std::io::stdout();
let v = v?;
address_offset += v.len();
let s = if v.iter().all(|x| x.is_ascii()) {
format!("{}", String::from_utf8_lossy(&v))
} else {
nu_pretty_hex::config_hex(&v, cfg)
};
println!("{}", s);
}
return Ok(());
if let PipelineData::RawStream(stream, _, _) = input {
for s in stream {
let _ = stdout.write(s?.as_binary()?);
}
_ => {}
return Ok(());
}
match engine_state.find_decl("table".as_bytes()) {