diff --git a/crates/nu-command/src/conversions/into/binary.rs b/crates/nu-command/src/conversions/into/binary.rs index 71393b988..667bcb9ee 100644 --- a/crates/nu-command/src/conversions/into/binary.rs +++ b/crates/nu-command/src/conversions/into/binary.rs @@ -2,7 +2,8 @@ use nu_engine::CallExt; use nu_protocol::{ ast::{Call, CellPath}, engine::{Command, EngineState, Stack}, - Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value, + Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Span, SyntaxShape, + Value, }; #[derive(Clone)] @@ -98,7 +99,15 @@ fn into_binary( let column_paths: Vec = call.rest(engine_state, stack, 0)?; match input { - PipelineData::ByteStream(..) => Ok(input), + PipelineData::RawStream(stream, ..) => { + // TODO: in the future, we may want this to stream out, converting each to bytes + let output = stream.into_bytes()?; + Ok(Value::Binary { + val: output, + span: head, + } + .into_pipeline_data()) + } _ => input.map( move |v| { if column_paths.is_empty() { diff --git a/crates/nu-command/src/conversions/into/string.rs b/crates/nu-command/src/conversions/into/string.rs index 22661c087..7d40f6da8 100644 --- a/crates/nu-command/src/conversions/into/string.rs +++ b/crates/nu-command/src/conversions/into/string.rs @@ -2,7 +2,8 @@ use nu_engine::CallExt; use nu_protocol::{ ast::{Call, CellPath}, engine::{Command, EngineState, Stack}, - Category, Config, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value, + Category, Config, Example, IntoPipelineData, PipelineData, ShellError, Signature, Span, + SyntaxShape, Value, }; // TODO num_format::SystemLocale once platform-specific dependencies are stable (see Cargo.toml) @@ -148,30 +149,41 @@ fn string_helper( } } - input.map( - move |v| { - if column_paths.is_empty() { - action(&v, head, decimals, decimals_value, false, &config) - } else { - let mut ret = v; - for path in &column_paths { - let config = config.clone(); - let r = ret.update_cell_path( - &path.members, - Box::new(move |old| { - action(old, head, decimals, decimals_value, false, &config) - }), - ); - if let Err(error) = r { - return Value::Error { error }; - } - } - - ret + match input { + PipelineData::RawStream(stream, ..) => { + // TODO: in the future, we may want this to stream out, converting each to bytes + let output = stream.into_string()?; + Ok(Value::String { + val: output, + span: head, } - }, - engine_state.ctrlc.clone(), - ) + .into_pipeline_data()) + } + _ => input.map( + move |v| { + if column_paths.is_empty() { + action(&v, head, decimals, decimals_value, false, &config) + } else { + let mut ret = v; + for path in &column_paths { + let config = config.clone(); + let r = ret.update_cell_path( + &path.members, + Box::new(move |old| { + action(old, head, decimals, decimals_value, false, &config) + }), + ); + if let Err(error) = r { + return Value::Error { error }; + } + } + + ret + } + }, + engine_state.ctrlc.clone(), + ), + } } pub fn action( diff --git a/crates/nu-command/src/core_commands/describe.rs b/crates/nu-command/src/core_commands/describe.rs index d10929d4d..4c9ec4f9c 100644 --- a/crates/nu-command/src/core_commands/describe.rs +++ b/crates/nu-command/src/core_commands/describe.rs @@ -26,9 +26,9 @@ impl Command for Describe { input: PipelineData, ) -> Result { let head = call.head; - if matches!(input, PipelineData::ByteStream(..)) { + if matches!(input, PipelineData::RawStream(..)) { Ok(PipelineData::Value( - Value::string("binary", call.head), + Value::string("raw input", call.head), None, )) } else { diff --git a/crates/nu-command/src/core_commands/echo.rs b/crates/nu-command/src/core_commands/echo.rs index 05396299b..2b7d6c150 100644 --- a/crates/nu-command/src/core_commands/echo.rs +++ b/crates/nu-command/src/core_commands/echo.rs @@ -2,7 +2,7 @@ use nu_engine::CallExt; use nu_protocol::ast::Call; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - Category, Example, PipelineData, ShellError, Signature, SyntaxShape, Value, ValueStream, + Category, Example, ListStream, PipelineData, ShellError, Signature, SyntaxShape, Value, }; #[derive(Clone)] @@ -35,7 +35,7 @@ impl Command for Echo { match n.cmp(&1usize) { // More than one value is converted in a stream of values std::cmp::Ordering::Greater => PipelineData::ListStream( - ValueStream::from_stream(to_be_echoed.into_iter(), engine_state.ctrlc.clone()), + ListStream::from_stream(to_be_echoed.into_iter(), engine_state.ctrlc.clone()), None, ), diff --git a/crates/nu-command/src/filesystem/open.rs b/crates/nu-command/src/filesystem/open.rs index 082efdf02..65892faa1 100644 --- a/crates/nu-command/src/filesystem/open.rs +++ b/crates/nu-command/src/filesystem/open.rs @@ -2,7 +2,7 @@ use nu_engine::{get_full_help, CallExt}; use nu_protocol::ast::Call; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - ByteStream, Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Spanned, + Category, Example, IntoPipelineData, PipelineData, RawStream, ShellError, Signature, Spanned, SyntaxShape, Value, }; use std::io::{BufRead, BufReader, Read}; @@ -120,11 +120,8 @@ impl Command for Open { let buf_reader = BufReader::new(file); - let output = PipelineData::ByteStream( - ByteStream { - stream: Box::new(BufferedReader { input: buf_reader }), - ctrlc, - }, + let output = PipelineData::RawStream( + RawStream::new(Box::new(BufferedReader { input: buf_reader }), ctrlc), call_span, None, ); diff --git a/crates/nu-command/src/filters/columns.rs b/crates/nu-command/src/filters/columns.rs index 7f5b06cdd..38e2ac22c 100644 --- a/crates/nu-command/src/filters/columns.rs +++ b/crates/nu-command/src/filters/columns.rs @@ -82,7 +82,7 @@ fn getcol( .map(move |x| Value::String { val: x, span }) .into_pipeline_data(engine_state.ctrlc.clone())) } - PipelineData::Value(..) | PipelineData::StringStream(..) | PipelineData::ByteStream(..) => { + PipelineData::Value(..) | PipelineData::RawStream(..) => { let cols = vec![]; let vals = vec![]; Ok(Value::Record { cols, vals, span }.into_pipeline_data()) diff --git a/crates/nu-command/src/filters/each.rs b/crates/nu-command/src/filters/each.rs index d67e93990..35ae1048a 100644 --- a/crates/nu-command/src/filters/each.rs +++ b/crates/nu-command/src/filters/each.rs @@ -111,54 +111,14 @@ impl Command for Each { } }) .into_pipeline_data(ctrlc)), - PipelineData::ByteStream(stream, ..) => Ok(stream + PipelineData::RawStream(stream, ..) => Ok(stream .into_iter() .enumerate() .map(move |(idx, x)| { stack.with_env(&orig_env_vars, &orig_env_hidden); let x = match x { - Ok(x) => Value::Binary { val: x, span }, - Err(err) => return Value::Error { error: err }, - }; - - if let Some(var) = block.signature.get_positional(0) { - if let Some(var_id) = &var.var_id { - if numbered { - stack.add_var( - *var_id, - Value::Record { - cols: vec!["index".into(), "item".into()], - vals: vec![ - Value::Int { - val: idx as i64, - span, - }, - x, - ], - span, - }, - ); - } else { - stack.add_var(*var_id, x); - } - } - } - - match eval_block(&engine_state, &mut stack, &block, PipelineData::new(span)) { - Ok(v) => v.into_value(span), - Err(error) => Value::Error { error }, - } - }) - .into_pipeline_data(ctrlc)), - PipelineData::StringStream(stream, ..) => Ok(stream - .into_iter() - .enumerate() - .map(move |(idx, x)| { - stack.with_env(&orig_env_vars, &orig_env_hidden); - - let x = match x { - Ok(x) => Value::String { val: x, span }, + Ok(x) => x, Err(err) => return Value::Error { error: err }, }; diff --git a/crates/nu-command/src/filters/length.rs b/crates/nu-command/src/filters/length.rs index d28637e9e..ef7c5275c 100644 --- a/crates/nu-command/src/filters/length.rs +++ b/crates/nu-command/src/filters/length.rs @@ -96,7 +96,7 @@ fn getcol( .map(move |x| Value::String { val: x, span }) .into_pipeline_data(engine_state.ctrlc.clone())) } - PipelineData::Value(..) | PipelineData::StringStream(..) | PipelineData::ByteStream(..) => { + PipelineData::Value(..) | PipelineData::RawStream(..) => { let cols = vec![]; let vals = vec![]; Ok(Value::Record { cols, vals, span }.into_pipeline_data()) diff --git a/crates/nu-command/src/filters/lines.rs b/crates/nu-command/src/filters/lines.rs index 348402fbb..5173a5df8 100644 --- a/crates/nu-command/src/filters/lines.rs +++ b/crates/nu-command/src/filters/lines.rs @@ -88,41 +88,11 @@ impl Command for Lines { Ok(iter.into_pipeline_data(engine_state.ctrlc.clone())) } - PipelineData::StringStream(stream, span, ..) => { - let mut split_char = "\n"; - - let iter = stream - .into_iter() - .map(move |value| match value { - Ok(value) => { - if split_char != "\r\n" && value.contains("\r\n") { - split_char = "\r\n"; - } - value - .split(split_char) - .filter_map(|s| { - if !s.is_empty() { - Some(Value::String { - val: s.into(), - span, - }) - } else { - None - } - }) - .collect::>() - } - Err(err) => vec![Value::Error { error: err }], - }) - .flatten(); - - Ok(iter.into_pipeline_data(engine_state.ctrlc.clone())) - } PipelineData::Value(val, ..) => Err(ShellError::UnsupportedInput( format!("Not supported input: {}", val.as_string()?), call.head, )), - PipelineData::ByteStream(..) => { + PipelineData::RawStream(..) => { let config = stack.get_config()?; //FIXME: Make sure this can fail in the future to let the user diff --git a/crates/nu-command/src/filters/par_each.rs b/crates/nu-command/src/filters/par_each.rs index a3f0b11ec..4308234e2 100644 --- a/crates/nu-command/src/filters/par_each.rs +++ b/crates/nu-command/src/filters/par_each.rs @@ -177,56 +177,12 @@ impl Command for ParEach { .into_iter() .flatten() .into_pipeline_data(ctrlc)), - PipelineData::StringStream(stream, ..) => Ok(stream + PipelineData::RawStream(stream, ..) => Ok(stream .enumerate() .par_bridge() .map(move |(idx, x)| { let x = match x { - Ok(x) => Value::String { val: x, span }, - Err(err) => return Value::Error { error: err }.into_pipeline_data(), - }; - let block = engine_state.get_block(block_id); - - let mut stack = stack.clone(); - - if let Some(var) = block.signature.get_positional(0) { - if let Some(var_id) = &var.var_id { - if numbered { - stack.add_var( - *var_id, - Value::Record { - cols: vec!["index".into(), "item".into()], - vals: vec![ - Value::Int { - val: idx as i64, - span, - }, - x, - ], - span, - }, - ); - } else { - stack.add_var(*var_id, x); - } - } - } - - match eval_block(&engine_state, &mut stack, block, PipelineData::new(span)) { - Ok(v) => v, - Err(error) => Value::Error { error }.into_pipeline_data(), - } - }) - .collect::>() - .into_iter() - .flatten() - .into_pipeline_data(ctrlc)), - PipelineData::ByteStream(stream, ..) => Ok(stream - .enumerate() - .par_bridge() - .map(move |(idx, x)| { - let x = match x { - Ok(x) => Value::Binary { val: x, span }, + Ok(x) => x, Err(err) => return Value::Error { error: err }.into_pipeline_data(), }; diff --git a/crates/nu-command/src/filters/wrap.rs b/crates/nu-command/src/filters/wrap.rs index 9e7f808ed..6b3667e98 100644 --- a/crates/nu-command/src/filters/wrap.rs +++ b/crates/nu-command/src/filters/wrap.rs @@ -50,13 +50,9 @@ impl Command for Wrap { span, }) .into_pipeline_data(engine_state.ctrlc.clone())), - PipelineData::StringStream(stream, ..) => Ok(Value::String { - val: stream.into_string("")?, - span, - } - .into_pipeline_data()), - PipelineData::ByteStream(stream, ..) => Ok(Value::Binary { - val: stream.into_vec()?, + PipelineData::RawStream(..) => Ok(Value::Record { + cols: vec![name], + vals: vec![input.into_value(call.head)], span, } .into_pipeline_data()), diff --git a/crates/nu-command/src/network/fetch.rs b/crates/nu-command/src/network/fetch.rs index d5674ac8c..68b04b1e9 100644 --- a/crates/nu-command/src/network/fetch.rs +++ b/crates/nu-command/src/network/fetch.rs @@ -2,7 +2,7 @@ use base64::encode; use nu_engine::CallExt; use nu_protocol::ast::Call; use nu_protocol::engine::{Command, EngineState, Stack}; -use nu_protocol::ByteStream; +use nu_protocol::RawStream; use nu_protocol::{ Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value, @@ -356,13 +356,13 @@ fn response_to_buffer( ) -> nu_protocol::PipelineData { let buffered_input = BufReader::new(response); - PipelineData::ByteStream( - ByteStream { - stream: Box::new(BufferedReader { + PipelineData::RawStream( + RawStream::new( + Box::new(BufferedReader { input: buffered_input, }), - ctrlc: engine_state.ctrlc.clone(), - }, + engine_state.ctrlc.clone(), + ), span, None, ) diff --git a/crates/nu-command/src/path/join.rs b/crates/nu-command/src/path/join.rs index 0a6e54d0b..ed9dbca96 100644 --- a/crates/nu-command/src/path/join.rs +++ b/crates/nu-command/src/path/join.rs @@ -5,8 +5,8 @@ use std::{ use nu_engine::CallExt; use nu_protocol::{ - engine::Command, Example, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape, - Value, ValueStream, + engine::Command, Example, ListStream, PipelineData, ShellError, Signature, Span, Spanned, + SyntaxShape, Value, }; use super::PathSubcommandArguments; @@ -68,7 +68,7 @@ the output of 'path parse' and 'path split' subcommands."# Ok(PipelineData::Value(handle_value(val, &args, head), md)) } PipelineData::ListStream(stream, md) => Ok(PipelineData::ListStream( - ValueStream::from_stream( + ListStream::from_stream( stream.map(move |val| handle_value(val, &args, head)), engine_state.ctrlc.clone(), ), diff --git a/crates/nu-command/src/random/dice.rs b/crates/nu-command/src/random/dice.rs index f18f6edc9..af127bce4 100644 --- a/crates/nu-command/src/random/dice.rs +++ b/crates/nu-command/src/random/dice.rs @@ -2,7 +2,7 @@ use nu_engine::CallExt; use nu_protocol::ast::Call; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - Category, Example, PipelineData, ShellError, Signature, SyntaxShape, Value, ValueStream, + Category, Example, ListStream, PipelineData, ShellError, Signature, SyntaxShape, Value, }; use rand::prelude::{thread_rng, Rng}; @@ -80,7 +80,7 @@ fn dice( }); Ok(PipelineData::ListStream( - ValueStream::from_stream(iter, engine_state.ctrlc.clone()), + ListStream::from_stream(iter, engine_state.ctrlc.clone()), None, )) } diff --git a/crates/nu-command/src/strings/decode.rs b/crates/nu-command/src/strings/decode.rs index 18db03aed..cd2de9354 100644 --- a/crates/nu-command/src/strings/decode.rs +++ b/crates/nu-command/src/strings/decode.rs @@ -44,8 +44,8 @@ impl Command for Decode { let encoding: Spanned = call.req(engine_state, stack, 0)?; match input { - PipelineData::ByteStream(stream, ..) => { - let bytes: Vec = stream.into_vec()?; + PipelineData::RawStream(stream, ..) => { + let bytes: Vec = stream.into_bytes()?; let encoding = match Encoding::for_label(encoding.item.as_bytes()) { None => Err(ShellError::SpannedLabeledError( diff --git a/crates/nu-command/src/strings/format/command.rs b/crates/nu-command/src/strings/format/command.rs index 4f6f35112..6846ac2a7 100644 --- a/crates/nu-command/src/strings/format/command.rs +++ b/crates/nu-command/src/strings/format/command.rs @@ -2,7 +2,7 @@ use nu_engine::CallExt; use nu_protocol::ast::{Call, PathMember}; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value, ValueStream, + Category, Example, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape, Value, }; #[derive(Clone)] @@ -152,7 +152,7 @@ fn format( } Ok(PipelineData::ListStream( - ValueStream::from_stream(list.into_iter(), None), + ListStream::from_stream(list.into_iter(), None), None, )) } diff --git a/crates/nu-command/src/strings/parse.rs b/crates/nu-command/src/strings/parse.rs index 2b5051c5f..874f8b06c 100644 --- a/crates/nu-command/src/strings/parse.rs +++ b/crates/nu-command/src/strings/parse.rs @@ -2,8 +2,8 @@ use nu_engine::CallExt; use nu_protocol::ast::Call; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - Category, Example, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape, Value, - ValueStream, + Category, Example, ListStream, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape, + Value, }; use regex::Regex; @@ -127,7 +127,7 @@ fn operate( } Ok(PipelineData::ListStream( - ValueStream::from_stream(parsed.into_iter(), ctrlc), + ListStream::from_stream(parsed.into_iter(), ctrlc), None, )) } diff --git a/crates/nu-command/src/strings/str_/collect.rs b/crates/nu-command/src/strings/str_/collect.rs index 5547fadf2..783fef4eb 100644 --- a/crates/nu-command/src/strings/str_/collect.rs +++ b/crates/nu-command/src/strings/str_/collect.rs @@ -39,6 +39,7 @@ impl Command for StrCollect { let config = stack.get_config().unwrap_or_default(); + // let output = input.collect_string(&separator.unwrap_or_default(), &config)?; // Hmm, not sure what we actually want. If you don't use debug_string, Date comes out as human readable // which feels funny #[allow(clippy::needless_collect)] diff --git a/crates/nu-command/src/system/run_external.rs b/crates/nu-command/src/system/run_external.rs index 9a6289571..4c2ad078d 100644 --- a/crates/nu-command/src/system/run_external.rs +++ b/crates/nu-command/src/system/run_external.rs @@ -8,7 +8,7 @@ use std::sync::mpsc; use nu_engine::env_to_strings; use nu_protocol::engine::{EngineState, Stack}; use nu_protocol::{ast::Call, engine::Command, ShellError, Signature, SyntaxShape, Value}; -use nu_protocol::{ByteStream, Category, Config, PipelineData, Span, Spanned}; +use nu_protocol::{Category, Config, PipelineData, RawStream, Span, Spanned}; use itertools::Itertools; @@ -242,11 +242,8 @@ impl ExternalCommand { }); let receiver = ChannelReceiver::new(rx); - Ok(PipelineData::ByteStream( - ByteStream { - stream: Box::new(receiver), - ctrlc: output_ctrlc, - }, + Ok(PipelineData::RawStream( + RawStream::new(Box::new(receiver), output_ctrlc), head, None, )) diff --git a/crates/nu-command/src/viewers/table.rs b/crates/nu-command/src/viewers/table.rs index 8534c2c3a..2014ca5fd 100644 --- a/crates/nu-command/src/viewers/table.rs +++ b/crates/nu-command/src/viewers/table.rs @@ -5,8 +5,8 @@ use nu_engine::{env_to_string, CallExt}; use nu_protocol::ast::{Call, PathMember}; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - Category, Config, DataSource, IntoPipelineData, PipelineData, PipelineMetadata, ShellError, - Signature, Span, StringStream, SyntaxShape, Value, ValueStream, + Category, Config, DataSource, IntoPipelineData, ListStream, PipelineData, PipelineMetadata, + RawStream, ShellError, Signature, Span, SyntaxShape, Value, }; use nu_table::{StyledString, TextStyle, Theme}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -62,32 +62,15 @@ impl Command for Table { }; match input { - PipelineData::ByteStream(stream, ..) => Ok(PipelineData::StringStream( - StringStream::from_stream( - stream.map(move |x| { - Ok(if x.iter().all(|x| x.is_ascii()) { - format!("{}", String::from_utf8_lossy(&x?)) - } else { - format!("{}\n", nu_pretty_hex::pretty_hex(&x?)) - }) - }), - ctrlc, - ), - head, - None, - )), - PipelineData::Value(Value::Binary { val, .. }, ..) => Ok(PipelineData::StringStream( - StringStream::from_stream( - vec![Ok( - if val.iter().all(|x| { - *x < 128 && (*x >= b' ' || *x == b'\t' || *x == b'\r' || *x == b'\n') - }) { - format!("{}", String::from_utf8_lossy(&val)) - } else { - format!("{}\n", nu_pretty_hex::pretty_hex(&val)) - }, - )] - .into_iter(), + PipelineData::RawStream(..) => Ok(input), + PipelineData::Value(Value::Binary { val, .. }, ..) => Ok(PipelineData::RawStream( + RawStream::new( + Box::new( + vec![Ok(format!("{}\n", nu_pretty_hex::pretty_hex(&val)) + .as_bytes() + .to_vec())] + .into_iter(), + ), ctrlc, ), head, @@ -127,7 +110,7 @@ impl Command for Table { None => LsColors::default(), }; - ValueStream::from_stream( + ListStream::from_stream( stream.map(move |mut x| match &mut x { Value::Record { cols, vals, .. } => { let mut idx = 0; @@ -194,15 +177,15 @@ impl Command for Table { let head = call.head; - Ok(PipelineData::StringStream( - StringStream::from_stream( - PagingTableCreator { + Ok(PipelineData::RawStream( + RawStream::new( + Box::new(PagingTableCreator { row_offset, config, ctrlc: ctrlc.clone(), head, stream, - }, + }), ctrlc, ), head, @@ -381,14 +364,14 @@ fn convert_with_precision(val: &str, precision: usize) -> Result>, config: Config, row_offset: usize, } impl Iterator for PagingTableCreator { - type Item = Result; + type Item = Result, ShellError>; fn next(&mut self) -> Option { let mut batch = vec![]; @@ -443,7 +426,7 @@ impl Iterator for PagingTableCreator { Ok(Some(table)) => { let result = nu_table::draw_table(&table, term_width, &color_hm, &self.config); - Some(Ok(result)) + Some(Ok(result.as_bytes().to_vec())) } Err(err) => Some(Err(err)), _ => None, diff --git a/crates/nu-plugin/src/plugin/declaration.rs b/crates/nu-plugin/src/plugin/declaration.rs index 500cf8695..99a0634ef 100644 --- a/crates/nu-plugin/src/plugin/declaration.rs +++ b/crates/nu-plugin/src/plugin/declaration.rs @@ -6,7 +6,7 @@ use std::io::BufReader; use std::path::{Path, PathBuf}; use nu_protocol::engine::{Command, EngineState, Stack}; -use nu_protocol::{ast::Call, Signature, Value}; +use nu_protocol::{ast::Call, Signature}; use nu_protocol::{PipelineData, ShellError}; #[derive(Clone)] @@ -70,33 +70,7 @@ impl Command for PluginDeclaration { ) })?; - let input = match input { - PipelineData::Value(value, ..) => value, - PipelineData::ListStream(stream, ..) => { - let values = stream.collect::>(); - - Value::List { - vals: values, - span: call.head, - } - } - PipelineData::StringStream(stream, ..) => { - let val = stream.into_string("")?; - - Value::String { - val, - span: call.head, - } - } - PipelineData::ByteStream(stream, ..) => { - let val = stream.into_vec()?; - - Value::Binary { - val, - span: call.head, - } - } - }; + let input = input.into_value(call.head); // Create message to plugin to indicate that signature is required and // send call to plugin asking for signature diff --git a/crates/nu-protocol/src/pipeline_data.rs b/crates/nu-protocol/src/pipeline_data.rs index 84ca08968..a128b2980 100644 --- a/crates/nu-protocol/src/pipeline_data.rs +++ b/crates/nu-protocol/src/pipeline_data.rs @@ -1,8 +1,6 @@ use std::sync::{atomic::AtomicBool, Arc}; -use crate::{ - ast::PathMember, ByteStream, Config, ShellError, Span, StringStream, Value, ValueStream, -}; +use crate::{ast::PathMember, Config, ListStream, RawStream, ShellError, Span, Value}; /// The foundational abstraction for input and output to commands /// @@ -36,9 +34,8 @@ use crate::{ #[derive(Debug)] pub enum PipelineData { Value(Value, Option), - ListStream(ValueStream, Option), - StringStream(StringStream, Span, Option), - ByteStream(ByteStream, Span, Option), + ListStream(ListStream, Option), + RawStream(RawStream, Span, Option), } #[derive(Debug, Clone)] @@ -63,8 +60,7 @@ impl PipelineData { pub fn metadata(&self) -> Option { match self { PipelineData::ListStream(_, x) => x.clone(), - PipelineData::ByteStream(_, _, x) => x.clone(), - PipelineData::StringStream(_, _, x) => x.clone(), + PipelineData::RawStream(_, _, x) => x.clone(), PipelineData::Value(_, x) => x.clone(), } } @@ -72,8 +68,7 @@ impl PipelineData { pub fn set_metadata(mut self, metadata: Option) -> Self { match &mut self { PipelineData::ListStream(_, x) => *x = metadata, - PipelineData::ByteStream(_, _, x) => *x = metadata, - PipelineData::StringStream(_, _, x) => *x = metadata, + PipelineData::RawStream(_, _, x) => *x = metadata, PipelineData::Value(_, x) => *x = metadata, } @@ -88,33 +83,51 @@ impl PipelineData { vals: s.collect(), span, // FIXME? }, - PipelineData::StringStream(s, ..) => { - let mut output = String::new(); + PipelineData::RawStream(mut s, ..) => { + let mut items = vec![]; - for item in s { - match item { - Ok(s) => output.push_str(&s), - Err(err) => return Value::Error { error: err }, - } - } - Value::String { - val: output, - span, // FIXME? - } - } - PipelineData::ByteStream(s, ..) => { - let mut output = vec![]; - - for item in s { - match item { - Ok(s) => output.extend(&s), - Err(err) => return Value::Error { error: err }, + for val in &mut s { + match val { + Ok(val) => { + items.push(val); + } + Err(e) => { + return Value::Error { error: e }; + } } } - Value::Binary { - val: output, - span, // FIXME? + if s.is_binary { + let mut output = vec![]; + for item in items { + match item.as_binary() { + Ok(item) => { + output.extend(item); + } + Err(err) => { + return Value::Error { error: err }; + } + } + } + + Value::Binary { + val: output, + span, // FIXME? + } + } else { + let mut output = String::new(); + for item in items { + match item.as_string() { + Ok(s) => output.push_str(&s), + Err(err) => { + return Value::Error { error: err }; + } + } + } + Value::String { + val: output, + span, // FIXME? + } } } } @@ -134,9 +147,30 @@ impl PipelineData { match self { PipelineData::Value(v, ..) => Ok(v.into_string(separator, config)), PipelineData::ListStream(s, ..) => Ok(s.into_string(separator, config)), - PipelineData::StringStream(s, ..) => s.into_string(separator), - PipelineData::ByteStream(s, ..) => { - Ok(String::from_utf8_lossy(&s.into_vec()?).to_string()) + PipelineData::RawStream(s, ..) => { + let mut items = vec![]; + + for val in s { + match val { + Ok(val) => { + items.push(val); + } + Err(e) => { + return Err(e); + } + } + } + + let mut output = String::new(); + for item in items { + match item.as_string() { + Ok(s) => output.push_str(&s), + Err(err) => { + return Err(err); + } + } + } + Ok(output) } } } @@ -191,9 +225,9 @@ impl PipelineData { Ok(vals.into_iter().map(f).into_pipeline_data(ctrlc)) } PipelineData::ListStream(stream, ..) => Ok(stream.map(f).into_pipeline_data(ctrlc)), - PipelineData::StringStream(stream, span, ..) => Ok(stream + PipelineData::RawStream(stream, ..) => Ok(stream .map(move |x| match x { - Ok(s) => f(Value::String { val: s, span }), + Ok(v) => f(v), Err(err) => Value::Error { error: err }, }) .into_pipeline_data(ctrlc)), @@ -205,11 +239,6 @@ impl PipelineData { Value::Error { error } => Err(error), v => Ok(v.into_pipeline_data()), }, - PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput( - "Binary output from this command may need to be decoded using the 'decode' command" - .into(), - span, - )), } } @@ -232,9 +261,9 @@ impl PipelineData { PipelineData::ListStream(stream, ..) => { Ok(stream.map(f).flatten().into_pipeline_data(ctrlc)) } - PipelineData::StringStream(stream, span, ..) => Ok(stream + PipelineData::RawStream(stream, ..) => Ok(stream .map(move |x| match x { - Ok(s) => Value::String { val: s, span }, + Ok(v) => v, Err(err) => Value::Error { error: err }, }) .map(f) @@ -245,11 +274,6 @@ impl PipelineData { Err(error) => Err(error), }, PipelineData::Value(v, ..) => Ok(f(v).into_iter().into_pipeline_data(ctrlc)), - PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput( - "Binary output from this command may need to be decoded using the 'decode' command" - .into(), - span, - )), } } @@ -267,14 +291,13 @@ impl PipelineData { Ok(vals.into_iter().filter(f).into_pipeline_data(ctrlc)) } PipelineData::ListStream(stream, ..) => Ok(stream.filter(f).into_pipeline_data(ctrlc)), - PipelineData::StringStream(stream, span, ..) => Ok(stream + PipelineData::RawStream(stream, ..) => Ok(stream .map(move |x| match x { - Ok(s) => Value::String { val: s, span }, + Ok(v) => v, Err(err) => Value::Error { error: err }, }) .filter(f) .into_pipeline_data(ctrlc)), - PipelineData::Value(Value::Range { val, .. }, ..) => { Ok(val.into_range_iter()?.filter(f).into_pipeline_data(ctrlc)) } @@ -285,11 +308,6 @@ impl PipelineData { Ok(Value::Nothing { span: v.span()? }.into_pipeline_data()) } } - PipelineData::ByteStream(_, span, ..) => Err(ShellError::UnsupportedInput( - "Binary output from this command may need to be decoded using the 'decode' command" - .into(), - span, - )), } } } @@ -305,7 +323,7 @@ impl IntoIterator for PipelineData { match self { PipelineData::Value(Value::List { vals, .. }, metadata) => { PipelineIterator(PipelineData::ListStream( - ValueStream { + ListStream { stream: Box::new(vals.into_iter()), ctrlc: None, }, @@ -315,14 +333,14 @@ impl IntoIterator for PipelineData { PipelineData::Value(Value::Range { val, .. }, metadata) => { match val.into_range_iter() { Ok(iter) => PipelineIterator(PipelineData::ListStream( - ValueStream { + ListStream { stream: Box::new(iter), ctrlc: None, }, metadata, )), Err(error) => PipelineIterator(PipelineData::ListStream( - ValueStream { + ListStream { stream: Box::new(std::iter::once(Value::Error { error })), ctrlc: None, }, @@ -343,18 +361,8 @@ impl Iterator for PipelineIterator { PipelineData::Value(Value::Nothing { .. }, ..) => None, PipelineData::Value(v, ..) => Some(std::mem::take(v)), PipelineData::ListStream(stream, ..) => stream.next(), - PipelineData::StringStream(stream, span, ..) => stream.next().map(|x| match x { - Ok(x) => Value::String { - val: x, - span: *span, - }, - Err(err) => Value::Error { error: err }, - }), - PipelineData::ByteStream(stream, span, ..) => stream.next().map(|x| match x { - Ok(x) => Value::Binary { - val: x, - span: *span, - }, + PipelineData::RawStream(stream, ..) => stream.next().map(|x| match x { + Ok(x) => x, Err(err) => Value::Error { error: err }, }), } @@ -391,7 +399,7 @@ where { fn into_pipeline_data(self, ctrlc: Option>) -> PipelineData { PipelineData::ListStream( - ValueStream { + ListStream { stream: Box::new(self.into_iter().map(Into::into)), ctrlc, }, @@ -405,7 +413,7 @@ where ctrlc: Option>, ) -> PipelineData { PipelineData::ListStream( - ValueStream { + ListStream { stream: Box::new(self.into_iter().map(Into::into)), ctrlc, }, diff --git a/crates/nu-protocol/src/value/mod.rs b/crates/nu-protocol/src/value/mod.rs index 033d7690d..8794a3076 100644 --- a/crates/nu-protocol/src/value/mod.rs +++ b/crates/nu-protocol/src/value/mod.rs @@ -239,6 +239,18 @@ impl Value { } } + pub fn as_binary(&self) -> Result<&[u8], ShellError> { + match self { + Value::Binary { val, .. } => Ok(val), + Value::String { val, .. } => Ok(val.as_bytes()), + x => Err(ShellError::CantConvert( + "binary".into(), + x.get_type().to_string(), + self.span()?, + )), + } + } + pub fn as_record(&self) -> Result<(&[String], &[Value]), ShellError> { match self { Value::Record { cols, vals, .. } => Ok((cols, vals)), diff --git a/crates/nu-protocol/src/value/stream.rs b/crates/nu-protocol/src/value/stream.rs index 3529b086e..f45985bc8 100644 --- a/crates/nu-protocol/src/value/stream.rs +++ b/crates/nu-protocol/src/value/stream.rs @@ -7,95 +7,139 @@ use std::{ }, }; -/// A single buffer of binary data streamed over multiple parts. Optionally contains ctrl-c that can be used -/// to break the stream. -pub struct ByteStream { +pub struct RawStream { pub stream: Box, ShellError>> + Send + 'static>, + pub leftover: Vec, pub ctrlc: Option>, + pub is_binary: bool, + pub span: Span, } -impl ByteStream { - pub fn into_vec(self) -> Result, ShellError> { + +impl RawStream { + pub fn new( + stream: Box, ShellError>> + Send + 'static>, + ctrlc: Option>, + ) -> Self { + Self { + stream, + leftover: vec![], + ctrlc, + is_binary: false, + span: Span::new(0, 0), + } + } + + pub fn into_bytes(self) -> Result, ShellError> { let mut output = vec![]; + for item in self.stream { - output.append(&mut item?); + output.extend(item?); } Ok(output) } -} -impl Debug for ByteStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ByteStream").finish() - } -} -impl Iterator for ByteStream { - type Item = Result, ShellError>; - - fn next(&mut self) -> Option { - if let Some(ctrlc) = &self.ctrlc { - if ctrlc.load(Ordering::SeqCst) { - None - } else { - self.stream.next() - } - } else { - self.stream.next() - } - } -} - -/// A single string streamed over multiple parts. Optionally contains ctrl-c that can be used -/// to break the stream. -pub struct StringStream { - pub stream: Box> + Send + 'static>, - pub ctrlc: Option>, -} -impl StringStream { - pub fn into_string(self, separator: &str) -> Result { + pub fn into_string(self) -> Result { let mut output = String::new(); - let mut first = true; - for s in self.stream { - output.push_str(&s?); - - if !first { - output.push_str(separator); - } else { - first = false; - } + for item in self { + output.push_str(&item?.as_string()?); } + Ok(output) } - - pub fn from_stream( - input: impl Iterator> + Send + 'static, - ctrlc: Option>, - ) -> StringStream { - StringStream { - stream: Box::new(input), - ctrlc, - } - } } -impl Debug for StringStream { +impl Debug for RawStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("StringStream").finish() + f.debug_struct("RawStream").finish() } } - -impl Iterator for StringStream { - type Item = Result; +impl Iterator for RawStream { + type Item = Result; fn next(&mut self) -> Option { - if let Some(ctrlc) = &self.ctrlc { - if ctrlc.load(Ordering::SeqCst) { - None - } else { - self.stream.next() + // If we know we're already binary, just output that + if self.is_binary { + match self.stream.next() { + Some(buffer) => match buffer { + Ok(mut v) => { + while let Some(b) = self.leftover.pop() { + v.insert(0, b); + } + Some(Ok(Value::Binary { + val: v, + span: self.span, + })) + } + Err(e) => Some(Err(e)), + }, + None => None, } } else { - self.stream.next() + // We *may* be text. We're only going to try utf-8. Other decodings + // needs to be taken as binary first, then passed through `decode`. + match self.stream.next() { + Some(buffer) => match buffer { + Ok(mut v) => { + while let Some(b) = self.leftover.pop() { + v.insert(0, b); + } + + match String::from_utf8(v.clone()) { + Ok(s) => { + // Great, we have a complete string, let's output it + Some(Ok(Value::String { + val: s, + span: self.span, + })) + } + Err(err) => { + // Okay, we *might* have a string but we've also got some errors + if v.is_empty() { + // We can just end here + None + } else if v.len() > 3 + && (v.len() - err.utf8_error().valid_up_to() > 3) + { + // As UTF-8 characters are max 4 bytes, if we have more than that in error we know + // that it's not just a character spanning two frames. + // We now know we are definitely binary, so switch to binary and stay there. + self.is_binary = true; + Some(Ok(Value::Binary { + val: v, + span: self.span, + })) + } else { + // Okay, we have a tiny bit of error at the end of the buffer. This could very well be + // a character that spans two frames. Since this is the case, remove the error from + // the current frame an dput it in the leftover buffer. + self.leftover = + v[(err.utf8_error().valid_up_to() + 1)..].to_vec(); + + let buf = v[0..err.utf8_error().valid_up_to()].to_vec(); + + match String::from_utf8(buf) { + Ok(s) => Some(Ok(Value::String { + val: s, + span: self.span, + })), + Err(_) => { + // Something is definitely wrong. Switch to binary, and stay there + self.is_binary = true; + Some(Ok(Value::Binary { + val: v, + span: self.span, + })) + } + } + } + } + } + } + Err(e) => Some(Err(e)), + }, + None => None, + } } } } @@ -106,12 +150,12 @@ impl Iterator for StringStream { /// In practice, a "stream" here means anything which can be iterated and produce Values as it iterates. /// Like other iterators in Rust, observing values from this stream will drain the items as you view them /// and the stream cannot be replayed. -pub struct ValueStream { +pub struct ListStream { pub stream: Box + Send + 'static>, pub ctrlc: Option>, } -impl ValueStream { +impl ListStream { pub fn into_string(self, separator: &str, config: &Config) -> String { self.map(|x: Value| x.into_string(", ", config)) .collect::>() @@ -121,21 +165,21 @@ impl ValueStream { pub fn from_stream( input: impl Iterator + Send + 'static, ctrlc: Option>, - ) -> ValueStream { - ValueStream { + ) -> ListStream { + ListStream { stream: Box::new(input), ctrlc, } } } -impl Debug for ValueStream { +impl Debug for ListStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ValueStream").finish() } } -impl Iterator for ValueStream { +impl Iterator for ListStream { type Item = Value; fn next(&mut self) -> Option { diff --git a/src/main.rs b/src/main.rs index 5a9cbacba..a1ce83ae0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ use nu_parser::parse; use nu_protocol::{ ast::{Call, Expr, Expression, Pipeline, Statement}, engine::{Command, EngineState, Stack, StateWorkingSet}, - ByteStream, Category, Example, IntoPipelineData, PipelineData, ShellError, Signature, Span, + Category, Example, IntoPipelineData, PipelineData, RawStream, ShellError, Signature, Span, Spanned, SyntaxShape, Value, CONFIG_VARIABLE_ID, }; use std::{ @@ -119,11 +119,8 @@ fn main() -> Result<()> { let stdin = std::io::stdin(); let buf_reader = BufReader::new(stdin); - PipelineData::ByteStream( - ByteStream { - stream: Box::new(BufferedReader::new(buf_reader)), - ctrlc: Some(ctrlc), - }, + PipelineData::RawStream( + RawStream::new(Box::new(BufferedReader::new(buf_reader)), Some(ctrlc)), redirect_stdin.span, None, ) diff --git a/src/utils.rs b/src/utils.rs index f5b72ca32..367dda38a 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -198,36 +198,13 @@ fn print_pipeline_data( let config = stack.get_config().unwrap_or_default(); - match input { - PipelineData::StringStream(stream, _, _) => { - for s in stream { - print!("{}", s?); - let _ = std::io::stdout().flush(); - } - return Ok(()); - } - PipelineData::ByteStream(stream, _, _) => { - let mut address_offset = 0; - for v in stream { - let cfg = nu_pretty_hex::HexConfig { - title: false, - address_offset, - ..Default::default() - }; + let mut stdout = std::io::stdout(); - let v = v?; - address_offset += v.len(); - - let s = if v.iter().all(|x| x.is_ascii()) { - format!("{}", String::from_utf8_lossy(&v)) - } else { - nu_pretty_hex::config_hex(&v, cfg) - }; - println!("{}", s); - } - return Ok(()); + if let PipelineData::RawStream(stream, _, _) = input { + for s in stream { + let _ = stdout.write(s?.as_binary()?); } - _ => {} + return Ok(()); } match engine_state.find_decl("table".as_bytes()) {