diff --git a/crates/nu-command/src/formats/from/delimited.rs b/crates/nu-command/src/formats/from/delimited.rs index 1fdea19482..853f3bd83e 100644 --- a/crates/nu-command/src/formats/from/delimited.rs +++ b/crates/nu-command/src/formats/from/delimited.rs @@ -1,7 +1,14 @@ use csv::{ReaderBuilder, Trim}; -use nu_protocol::{IntoPipelineData, PipelineData, ShellError, Span, Value}; +use nu_protocol::{ByteStream, ListStream, PipelineData, ShellError, Span, Value}; -fn from_delimited_string_to_value( +fn from_csv_error(err: csv::Error, span: Span) -> ShellError { + ShellError::DelimiterError { + msg: err.to_string(), + span, + } +} + +fn from_delimited_stream( DelimitedReaderConfig { separator, comment, @@ -12,9 +19,15 @@ fn from_delimited_string_to_value( no_infer, trim, }: DelimitedReaderConfig, - s: String, + input: ByteStream, span: Span, -) -> Result { +) -> Result { + let input_reader = if let Some(stream) = input.reader() { + stream + } else { + return Ok(ListStream::new(std::iter::empty(), span, None)); + }; + let mut reader = ReaderBuilder::new() .has_headers(!noheaders) .flexible(flexible) @@ -23,19 +36,29 @@ fn from_delimited_string_to_value( .quote(quote as u8) .escape(escape.map(|c| c as u8)) .trim(trim) - .from_reader(s.as_bytes()); + .from_reader(input_reader); let headers = if noheaders { - (1..=reader.headers()?.len()) + (1..=reader + .headers() + .map_err(|err| from_csv_error(err, span))? + .len()) .map(|i| format!("column{i}")) .collect::>() } else { - reader.headers()?.iter().map(String::from).collect() + reader + .headers() + .map_err(|err| from_csv_error(err, span))? + .iter() + .map(String::from) + .collect() }; - let mut rows = vec![]; - for row in reader.records() { - let row = row?; + let iter = reader.into_records().map(move |row| { + let row = match row { + Ok(row) => row, + Err(err) => return Value::error(from_csv_error(err, span), span), + }; let columns = headers.iter().cloned(); let values = row .into_iter() @@ -57,10 +80,10 @@ fn from_delimited_string_to_value( // // Otherwise, if there are less values than headers, // then `Value::nothing(span)` is used to fill the remaining columns. - rows.push(Value::record(columns.zip(values).collect(), span)); - } + Value::record(columns.zip(values).collect(), span) + }); - Ok(Value::list(rows, span)) + Ok(ListStream::new(iter, span, None)) } pub(super) struct DelimitedReaderConfig { @@ -79,14 +102,27 @@ pub(super) fn from_delimited_data( input: PipelineData, name: Span, ) -> Result { - let (concat_string, _span, metadata) = input.collect_string_strict(name)?; - - Ok(from_delimited_string_to_value(config, concat_string, name) - .map_err(|x| ShellError::DelimiterError { - msg: x.to_string(), - span: name, - })? - .into_pipeline_data_with_metadata(metadata)) + match input { + PipelineData::Empty => Ok(PipelineData::Empty), + PipelineData::Value(value, metadata) => { + let string = value.into_string()?; + let byte_stream = ByteStream::read_string(string, name, None); + Ok(PipelineData::ListStream( + from_delimited_stream(config, byte_stream, name)?, + metadata, + )) + } + PipelineData::ListStream(list_stream, _) => Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "string".into(), + wrong_type: "list".into(), + dst_span: name, + src_span: list_stream.span(), + }), + PipelineData::ByteStream(byte_stream, metadata) => Ok(PipelineData::ListStream( + from_delimited_stream(config, byte_stream, name)?, + metadata, + )), + } } pub fn trim_from_str(trim: Option) -> Result { diff --git a/crates/nu-command/src/formats/to/csv.rs b/crates/nu-command/src/formats/to/csv.rs index 173c6fbd6b..e7786e60cc 100644 --- a/crates/nu-command/src/formats/to/csv.rs +++ b/crates/nu-command/src/formats/to/csv.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::formats::to::delimited::to_delimited_data; use nu_engine::command_prelude::*; use nu_protocol::Config; @@ -27,26 +29,37 @@ impl Command for ToCsv { "do not output the columns names as the first row", Some('n'), ) + .named( + "columns", + SyntaxShape::List(SyntaxShape::String.into()), + "the names (in order) of the columns to use", + None, + ) .category(Category::Formats) } fn examples(&self) -> Vec { vec![ Example { - description: "Outputs an CSV string representing the contents of this table", + description: "Outputs a CSV string representing the contents of this table", example: "[[foo bar]; [1 2]] | to csv", result: Some(Value::test_string("foo,bar\n1,2\n")), }, Example { - description: "Outputs an CSV string representing the contents of this table", + description: "Outputs a CSV string representing the contents of this table", example: "[[foo bar]; [1 2]] | to csv --separator ';' ", result: Some(Value::test_string("foo;bar\n1;2\n")), }, Example { - description: "Outputs an CSV string representing the contents of this record", + description: "Outputs a CSV string representing the contents of this record", example: "{a: 1 b: 2} | to csv", result: Some(Value::test_string("a,b\n1,2\n")), }, + Example { + description: "Outputs a CSV stream with column names pre-determined", + example: "[[foo bar baz]; [1 2 3]] | to csv --columns [baz foo]", + result: Some(Value::test_string("baz,foo\n3,1\n")), + }, ] } @@ -64,8 +77,9 @@ impl Command for ToCsv { let head = call.head; let noheaders = call.has_flag(engine_state, stack, "noheaders")?; let separator: Option> = call.get_flag(engine_state, stack, "separator")?; - let config = engine_state.get_config(); - to_csv(input, noheaders, separator, head, config) + let columns: Option> = call.get_flag(engine_state, stack, "columns")?; + let config = engine_state.config.clone(); + to_csv(input, noheaders, separator, columns, head, config) } } @@ -73,13 +87,14 @@ fn to_csv( input: PipelineData, noheaders: bool, separator: Option>, + columns: Option>, head: Span, - config: &Config, + config: Arc, ) -> Result { let sep = match separator { Some(Spanned { item: s, span, .. }) => { if s == r"\t" { - '\t' + Spanned { item: '\t', span } } else { let vec_s: Vec = s.chars().collect(); if vec_s.len() != 1 { @@ -89,13 +104,19 @@ fn to_csv( span, }); }; - vec_s[0] + Spanned { + item: vec_s[0], + span: head, + } } } - _ => ',', + _ => Spanned { + item: ',', + span: head, + }, }; - to_delimited_data(noheaders, sep, "CSV", input, head, config) + to_delimited_data(noheaders, sep, columns, "CSV", input, head, config) } #[cfg(test)] diff --git a/crates/nu-command/src/formats/to/delimited.rs b/crates/nu-command/src/formats/to/delimited.rs index 490983d67b..a7a2480a34 100644 --- a/crates/nu-command/src/formats/to/delimited.rs +++ b/crates/nu-command/src/formats/to/delimited.rs @@ -1,113 +1,31 @@ -use csv::{Writer, WriterBuilder}; +use csv::WriterBuilder; use nu_cmd_base::formats::to::delimited::merge_descriptors; -use nu_protocol::{Config, IntoPipelineData, PipelineData, Record, ShellError, Span, Value}; -use std::{collections::VecDeque, error::Error}; +use nu_protocol::{ + ByteStream, ByteStreamType, Config, PipelineData, ShellError, Span, Spanned, Value, +}; +use std::{iter, sync::Arc}; -fn from_value_to_delimited_string( - value: &Value, - separator: char, - config: &Config, - head: Span, -) -> Result { - let span = value.span(); - match value { - Value::Record { val, .. } => record_to_delimited(val, span, separator, config, head), - Value::List { vals, .. } => table_to_delimited(vals, span, separator, config, head), - // Propagate errors by explicitly matching them before the final case. - Value::Error { error, .. } => Err(*error.clone()), - v => Err(make_unsupported_input_error(v, head, v.span())), - } -} - -fn record_to_delimited( - record: &Record, - span: Span, - separator: char, - config: &Config, - head: Span, -) -> Result { - let mut wtr = WriterBuilder::new() - .delimiter(separator as u8) - .from_writer(vec![]); - let mut fields: VecDeque = VecDeque::new(); - let mut values: VecDeque = VecDeque::new(); - - for (k, v) in record { - fields.push_back(k.clone()); - - values.push_back(to_string_tagged_value(v, config, head, span)?); - } - - wtr.write_record(fields).expect("can not write."); - wtr.write_record(values).expect("can not write."); - - writer_to_string(wtr).map_err(|_| make_conversion_error("record", span)) -} - -fn table_to_delimited( - vals: &[Value], - span: Span, - separator: char, - config: &Config, - head: Span, -) -> Result { - if let Some(val) = find_non_record(vals) { - return Err(make_unsupported_input_error(val, head, span)); - } - - let mut wtr = WriterBuilder::new() - .delimiter(separator as u8) - .from_writer(vec![]); - - let merged_descriptors = merge_descriptors(vals); - - if merged_descriptors.is_empty() { - let vals = vals - .iter() - .map(|ele| { - to_string_tagged_value(ele, config, head, span).unwrap_or_else(|_| String::new()) - }) - .collect::>(); - wtr.write_record(vals).expect("can not write"); - } else { - wtr.write_record(merged_descriptors.iter().map(|item| &item[..])) - .expect("can not write."); - - for l in vals { - // should always be true because of `find_non_record` above - if let Value::Record { val: l, .. } = l { - let mut row = vec![]; - for desc in &merged_descriptors { - row.push(match l.get(desc) { - Some(s) => to_string_tagged_value(s, config, head, span)?, - None => String::new(), - }); - } - wtr.write_record(&row).expect("can not write"); - } +fn make_csv_error(error: csv::Error, format_name: &str, head: Span) -> ShellError { + if let csv::ErrorKind::Io(error) = error.kind() { + ShellError::IOErrorSpanned { + msg: error.to_string(), + span: head, + } + } else { + ShellError::GenericError { + error: format!("Failed to generate {format_name} data"), + msg: error.to_string(), + span: Some(head), + help: None, + inner: vec![], } - } - writer_to_string(wtr).map_err(|_| make_conversion_error("table", span)) -} - -fn writer_to_string(writer: Writer>) -> Result> { - Ok(String::from_utf8(writer.into_inner()?)?) -} - -fn make_conversion_error(type_from: &str, span: Span) -> ShellError { - ShellError::CantConvert { - to_type: type_from.to_string(), - from_type: "string".to_string(), - span, - help: None, } } fn to_string_tagged_value( v: &Value, config: &Config, - span: Span, - head: Span, + format_name: &'static str, ) -> Result { match &v { Value::String { .. } @@ -123,50 +41,124 @@ fn to_string_tagged_value( Value::Nothing { .. } => Ok(String::new()), // Propagate existing errors Value::Error { error, .. } => Err(*error.clone()), - _ => Err(make_unsupported_input_error(v, head, span)), + _ => Err(make_cant_convert_error(v, format_name)), } } -fn make_unsupported_input_error(value: &Value, head: Span, span: Span) -> ShellError { +fn make_unsupported_input_error( + r#type: impl std::fmt::Display, + head: Span, + span: Span, +) -> ShellError { ShellError::UnsupportedInput { - msg: "Unexpected type".to_string(), - input: format!("input type: {:?}", value.get_type()), + msg: "expected table or record".to_string(), + input: format!("input type: {}", r#type), msg_span: head, input_span: span, } } -pub fn find_non_record(values: &[Value]) -> Option<&Value> { - values - .iter() - .find(|val| !matches!(val, Value::Record { .. })) +fn make_cant_convert_error(value: &Value, format_name: &'static str) -> ShellError { + ShellError::CantConvert { + to_type: "string".into(), + from_type: value.get_type().to_string(), + span: value.span(), + help: Some(format!( + "only simple values are supported for {format_name} output" + )), + } } pub fn to_delimited_data( noheaders: bool, - sep: char, + separator: Spanned, + columns: Option>, format_name: &'static str, input: PipelineData, - span: Span, - config: &Config, + head: Span, + config: Arc, ) -> Result { - let value = input.into_value(span)?; - let output = match from_value_to_delimited_string(&value, sep, config, span) { - Ok(mut x) => { - if noheaders { - if let Some(second_line) = x.find('\n') { - let start = second_line + 1; - x.replace_range(0..start, ""); - } - } - Ok(x) + let mut input = input; + let span = input.span().unwrap_or(head); + let metadata = input.metadata(); + + let separator = u8::try_from(separator.item).map_err(|_| ShellError::IncorrectValue { + msg: "separator must be an ASCII character".into(), + val_span: separator.span, + call_span: head, + })?; + + // Check to ensure the input is likely one of our supported types first. We can't check a stream + // without consuming it though + match input { + PipelineData::Value(Value::List { .. } | Value::Record { .. }, _) => (), + PipelineData::Value(Value::Error { error, .. }, _) => return Err(*error), + PipelineData::Value(other, _) => { + return Err(make_unsupported_input_error(other.get_type(), head, span)) } - Err(_) => Err(ShellError::CantConvert { - to_type: format_name.into(), - from_type: value.get_type().to_string(), - span: value.span(), - help: None, - }), - }?; - Ok(Value::string(output, span).into_pipeline_data()) + PipelineData::ByteStream(..) => { + return Err(make_unsupported_input_error("byte stream", head, span)) + } + PipelineData::ListStream(..) => (), + PipelineData::Empty => (), + } + + // Determine the columns we'll use. This is necessary even if we don't write the header row, + // because we need to write consistent columns. + let columns = match columns { + Some(columns) => columns, + None => { + // The columns were not provided. We need to detect them, and in order to do so, we have + // to convert the input into a value first, so that we can find all of them + let value = input.into_value(span)?; + let columns = match &value { + Value::List { vals, .. } => merge_descriptors(vals), + Value::Record { val, .. } => val.columns().cloned().collect(), + _ => return Err(make_unsupported_input_error(value.get_type(), head, span)), + }; + input = PipelineData::Value(value, metadata.clone()); + columns + } + }; + + // Generate a byte stream of all of the values in the pipeline iterator, with a non-strict + // iterator so we can still accept plain records. + let mut iter = input.into_iter(); + + // If we're configured to generate a header, we generate it first, then set this false + let mut is_header = !noheaders; + + let stream = ByteStream::from_fn(head, None, ByteStreamType::String, move |buffer| { + let mut wtr = WriterBuilder::new() + .delimiter(separator) + .from_writer(buffer); + + if is_header { + // Unless we are configured not to write a header, we write the header row now, once, + // before everything else. + wtr.write_record(&columns) + .map_err(|err| make_csv_error(err, format_name, head))?; + is_header = false; + Ok(true) + } else if let Some(row) = iter.next() { + // Write each column of a normal row, in order + let record = row.into_record()?; + for column in &columns { + let field = record + .get(column) + .map(|v| to_string_tagged_value(v, &config, format_name)) + .unwrap_or(Ok(String::new()))?; + wtr.write_field(field) + .map_err(|err| make_csv_error(err, format_name, head))?; + } + // End the row + wtr.write_record(iter::empty::()) + .map_err(|err| make_csv_error(err, format_name, head))?; + Ok(true) + } else { + Ok(false) + } + }); + + Ok(PipelineData::ByteStream(stream, metadata)) } diff --git a/crates/nu-command/src/formats/to/tsv.rs b/crates/nu-command/src/formats/to/tsv.rs index eeaeb6b401..edc9592409 100644 --- a/crates/nu-command/src/formats/to/tsv.rs +++ b/crates/nu-command/src/formats/to/tsv.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::formats::to::delimited::to_delimited_data; use nu_engine::command_prelude::*; use nu_protocol::Config; @@ -21,6 +23,12 @@ impl Command for ToTsv { "do not output the column names as the first row", Some('n'), ) + .named( + "columns", + SyntaxShape::List(SyntaxShape::String.into()), + "the names (in order) of the columns to use", + None, + ) .category(Category::Formats) } @@ -31,15 +39,20 @@ impl Command for ToTsv { fn examples(&self) -> Vec { vec![ Example { - description: "Outputs an TSV string representing the contents of this table", + description: "Outputs a TSV string representing the contents of this table", example: "[[foo bar]; [1 2]] | to tsv", result: Some(Value::test_string("foo\tbar\n1\t2\n")), }, Example { - description: "Outputs an TSV string representing the contents of this record", + description: "Outputs a TSV string representing the contents of this record", example: "{a: 1 b: 2} | to tsv", result: Some(Value::test_string("a\tb\n1\t2\n")), }, + Example { + description: "Outputs a TSV stream with column names pre-determined", + example: "[[foo bar baz]; [1 2 3]] | to tsv --columns [baz foo]", + result: Some(Value::test_string("baz\tfoo\n3\t1\n")), + }, ] } @@ -52,18 +65,24 @@ impl Command for ToTsv { ) -> Result { let head = call.head; let noheaders = call.has_flag(engine_state, stack, "noheaders")?; - let config = engine_state.get_config(); - to_tsv(input, noheaders, head, config) + let columns: Option> = call.get_flag(engine_state, stack, "columns")?; + let config = engine_state.config.clone(); + to_tsv(input, noheaders, columns, head, config) } } fn to_tsv( input: PipelineData, noheaders: bool, + columns: Option>, head: Span, - config: &Config, + config: Arc, ) -> Result { - to_delimited_data(noheaders, '\t', "TSV", input, head, config) + let sep = Spanned { + item: '\t', + span: head, + }; + to_delimited_data(noheaders, sep, columns, "TSV", input, head, config) } #[cfg(test)]