From 8925ca5da3cc519b9a904fd95478a83a86a58990 Mon Sep 17 00:00:00 2001 From: Jonathan Turner Date: Sat, 7 Mar 2020 05:06:39 +1300 Subject: [PATCH] Move to bytes/string hybrid codec (#1457) * WIP: move to bytes codec * Progress on adding collect helpers * Progress on adding collect helpers * Add in line splitting back to lines * Lines outputting line primitives * Close to ready? * Finish fixing lines * clippy fixes * fmt fixes * removed unused code * Cleanup a few bits * Cleanup a few bits * Cleanup a few more bits * Fix failing test with corrected test case --- Cargo.lock | 2 +- crates/nu-cli/Cargo.toml | 2 +- crates/nu-cli/src/cli.rs | 26 +- crates/nu-cli/src/commands/autoview.rs | 1 + .../src/commands/classified/external.rs | 239 +++++++++++------- crates/nu-cli/src/commands/from_bson.rs | 32 +-- .../src/commands/from_delimited_data.rs | 27 +- crates/nu-cli/src/commands/from_ini.rs | 30 +-- crates/nu-cli/src/commands/from_json.rs | 68 ++--- crates/nu-cli/src/commands/from_ods.rs | 84 +++--- crates/nu-cli/src/commands/from_sqlite.rs | 47 ++-- crates/nu-cli/src/commands/from_ssv.rs | 27 +- crates/nu-cli/src/commands/from_toml.rs | 32 +-- crates/nu-cli/src/commands/from_url.rs | 42 +-- crates/nu-cli/src/commands/from_xlsx.rs | 83 +++--- crates/nu-cli/src/commands/from_xml.rs | 30 +-- crates/nu-cli/src/commands/from_yaml.rs | 32 +-- crates/nu-cli/src/commands/lines.rs | 105 +++++--- crates/nu-cli/src/stream.rs | 89 ++++++- tests/shell/pipeline/commands/external.rs | 1 + 20 files changed, 475 insertions(+), 524 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ebed1bd097..5fa0a9fddf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2251,7 +2251,7 @@ dependencies = [ "bigdecimal", "bson", "byte-unit", - "bytes 0.4.12", + "bytes 0.5.4", "calamine", "cfg-if", "chrono", diff --git a/crates/nu-cli/Cargo.toml b/crates/nu-cli/Cargo.toml index 3e91c87d27..9e3bf359b6 100644 --- a/crates/nu-cli/Cargo.toml +++ b/crates/nu-cli/Cargo.toml @@ -25,7 +25,7 @@ base64 = "0.11" bigdecimal = { version = "0.1.0", features = ["serde"] } bson = { version = "0.14.0", features = ["decimal128"] } byte-unit = "3.0.3" -bytes = "0.4.12" +bytes = "0.5.4" calamine = "0.16" cfg-if = "0.1" chrono = { version = "0.4.10", features = ["serde"] } diff --git a/crates/nu-cli/src/cli.rs b/crates/nu-cli/src/cli.rs index 510d238e07..9d05732333 100644 --- a/crates/nu-cli/src/cli.rs +++ b/crates/nu-cli/src/cli.rs @@ -1,3 +1,4 @@ +use crate::commands::classified::external::{MaybeTextCodec, StringOrBinary}; use crate::commands::classified::pipeline::run_pipeline; use crate::commands::plugin::JsonRpc; use crate::commands::plugin::{PluginCommand, PluginSink}; @@ -6,7 +7,8 @@ use crate::context::Context; #[cfg(not(feature = "starship-prompt"))] use crate::git::current_branch; use crate::prelude::*; -use futures_codec::{FramedRead, LinesCodec}; +use futures_codec::FramedRead; + use nu_errors::ShellError; use nu_parser::{ClassifiedPipeline, PipelineShape, SpannedToken, TokensIterator}; use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value}; @@ -620,15 +622,21 @@ async fn process_line( } let input_stream = if redirect_stdin { - let file = futures::io::AllowStdIo::new( - crate::commands::classified::external::StdoutWithNewline::new(std::io::stdin()), - ); - let stream = FramedRead::new(file, LinesCodec).map(|line| { + let file = futures::io::AllowStdIo::new(std::io::stdin()); + let stream = FramedRead::new(file, MaybeTextCodec).map(|line| { if let Ok(line) = line { - Ok(Value { - value: UntaggedValue::Primitive(Primitive::String(line)), - tag: Tag::unknown(), - }) + match line { + StringOrBinary::String(s) => Ok(Value { + value: UntaggedValue::Primitive(Primitive::String(s)), + tag: Tag::unknown(), + }), + StringOrBinary::Binary(b) => Ok(Value { + value: UntaggedValue::Primitive(Primitive::Binary( + b.into_iter().collect(), + )), + tag: Tag::unknown(), + }), + } } else { panic!("Internal error: could not read lines of text from stdin") } diff --git a/crates/nu-cli/src/commands/autoview.rs b/crates/nu-cli/src/commands/autoview.rs index bd92ce6fca..ec8759a283 100644 --- a/crates/nu-cli/src/commands/autoview.rs +++ b/crates/nu-cli/src/commands/autoview.rs @@ -92,6 +92,7 @@ pub fn autoview(context: RunnableContext) -> Result { } }; let stream = stream.to_input_stream(); + if let Some(table) = table { let command_args = create_default_command_args(&context).with_input(stream); let result = table.run(command_args, &context.commands); diff --git a/crates/nu-cli/src/commands/classified/external.rs b/crates/nu-cli/src/commands/classified/external.rs index de306c1c3f..9d68310dcb 100644 --- a/crates/nu-cli/src/commands/classified/external.rs +++ b/crates/nu-cli/src/commands/classified/external.rs @@ -1,8 +1,9 @@ use crate::futures::ThreadedReceiver; use crate::prelude::*; +use bytes::{BufMut, Bytes, BytesMut}; use futures::executor::block_on_stream; use futures::stream::StreamExt; -use futures_codec::{FramedRead, LinesCodec}; +use futures_codec::FramedRead; use log::trace; use nu_errors::ShellError; use nu_parser::commands::classified::external::ExternalArg; @@ -15,6 +16,70 @@ use std::ops::Deref; use std::process::{Command, Stdio}; use std::sync::mpsc; +pub enum StringOrBinary { + String(String), + Binary(Vec), +} +pub struct MaybeTextCodec; + +impl futures_codec::Encoder for MaybeTextCodec { + type Item = StringOrBinary; + type Error = std::io::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + match item { + StringOrBinary::String(s) => { + dst.reserve(s.len()); + dst.put(s.as_bytes()); + Ok(()) + } + StringOrBinary::Binary(b) => { + dst.reserve(b.len()); + dst.put(Bytes::from(b)); + Ok(()) + } + } + } +} + +impl futures_codec::Decoder for MaybeTextCodec { + type Item = StringOrBinary; + type Error = std::io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let v: Vec = src.to_vec(); + match String::from_utf8(v) { + Ok(s) => { + src.clear(); + if s.is_empty() { + Ok(None) + } else { + Ok(Some(StringOrBinary::String(s))) + } + } + Err(err) => { + // Note: the longest UTF-8 character per Unicode spec is currently 6 bytes. If we fail somewhere earlier than the last 6 bytes, + // we know that we're failing to understand the string encoding and not just seeing a partial character. When this happens, let's + // fall back to assuming it's a binary buffer. + if src.is_empty() { + Ok(None) + } else if src.len() > 6 && (src.len() - err.utf8_error().valid_up_to() > 6) { + // Fall back to assuming binary + let buf = src.to_vec(); + src.clear(); + Ok(Some(StringOrBinary::Binary(buf))) + } else { + // Looks like a utf-8 string, so let's assume that + let buf = src.split_to(err.utf8_error().valid_up_to() + 1); + String::from_utf8(buf.to_vec()) + .map(|x| Some(StringOrBinary::String(x))) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) + } + } + } + } +} + pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result { match &from.value { UntaggedValue::Primitive(Primitive::Int(i)) => Ok(i.to_string()), @@ -29,25 +94,6 @@ pub fn nu_value_to_string(command: &ExternalCommand, from: &Value) -> Result Result, ShellError> { - match &from.value { - UntaggedValue::Primitive(Primitive::Nothing) => Ok(None), - UntaggedValue::Primitive(Primitive::String(s)) - | UntaggedValue::Primitive(Primitive::Line(s)) => Ok(Some(s.clone())), - unsupported => Err(ShellError::labeled_error( - format!( - "Received unexpected type from pipeline ({})", - unsupported.type_name() - ), - "expected a string", - name_tag, - )), - } -} - pub(crate) fn run_external_command( command: ExternalCommand, context: &mut Context, @@ -382,45 +428,6 @@ fn run_with_stdin( spawn(&command, &path, &process_args[..], input, is_last) } -/// This is a wrapper for stdout-like readers that ensure a carriage return ends the stream -pub struct StdoutWithNewline { - stdout: T, - ended_in_newline: bool, -} - -impl StdoutWithNewline { - pub fn new(stdout: T) -> StdoutWithNewline { - StdoutWithNewline { - stdout, - ended_in_newline: false, - } - } -} -impl std::io::Read for StdoutWithNewline { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - match self.stdout.read(buf) { - Err(e) => Err(e), - Ok(0) => { - if !self.ended_in_newline && !buf.is_empty() { - self.ended_in_newline = true; - buf[0] = b'\n'; - Ok(1) - } else { - Ok(0) - } - } - Ok(len) => { - if buf[len - 1] == b'\n' { - self.ended_in_newline = true; - } else { - self.ended_in_newline = false; - } - Ok(len) - } - } - } -} - fn spawn( command: &ExternalCommand, path: &str, @@ -487,31 +494,54 @@ fn spawn( .expect("Internal error: could not get stdin pipe for external command"); for value in block_on_stream(input) { - let input_string = match nu_value_to_string_for_stdin(&stdin_name_tag, &value) { - Ok(None) => continue, - Ok(Some(v)) => v, - Err(e) => { + match &value.value { + UntaggedValue::Primitive(Primitive::Nothing) => continue, + UntaggedValue::Primitive(Primitive::String(s)) + | UntaggedValue::Primitive(Primitive::Line(s)) => { + if let Err(e) = stdin_write.write(s.as_bytes()) { + let message = format!("Unable to write to stdin (error = {})", e); + + let _ = stdin_write_tx.send(Ok(Value { + value: UntaggedValue::Error(ShellError::labeled_error( + message, + "application may have closed before completing pipeline", + &stdin_name_tag, + )), + tag: stdin_name_tag, + })); + return Err(()); + } + } + UntaggedValue::Primitive(Primitive::Binary(b)) => { + if let Err(e) = stdin_write.write(b) { + let message = format!("Unable to write to stdin (error = {})", e); + + let _ = stdin_write_tx.send(Ok(Value { + value: UntaggedValue::Error(ShellError::labeled_error( + message, + "application may have closed before completing pipeline", + &stdin_name_tag, + )), + tag: stdin_name_tag, + })); + return Err(()); + } + } + unsupported => { let _ = stdin_write_tx.send(Ok(Value { - value: UntaggedValue::Error(e), + value: UntaggedValue::Error(ShellError::labeled_error( + format!( + "Received unexpected type from pipeline ({})", + unsupported.type_name() + ), + "expected a string", + stdin_name_tag.clone(), + )), tag: stdin_name_tag, })); return Err(()); } }; - - if let Err(e) = stdin_write.write(input_string.as_bytes()) { - let message = format!("Unable to write to stdin (error = {})", e); - - let _ = stdin_write_tx.send(Ok(Value { - value: UntaggedValue::Error(ShellError::labeled_error( - message, - "application may have closed before completing pipeline", - &stdin_name_tag, - )), - tag: stdin_name_tag, - })); - return Err(()); - } } } @@ -534,29 +564,46 @@ fn spawn( return Err(()); }; - let file = futures::io::AllowStdIo::new(StdoutWithNewline::new(stdout)); - let stream = FramedRead::new(file, LinesCodec); + let file = futures::io::AllowStdIo::new(stdout); + let stream = FramedRead::new(file, MaybeTextCodec); for line in block_on_stream(stream) { - if let Ok(line) = line { - let result = stdout_read_tx.send(Ok(Value { - value: UntaggedValue::Primitive(Primitive::Line(line)), - tag: stdout_name_tag.clone(), - })); + match line { + Ok(line) => match line { + StringOrBinary::String(s) => { + let result = stdout_read_tx.send(Ok(Value { + value: UntaggedValue::Primitive(Primitive::String(s.clone())), + tag: stdout_name_tag.clone(), + })); - if result.is_err() { + if result.is_err() { + break; + } + } + StringOrBinary::Binary(b) => { + let result = stdout_read_tx.send(Ok(Value { + value: UntaggedValue::Primitive(Primitive::Binary( + b.into_iter().collect(), + )), + tag: stdout_name_tag.clone(), + })); + + if result.is_err() { + break; + } + } + }, + Err(_) => { + let _ = stdout_read_tx.send(Ok(Value { + value: UntaggedValue::Error(ShellError::labeled_error( + "Unable to read from stdout.", + "unable to read from stdout", + &stdout_name_tag, + )), + tag: stdout_name_tag.clone(), + })); break; } - } else { - let _ = stdout_read_tx.send(Ok(Value { - value: UntaggedValue::Error(ShellError::labeled_error( - "Unable to read lines from stdout. This usually happens when the output does not end with a newline.", - "unable to read from stdout", - &stdout_name_tag, - )), - tag: stdout_name_tag.clone(), - })); - break; } } } diff --git a/crates/nu-cli/src/commands/from_bson.rs b/crates/nu-cli/src/commands/from_bson.rs index c64b607386..92849e6701 100644 --- a/crates/nu-cli/src/commands/from_bson.rs +++ b/crates/nu-cli/src/commands/from_bson.rs @@ -205,32 +205,18 @@ fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result = input.values.collect().await; + let bytes = input.collect_binary(tag.clone()).await?; - for value in values { - let value_tag = &value.tag; - match value.value { - UntaggedValue::Primitive(Primitive::Binary(vb)) => - match from_bson_bytes_to_value(vb, tag.clone()) { - Ok(x) => yield ReturnSuccess::value(x), - Err(_) => { - yield Err(ShellError::labeled_error_with_secondary( - "Could not parse as BSON", - "input cannot be parsed as BSON", - tag.clone(), - "value originates from here", - value_tag, - )) - } - } - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", + match from_bson_bytes_to_value(bytes.item, tag.clone()) { + Ok(x) => yield ReturnSuccess::value(x), + Err(_) => { + yield Err(ShellError::labeled_error_with_secondary( + "Could not parse as BSON", + "input cannot be parsed as BSON", tag.clone(), "value originates from here", - value_tag, - )), - + bytes.tag, + )) } } }; diff --git a/crates/nu-cli/src/commands/from_delimited_data.rs b/crates/nu-cli/src/commands/from_delimited_data.rs index 47787fa001..86854227ec 100644 --- a/crates/nu-cli/src/commands/from_delimited_data.rs +++ b/crates/nu-cli/src/commands/from_delimited_data.rs @@ -47,28 +47,9 @@ pub fn from_delimited_data( let name_tag = name; let stream = async_stream! { - let values: Vec = input.values.collect().await; + let concat_string = input.collect_string(name_tag.clone()).await?; - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - let value_tag = &value.tag; - latest_tag = Some(value_tag.clone()); - if let Ok(s) = value.as_string() { - concat_string.push_str(&s); - } else { - yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_tag.clone(), - "value originates from here", - value_tag.clone(), - )) - } - } - - match from_delimited_string_to_value(concat_string, headerless, sep, name_tag.clone()) { + match from_delimited_string_to_value(concat_string.item, headerless, sep, name_tag.clone()) { Ok(x) => match x { Value { value: UntaggedValue::Table(list), .. } => { for l in list { @@ -77,7 +58,7 @@ pub fn from_delimited_data( } x => yield ReturnSuccess::value(x), }, - Err(_) => if let Some(last_tag) = latest_tag { + Err(_) => { let line_one = format!("Could not parse as {}", format_name); let line_two = format!("input cannot be parsed as {}", format_name); yield Err(ShellError::labeled_error_with_secondary( @@ -85,7 +66,7 @@ pub fn from_delimited_data( line_two, name_tag.clone(), "value originates from here", - last_tag.clone(), + concat_string.tag, )) } , } diff --git a/crates/nu-cli/src/commands/from_ini.rs b/crates/nu-cli/src/commands/from_ini.rs index 025ecb14de..98d85f3862 100644 --- a/crates/nu-cli/src/commands/from_ini.rs +++ b/crates/nu-cli/src/commands/from_ini.rs @@ -66,32 +66,12 @@ pub fn from_ini_string_to_value( fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result { let args = args.evaluate_once(registry)?; let tag = args.name_tag(); - let span = tag.span; let input = args.input; let stream = async_stream! { - let values: Vec = input.values.collect().await; + let concat_string = input.collect_string(tag.clone()).await?; - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - latest_tag = Some(value.tag.clone()); - let value_span = value.tag.span; - if let Ok(s) = value.as_string() { - concat_string.push_str(&s); - } else { - yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - span, - "value originates from here", - value_span, - )) - } - } - - match from_ini_string_to_value(concat_string, tag.clone()) { + match from_ini_string_to_value(concat_string.item, tag.clone()) { Ok(x) => match x { Value { value: UntaggedValue::Table(list), .. } => { for l in list { @@ -100,15 +80,15 @@ fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result yield ReturnSuccess::value(x), }, - Err(_) => if let Some(last_tag) = latest_tag { + Err(_) => { yield Err(ShellError::labeled_error_with_secondary( "Could not parse as INI", "input cannot be parsed as INI", &tag, "value originates from here", - last_tag, + concat_string.tag, )) - } , + } } }; diff --git a/crates/nu-cli/src/commands/from_json.rs b/crates/nu-cli/src/commands/from_json.rs index 75e9d0b3e3..4b48dfe5da 100644 --- a/crates/nu-cli/src/commands/from_json.rs +++ b/crates/nu-cli/src/commands/from_json.rs @@ -74,35 +74,13 @@ fn from_json( FromJSONArgs { objects }: FromJSONArgs, RunnableContext { input, name, .. }: RunnableContext, ) -> Result { - let name_span = name.span; let name_tag = name; let stream = async_stream! { - let values: Vec = input.values.collect().await; - - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - latest_tag = Some(value.tag.clone()); - let value_span = value.tag.span; - - if let Ok(s) = value.as_string() { - concat_string.push_str(&s); - } else { - yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_span, - "value originates from here", - value_span, - )) - } - } - + let concat_string = input.collect_string(name_tag.clone()).await?; if objects { - for json_str in concat_string.lines() { + for json_str in concat_string.item.lines() { if json_str.is_empty() { continue; } @@ -111,23 +89,21 @@ fn from_json( Ok(x) => yield ReturnSuccess::value(x), Err(e) => { - if let Some(ref last_tag) = latest_tag { - let mut message = "Could not parse as JSON (".to_string(); - message.push_str(&e.to_string()); - message.push_str(")"); + let mut message = "Could not parse as JSON (".to_string(); + message.push_str(&e.to_string()); + message.push_str(")"); - yield Err(ShellError::labeled_error_with_secondary( - message, - "input cannot be parsed as JSON", - &name_tag, - "value originates from here", - last_tag)) - } + yield Err(ShellError::labeled_error_with_secondary( + message, + "input cannot be parsed as JSON", + &name_tag, + "value originates from here", + concat_string.tag.clone())) } } } } else { - match from_json_string_to_value(concat_string, name_tag.clone()) { + match from_json_string_to_value(concat_string.item, name_tag.clone()) { Ok(x) => match x { Value { value: UntaggedValue::Table(list), .. } => { @@ -138,18 +114,16 @@ fn from_json( x => yield ReturnSuccess::value(x), } Err(e) => { - if let Some(last_tag) = latest_tag { - let mut message = "Could not parse as JSON (".to_string(); - message.push_str(&e.to_string()); - message.push_str(")"); + let mut message = "Could not parse as JSON (".to_string(); + message.push_str(&e.to_string()); + message.push_str(")"); - yield Err(ShellError::labeled_error_with_secondary( - message, - "input cannot be parsed as JSON", - name_tag, - "value originates from here", - last_tag)) - } + yield Err(ShellError::labeled_error_with_secondary( + message, + "input cannot be parsed as JSON", + name_tag, + "value originates from here", + concat_string.tag)) } } } diff --git a/crates/nu-cli/src/commands/from_ods.rs b/crates/nu-cli/src/commands/from_ods.rs index 118a86b7c9..0992ee1e78 100644 --- a/crates/nu-cli/src/commands/from_ods.rs +++ b/crates/nu-cli/src/commands/from_ods.rs @@ -3,7 +3,7 @@ use crate::prelude::*; use crate::TaggedListBuilder; use calamine::*; use nu_errors::ShellError; -use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; +use nu_protocol::{ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue}; use std::io::Cursor; pub struct FromODS; @@ -49,67 +49,49 @@ fn from_ods( let tag = runnable_context.name; let stream = async_stream! { - let values: Vec = input.values.collect().await; + let bytes = input.collect_binary(tag.clone()).await?; + let mut buf: Cursor> = Cursor::new(bytes.item); + let mut ods = Ods::<_>::new(buf).map_err(|_| ShellError::labeled_error( + "Could not load ods file", + "could not load ods file", + &tag))?; - for value in values { - let value_span = value.tag.span; - let value_tag = value.tag.clone(); + let mut dict = TaggedDictBuilder::new(&tag); - match value.value { - UntaggedValue::Primitive(Primitive::Binary(vb)) => { - let mut buf: Cursor> = Cursor::new(vb); - let mut ods = Ods::<_>::new(buf).map_err(|_| ShellError::labeled_error( - "Could not load ods file", - "could not load ods file", - &tag))?; + let sheet_names = ods.sheet_names().to_owned(); - let mut dict = TaggedDictBuilder::new(&tag); + for sheet_name in &sheet_names { + let mut sheet_output = TaggedListBuilder::new(&tag); - let sheet_names = ods.sheet_names().to_owned(); + if let Some(Ok(current_sheet)) = ods.worksheet_range(sheet_name) { + for row in current_sheet.rows() { + let mut row_output = TaggedDictBuilder::new(&tag); + for (i, cell) in row.iter().enumerate() { + let value = match cell { + DataType::Empty => UntaggedValue::nothing(), + DataType::String(s) => UntaggedValue::string(s), + DataType::Float(f) => UntaggedValue::decimal(*f), + DataType::Int(i) => UntaggedValue::int(*i), + DataType::Bool(b) => UntaggedValue::boolean(*b), + _ => UntaggedValue::nothing(), + }; - for sheet_name in &sheet_names { - let mut sheet_output = TaggedListBuilder::new(&tag); - - if let Some(Ok(current_sheet)) = ods.worksheet_range(sheet_name) { - for row in current_sheet.rows() { - let mut row_output = TaggedDictBuilder::new(&tag); - for (i, cell) in row.iter().enumerate() { - let value = match cell { - DataType::Empty => UntaggedValue::nothing(), - DataType::String(s) => UntaggedValue::string(s), - DataType::Float(f) => UntaggedValue::decimal(*f), - DataType::Int(i) => UntaggedValue::int(*i), - DataType::Bool(b) => UntaggedValue::boolean(*b), - _ => UntaggedValue::nothing(), - }; - - row_output.insert_untagged(&format!("Column{}", i), value); - } - - sheet_output.push_untagged(row_output.into_untagged_value()); - } - - dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); - } else { - yield Err(ShellError::labeled_error( - "Could not load sheet", - "could not load sheet", - &tag)); - } + row_output.insert_untagged(&format!("Column{}", i), value); } - yield ReturnSuccess::value(dict.into_value()); + sheet_output.push_untagged(row_output.into_untagged_value()); } - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected binary data from pipeline", - "requires binary data input", - &tag, - "value originates from here", - value_tag, - )), + dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); + } else { + yield Err(ShellError::labeled_error( + "Could not load sheet", + "could not load sheet", + &tag)); } } + + yield ReturnSuccess::value(dict.into_value()); }; Ok(stream.to_output_stream()) diff --git a/crates/nu-cli/src/commands/from_sqlite.rs b/crates/nu-cli/src/commands/from_sqlite.rs index 2067ed91d0..21ec571481 100644 --- a/crates/nu-cli/src/commands/from_sqlite.rs +++ b/crates/nu-cli/src/commands/from_sqlite.rs @@ -138,40 +138,25 @@ fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result = input.values.collect().await; - - for value in values { - let value_tag = &value.tag; - match value.value { - UntaggedValue::Primitive(Primitive::Binary(vb)) => - match from_sqlite_bytes_to_value(vb, tag.clone()) { - Ok(x) => match x { - Value { value: UntaggedValue::Table(list), .. } => { - for l in list { - yield ReturnSuccess::value(l); - } - } - _ => yield ReturnSuccess::value(x), - } - Err(err) => { - println!("{:?}", err); - yield Err(ShellError::labeled_error_with_secondary( - "Could not parse as SQLite", - "input cannot be parsed as SQLite", - &tag, - "value originates from here", - value_tag, - )) - } + let bytes = input.collect_binary(tag.clone()).await?; + match from_sqlite_bytes_to_value(bytes.item, tag.clone()) { + Ok(x) => match x { + Value { value: UntaggedValue::Table(list), .. } => { + for l in list { + yield ReturnSuccess::value(l); } - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected binary data from pipeline", - "requires binary data input", + } + _ => yield ReturnSuccess::value(x), + } + Err(err) => { + println!("{:?}", err); + yield Err(ShellError::labeled_error_with_secondary( + "Could not parse as SQLite", + "input cannot be parsed as SQLite", &tag, "value originates from here", - value_tag, - )), - + bytes.tag, + )) } } }; diff --git a/crates/nu-cli/src/commands/from_ssv.rs b/crates/nu-cli/src/commands/from_ssv.rs index a29b12a9ad..1284a30cba 100644 --- a/crates/nu-cli/src/commands/from_ssv.rs +++ b/crates/nu-cli/src/commands/from_ssv.rs @@ -259,45 +259,26 @@ fn from_ssv( RunnableContext { input, name, .. }: RunnableContext, ) -> Result { let stream = async_stream! { - let values: Vec = input.values.collect().await; - let mut concat_string = String::new(); - let mut latest_tag: Option = None; + let concat_string = input.collect_string(name.clone()).await?; let split_at = match minimum_spaces { Some(number) => number.item, None => DEFAULT_MINIMUM_SPACES }; - for value in values { - let value_tag = value.tag.clone(); - latest_tag = Some(value_tag.clone()); - if let Ok(s) = value.as_string() { - concat_string.push_str(&s); - } - else { - yield Err(ShellError::labeled_error_with_secondary ( - "Expected a string from pipeline", - "requires string input", - &name, - "value originates from here", - &value_tag - )) - } - } - - match from_ssv_string_to_value(&concat_string, headerless, aligned_columns, split_at, name.clone()) { + match from_ssv_string_to_value(&concat_string.item, headerless, aligned_columns, split_at, name.clone()) { Some(x) => match x { Value { value: UntaggedValue::Table(list), ..} => { for l in list { yield ReturnSuccess::value(l) } } x => yield ReturnSuccess::value(x) }, - None => if let Some(tag) = latest_tag { + None => { yield Err(ShellError::labeled_error_with_secondary( "Could not parse as SSV", "input cannot be parsed ssv", &name, "value originates from here", - &tag, + &concat_string.tag, )) }, } diff --git a/crates/nu-cli/src/commands/from_toml.rs b/crates/nu-cli/src/commands/from_toml.rs index acc035f408..5454a0163d 100644 --- a/crates/nu-cli/src/commands/from_toml.rs +++ b/crates/nu-cli/src/commands/from_toml.rs @@ -69,33 +69,11 @@ pub fn from_toml( ) -> Result { let args = args.evaluate_once(registry)?; let tag = args.name_tag(); - let name_span = tag.span; let input = args.input; let stream = async_stream! { - let values: Vec = input.values.collect().await; - - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - latest_tag = Some(value.tag.clone()); - let value_span = value.tag.span; - if let Ok(s) = value.as_string() { - concat_string.push_str(&s); - } - else { - yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_span, - "value originates from here", - value_span, - )) - } - } - - match from_toml_string_to_value(concat_string, tag.clone()) { + let concat_string = input.collect_string(tag.clone()).await?; + match from_toml_string_to_value(concat_string.item, tag.clone()) { Ok(x) => match x { Value { value: UntaggedValue::Table(list), .. } => { for l in list { @@ -104,15 +82,15 @@ pub fn from_toml( } x => yield ReturnSuccess::value(x), }, - Err(_) => if let Some(last_tag) = latest_tag { + Err(_) => { yield Err(ShellError::labeled_error_with_secondary( "Could not parse as TOML", "input cannot be parsed as TOML", &tag, "value originates from here", - last_tag, + concat_string.tag, )) - } , + } } }; diff --git a/crates/nu-cli/src/commands/from_url.rs b/crates/nu-cli/src/commands/from_url.rs index 439d2f00b0..4bcbcbf855 100644 --- a/crates/nu-cli/src/commands/from_url.rs +++ b/crates/nu-cli/src/commands/from_url.rs @@ -1,7 +1,7 @@ use crate::commands::WholeStreamCommand; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; +use nu_protocol::{ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue}; pub struct FromURL; @@ -30,32 +30,12 @@ impl WholeStreamCommand for FromURL { fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result { let args = args.evaluate_once(registry)?; let tag = args.name_tag(); - let name_span = tag.span; let input = args.input; let stream = async_stream! { - let values: Vec = input.values.collect().await; + let concat_string = input.collect_string(tag.clone()).await?; - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - latest_tag = Some(value.tag.clone()); - let value_span = value.tag.span; - if let Ok(s) = value.as_string() { - concat_string.push_str(&s); - } else { - yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_span, - "value originates from here", - value_span, - )) - } - } - - let result = serde_urlencoded::from_str::>(&concat_string); + let result = serde_urlencoded::from_str::>(&concat_string.item); match result { Ok(result) => { @@ -68,15 +48,13 @@ fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result { - if let Some(last_tag) = latest_tag { - yield Err(ShellError::labeled_error_with_secondary( - "String not compatible with url-encoding", - "input not url-encoded", - tag, - "value originates from here", - last_tag, - )); - } + yield Err(ShellError::labeled_error_with_secondary( + "String not compatible with url-encoding", + "input not url-encoded", + tag, + "value originates from here", + concat_string.tag, + )); } } }; diff --git a/crates/nu-cli/src/commands/from_xlsx.rs b/crates/nu-cli/src/commands/from_xlsx.rs index 5394efb0ea..dbb549f83f 100644 --- a/crates/nu-cli/src/commands/from_xlsx.rs +++ b/crates/nu-cli/src/commands/from_xlsx.rs @@ -3,7 +3,7 @@ use crate::prelude::*; use crate::TaggedListBuilder; use calamine::*; use nu_errors::ShellError; -use nu_protocol::{Primitive, ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue, Value}; +use nu_protocol::{ReturnSuccess, Signature, TaggedDictBuilder, UntaggedValue}; use std::io::Cursor; pub struct FromXLSX; @@ -49,67 +49,50 @@ fn from_xlsx( let tag = runnable_context.name; let stream = async_stream! { - let values: Vec = input.values.collect().await; + let value = input.collect_binary(tag.clone()).await?; - for value in values { - let value_span = value.tag.span; - let value_tag = value.tag.clone(); + let mut buf: Cursor> = Cursor::new(value.item); + let mut xls = Xlsx::<_>::new(buf).map_err(|_| { + ShellError::labeled_error("Could not load xlsx file", "could not load xlsx file", &tag) + })?; - match value.value { - UntaggedValue::Primitive(Primitive::Binary(vb)) => { - let mut buf: Cursor> = Cursor::new(vb); - let mut xls = Xlsx::<_>::new(buf).map_err(|_| ShellError::labeled_error( - "Could not load xlsx file", - "could not load xlsx file", - &tag))?; + let mut dict = TaggedDictBuilder::new(&tag); - let mut dict = TaggedDictBuilder::new(&tag); + let sheet_names = xls.sheet_names().to_owned(); - let sheet_names = xls.sheet_names().to_owned(); + for sheet_name in &sheet_names { + let mut sheet_output = TaggedListBuilder::new(&tag); - for sheet_name in &sheet_names { - let mut sheet_output = TaggedListBuilder::new(&tag); + if let Some(Ok(current_sheet)) = xls.worksheet_range(sheet_name) { + for row in current_sheet.rows() { + let mut row_output = TaggedDictBuilder::new(&tag); + for (i, cell) in row.iter().enumerate() { + let value = match cell { + DataType::Empty => UntaggedValue::nothing(), + DataType::String(s) => UntaggedValue::string(s), + DataType::Float(f) => UntaggedValue::decimal(*f), + DataType::Int(i) => UntaggedValue::int(*i), + DataType::Bool(b) => UntaggedValue::boolean(*b), + _ => UntaggedValue::nothing(), + }; - if let Some(Ok(current_sheet)) = xls.worksheet_range(sheet_name) { - for row in current_sheet.rows() { - let mut row_output = TaggedDictBuilder::new(&tag); - for (i, cell) in row.iter().enumerate() { - let value = match cell { - DataType::Empty => UntaggedValue::nothing(), - DataType::String(s) => UntaggedValue::string(s), - DataType::Float(f) => UntaggedValue::decimal(*f), - DataType::Int(i) => UntaggedValue::int(*i), - DataType::Bool(b) => UntaggedValue::boolean(*b), - _ => UntaggedValue::nothing(), - }; - - row_output.insert_untagged(&format!("Column{}", i), value); - } - - sheet_output.push_untagged(row_output.into_untagged_value()); - } - - dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); - } else { - yield Err(ShellError::labeled_error( - "Could not load sheet", - "could not load sheet", - &tag)); - } + row_output.insert_untagged(&format!("Column{}", i), value); } - yield ReturnSuccess::value(dict.into_value()); + sheet_output.push_untagged(row_output.into_untagged_value()); } - _ => yield Err(ShellError::labeled_error_with_secondary( - "Expected binary data from pipeline", - "requires binary data input", - &tag, - "value originates from here", - value_tag, - )), + dict.insert_untagged(sheet_name, sheet_output.into_untagged_value()); + } else { + yield Err(ShellError::labeled_error( + "Could not load sheet", + "could not load sheet", + &tag, + )); } } + + yield ReturnSuccess::value(dict.into_value()); }; Ok(stream.to_output_stream()) diff --git a/crates/nu-cli/src/commands/from_xml.rs b/crates/nu-cli/src/commands/from_xml.rs index fc22f8ac6a..10d3c762a7 100644 --- a/crates/nu-cli/src/commands/from_xml.rs +++ b/crates/nu-cli/src/commands/from_xml.rs @@ -101,34 +101,12 @@ pub fn from_xml_string_to_value(s: String, tag: impl Into) -> Result Result { let args = args.evaluate_once(registry)?; let tag = args.name_tag(); - let name_span = tag.span; let input = args.input; let stream = async_stream! { - let values: Vec = input.values.collect().await; + let concat_string = input.collect_string(tag.clone()).await?; - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - latest_tag = Some(value.tag.clone()); - let value_span = value.tag.span; - - if let Ok(s) = value.as_string() { - concat_string.push_str(&s); - } - else { - yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_span, - "value originates from here", - value_span, - )) - } - } - - match from_xml_string_to_value(concat_string, tag.clone()) { + match from_xml_string_to_value(concat_string.item, tag.clone()) { Ok(x) => match x { Value { value: UntaggedValue::Table(list), .. } => { for l in list { @@ -137,13 +115,13 @@ fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result yield ReturnSuccess::value(x), }, - Err(_) => if let Some(last_tag) = latest_tag { + Err(_) => { yield Err(ShellError::labeled_error_with_secondary( "Could not parse as XML", "input cannot be parsed as XML", &tag, "value originates from here", - &last_tag, + &concat_string.tag, )) } , } diff --git a/crates/nu-cli/src/commands/from_yaml.rs b/crates/nu-cli/src/commands/from_yaml.rs index f8f329702a..83158eb70d 100644 --- a/crates/nu-cli/src/commands/from_yaml.rs +++ b/crates/nu-cli/src/commands/from_yaml.rs @@ -121,34 +121,12 @@ pub fn from_yaml_string_to_value(s: String, tag: impl Into) -> Result Result { let args = args.evaluate_once(registry)?; let tag = args.name_tag(); - let name_span = tag.span; let input = args.input; let stream = async_stream! { - let values: Vec = input.values.collect().await; + let concat_string = input.collect_string(tag.clone()).await?; - let mut concat_string = String::new(); - let mut latest_tag: Option = None; - - for value in values { - latest_tag = Some(value.tag.clone()); - let value_span = value.tag.span; - - if let Ok(s) = value.as_string() { - concat_string.push_str(&s); - } - else { - yield Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_span, - "value originates from here", - value_span, - )) - } - } - - match from_yaml_string_to_value(concat_string, tag.clone()) { + match from_yaml_string_to_value(concat_string.item, tag.clone()) { Ok(x) => match x { Value { value: UntaggedValue::Table(list), .. } => { for l in list { @@ -157,15 +135,15 @@ fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result yield ReturnSuccess::value(x), }, - Err(_) => if let Some(last_tag) = latest_tag { + Err(_) => { yield Err(ShellError::labeled_error_with_secondary( "Could not parse as YAML", "input cannot be parsed as YAML", &tag, "value originates from here", - &last_tag, + &concat_string.tag, )) - } , + } } }; diff --git a/crates/nu-cli/src/commands/lines.rs b/crates/nu-cli/src/commands/lines.rs index 2b1ca87a06..9f01c21791 100644 --- a/crates/nu-cli/src/commands/lines.rs +++ b/crates/nu-cli/src/commands/lines.rs @@ -1,8 +1,7 @@ use crate::commands::WholeStreamCommand; use crate::prelude::*; -use log::trace; use nu_errors::ShellError; -use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue}; +use nu_protocol::{Primitive, ReturnSuccess, Signature, UntaggedValue, Value}; pub struct Lines; @@ -28,46 +27,90 @@ impl WholeStreamCommand for Lines { } } -// TODO: "Amount remaining" wrapper +fn ends_with_line_ending(st: &str) -> bool { + let mut temp = st.to_string(); + let last = temp.pop(); + if let Some(c) = last { + c == '\n' + } else { + false + } +} fn lines(args: CommandArgs, registry: &CommandRegistry) -> Result { let args = args.evaluate_once(registry)?; let tag = args.name_tag(); let name_span = tag.span; - let input = args.input; + let mut input = args.input; - let stream = input - .values - .map(move |v| { - if let Ok(s) = v.as_string() { - let split_result: Vec<_> = s.lines().filter(|s| s.trim() != "").collect(); + let mut leftover = vec![]; + let mut leftover_string = String::new(); + let stream = async_stream! { + loop { + match input.values.next().await { + Some(Value { value: UntaggedValue::Primitive(Primitive::String(st)), ..}) => { + let mut st = leftover_string.clone() + &st; + leftover.clear(); - trace!("split result = {:?}", split_result); + let mut lines: Vec = st.lines().map(|x| x.to_string()).collect(); - let result = split_result - .into_iter() - .map(|s| { - ReturnSuccess::value( - UntaggedValue::Primitive(Primitive::Line(s.into())) - .into_untagged_value(), - ) - }) - .collect::>(); + if !ends_with_line_ending(&st) { + if let Some(last) = lines.pop() { + leftover_string = last; + } else { + leftover_string.clear(); + } + } else { + leftover_string.clear(); + } - futures::stream::iter(result) - } else { - let value_span = v.tag.span; + let success_lines: Vec<_> = lines.iter().map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())).collect(); + yield futures::stream::iter(success_lines) + } + Some(Value { value: UntaggedValue::Primitive(Primitive::Line(st)), ..}) => { + let mut st = leftover_string.clone() + &st; + leftover.clear(); - futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary( - "Expected a string from pipeline", - "requires string input", - name_span, - "value originates from here", - value_span, - ))]) + let mut lines: Vec = st.lines().map(|x| x.to_string()).collect(); + if !ends_with_line_ending(&st) { + if let Some(last) = lines.pop() { + leftover_string = last; + } else { + leftover_string.clear(); + } + } else { + leftover_string.clear(); + } + + let success_lines: Vec<_> = lines.iter().map(|x| ReturnSuccess::value(UntaggedValue::line(x).into_untagged_value())).collect(); + yield futures::stream::iter(success_lines) + } + Some( Value { tag: value_span, ..}) => { + yield futures::stream::iter(vec![Err(ShellError::labeled_error_with_secondary( + "Expected a string from pipeline", + "requires string input", + name_span, + "value originates from here", + value_span, + ))]); + } + None => { + if !leftover.is_empty() { + let mut st = leftover_string.clone(); + if let Ok(extra) = String::from_utf8(leftover) { + st.push_str(&extra); + } + yield futures::stream::iter(vec![ReturnSuccess::value(UntaggedValue::string(st).into_untagged_value())]) + } + break; + } } - }) - .flatten(); + } + if !leftover_string.is_empty() { + yield futures::stream::iter(vec![ReturnSuccess::value(UntaggedValue::string(leftover_string).into_untagged_value())]); + } + } + .flatten(); Ok(stream.to_output_stream()) } diff --git a/crates/nu-cli/src/stream.rs b/crates/nu-cli/src/stream.rs index 0dfe6e0e88..1d3e179d80 100644 --- a/crates/nu-cli/src/stream.rs +++ b/crates/nu-cli/src/stream.rs @@ -1,6 +1,8 @@ use crate::prelude::*; use futures::stream::iter; -use nu_protocol::{ReturnSuccess, ReturnValue, UntaggedValue, Value}; +use nu_errors::ShellError; +use nu_protocol::{Primitive, ReturnSuccess, ReturnValue, UntaggedValue, Value}; +use nu_source::{Tagged, TaggedItem}; pub struct InputStream { pub(crate) values: BoxStream<'static, Value>, @@ -27,6 +29,91 @@ impl InputStream { values: input.boxed(), } } + + pub async fn collect_string(mut self, tag: Tag) -> Result, ShellError> { + let mut bytes = vec![]; + let mut value_tag = tag.clone(); + + loop { + match self.values.next().await { + Some(Value { + value: UntaggedValue::Primitive(Primitive::String(s)), + tag: value_t, + }) => { + value_tag = value_t; + bytes.extend_from_slice(&s.into_bytes()); + } + Some(Value { + value: UntaggedValue::Primitive(Primitive::Line(s)), + tag: value_t, + }) => { + value_tag = value_t; + bytes.extend_from_slice(&s.into_bytes()); + } + Some(Value { + value: UntaggedValue::Primitive(Primitive::Binary(b)), + tag: value_t, + }) => { + value_tag = value_t; + bytes.extend_from_slice(&b); + } + Some(Value { tag: value_tag, .. }) => { + return Err(ShellError::labeled_error_with_secondary( + "Expected a string from pipeline", + "requires string input", + tag, + "value originates from here", + value_tag, + )) + } + None => break, + } + } + + match String::from_utf8(bytes) { + Ok(s) => Ok(s.tagged(value_tag.clone())), + Err(_) => Err(ShellError::labeled_error_with_secondary( + "Expected a string from pipeline", + "requires string input", + tag, + "value originates from here", + value_tag, + )), + } + } + + pub async fn collect_binary(mut self, tag: Tag) -> Result>, ShellError> { + let mut bytes = vec![]; + let mut value_tag = tag.clone(); + + loop { + match self.values.next().await { + Some(Value { + value: UntaggedValue::Primitive(Primitive::Binary(b)), + tag: value_t, + }) => { + value_tag = value_t; + bytes.extend_from_slice(&b); + } + Some(Value { + tag: value_tag, + value: v, + }) => { + println!("{:?}", v); + return Err(ShellError::labeled_error_with_secondary( + "Expected binary from pipeline", + "requires binary input", + tag, + "value originates from here", + value_tag, + )); + } + None => break, + } + } + + Ok(bytes.tagged(value_tag)) + } } impl Stream for InputStream { diff --git a/tests/shell/pipeline/commands/external.rs b/tests/shell/pipeline/commands/external.rs index 5b1dc8cf72..b324c55a28 100644 --- a/tests/shell/pipeline/commands/external.rs +++ b/tests/shell/pipeline/commands/external.rs @@ -126,6 +126,7 @@ mod stdin_evaluation { iecho yes | chop | chop + | lines | first 1 "# ));