From 7de513a4e04d4654314a02c60a1a415fc5f348a0 Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Wed, 22 May 2024 09:55:24 -0700 Subject: [PATCH] Implement streaming I/O for CSV and TSV commands (#12918) # Description Implements streaming for: - `from csv` - `from tsv` - `to csv` - `to tsv` via the new string-typed ByteStream support. # User-Facing Changes Commands above. Also: - `to csv` and `to tsv` now have `--columns `, to provide the exact columns desired in the output. This is required for them to have streaming output, because otherwise collecting the entire list is necessary to determine the output columns. If we introduce `TableStream`, this may become less necessary. # Tests + Formatting - :green_circle: `toolkit fmt` - :green_circle: `toolkit clippy` - :green_circle: `toolkit test` - :green_circle: `toolkit test stdlib` # After Submitting - [ ] release notes --------- Co-authored-by: Ian Manske --- .../nu-command/src/formats/from/delimited.rs | 78 ++++-- crates/nu-command/src/formats/to/csv.rs | 41 ++- crates/nu-command/src/formats/to/delimited.rs | 252 +++++++++--------- crates/nu-command/src/formats/to/tsv.rs | 31 ++- 4 files changed, 235 insertions(+), 167 deletions(-) diff --git a/crates/nu-command/src/formats/from/delimited.rs b/crates/nu-command/src/formats/from/delimited.rs index 1fdea19482..853f3bd83e 100644 --- a/crates/nu-command/src/formats/from/delimited.rs +++ b/crates/nu-command/src/formats/from/delimited.rs @@ -1,7 +1,14 @@ use csv::{ReaderBuilder, Trim}; -use nu_protocol::{IntoPipelineData, PipelineData, ShellError, Span, Value}; +use nu_protocol::{ByteStream, ListStream, PipelineData, ShellError, Span, Value}; -fn from_delimited_string_to_value( +fn from_csv_error(err: csv::Error, span: Span) -> ShellError { + ShellError::DelimiterError { + msg: err.to_string(), + span, + } +} + +fn from_delimited_stream( DelimitedReaderConfig { separator, comment, @@ -12,9 +19,15 @@ fn from_delimited_string_to_value( no_infer, trim, }: DelimitedReaderConfig, - s: String, + input: ByteStream, span: Span, -) -> Result { +) -> Result { + let input_reader = if let Some(stream) = input.reader() { + stream + } else { + return Ok(ListStream::new(std::iter::empty(), span, None)); + }; + let mut reader = ReaderBuilder::new() .has_headers(!noheaders) .flexible(flexible) @@ -23,19 +36,29 @@ fn from_delimited_string_to_value( .quote(quote as u8) .escape(escape.map(|c| c as u8)) .trim(trim) - .from_reader(s.as_bytes()); + .from_reader(input_reader); let headers = if noheaders { - (1..=reader.headers()?.len()) + (1..=reader + .headers() + .map_err(|err| from_csv_error(err, span))? + .len()) .map(|i| format!("column{i}")) .collect::>() } else { - reader.headers()?.iter().map(String::from).collect() + reader + .headers() + .map_err(|err| from_csv_error(err, span))? + .iter() + .map(String::from) + .collect() }; - let mut rows = vec![]; - for row in reader.records() { - let row = row?; + let iter = reader.into_records().map(move |row| { + let row = match row { + Ok(row) => row, + Err(err) => return Value::error(from_csv_error(err, span), span), + }; let columns = headers.iter().cloned(); let values = row .into_iter() @@ -57,10 +80,10 @@ fn from_delimited_string_to_value( // // Otherwise, if there are less values than headers, // then `Value::nothing(span)` is used to fill the remaining columns. - rows.push(Value::record(columns.zip(values).collect(), span)); - } + Value::record(columns.zip(values).collect(), span) + }); - Ok(Value::list(rows, span)) + Ok(ListStream::new(iter, span, None)) } pub(super) struct DelimitedReaderConfig { @@ -79,14 +102,27 @@ pub(super) fn from_delimited_data( input: PipelineData, name: Span, ) -> Result { - let (concat_string, _span, metadata) = input.collect_string_strict(name)?; - - Ok(from_delimited_string_to_value(config, concat_string, name) - .map_err(|x| ShellError::DelimiterError { - msg: x.to_string(), - span: name, - })? - .into_pipeline_data_with_metadata(metadata)) + match input { + PipelineData::Empty => Ok(PipelineData::Empty), + PipelineData::Value(value, metadata) => { + let string = value.into_string()?; + let byte_stream = ByteStream::read_string(string, name, None); + Ok(PipelineData::ListStream( + from_delimited_stream(config, byte_stream, name)?, + metadata, + )) + } + PipelineData::ListStream(list_stream, _) => Err(ShellError::OnlySupportsThisInputType { + exp_input_type: "string".into(), + wrong_type: "list".into(), + dst_span: name, + src_span: list_stream.span(), + }), + PipelineData::ByteStream(byte_stream, metadata) => Ok(PipelineData::ListStream( + from_delimited_stream(config, byte_stream, name)?, + metadata, + )), + } } pub fn trim_from_str(trim: Option) -> Result { diff --git a/crates/nu-command/src/formats/to/csv.rs b/crates/nu-command/src/formats/to/csv.rs index 173c6fbd6b..e7786e60cc 100644 --- a/crates/nu-command/src/formats/to/csv.rs +++ b/crates/nu-command/src/formats/to/csv.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::formats::to::delimited::to_delimited_data; use nu_engine::command_prelude::*; use nu_protocol::Config; @@ -27,26 +29,37 @@ impl Command for ToCsv { "do not output the columns names as the first row", Some('n'), ) + .named( + "columns", + SyntaxShape::List(SyntaxShape::String.into()), + "the names (in order) of the columns to use", + None, + ) .category(Category::Formats) } fn examples(&self) -> Vec { vec![ Example { - description: "Outputs an CSV string representing the contents of this table", + description: "Outputs a CSV string representing the contents of this table", example: "[[foo bar]; [1 2]] | to csv", result: Some(Value::test_string("foo,bar\n1,2\n")), }, Example { - description: "Outputs an CSV string representing the contents of this table", + description: "Outputs a CSV string representing the contents of this table", example: "[[foo bar]; [1 2]] | to csv --separator ';' ", result: Some(Value::test_string("foo;bar\n1;2\n")), }, Example { - description: "Outputs an CSV string representing the contents of this record", + description: "Outputs a CSV string representing the contents of this record", example: "{a: 1 b: 2} | to csv", result: Some(Value::test_string("a,b\n1,2\n")), }, + Example { + description: "Outputs a CSV stream with column names pre-determined", + example: "[[foo bar baz]; [1 2 3]] | to csv --columns [baz foo]", + result: Some(Value::test_string("baz,foo\n3,1\n")), + }, ] } @@ -64,8 +77,9 @@ impl Command for ToCsv { let head = call.head; let noheaders = call.has_flag(engine_state, stack, "noheaders")?; let separator: Option> = call.get_flag(engine_state, stack, "separator")?; - let config = engine_state.get_config(); - to_csv(input, noheaders, separator, head, config) + let columns: Option> = call.get_flag(engine_state, stack, "columns")?; + let config = engine_state.config.clone(); + to_csv(input, noheaders, separator, columns, head, config) } } @@ -73,13 +87,14 @@ fn to_csv( input: PipelineData, noheaders: bool, separator: Option>, + columns: Option>, head: Span, - config: &Config, + config: Arc, ) -> Result { let sep = match separator { Some(Spanned { item: s, span, .. }) => { if s == r"\t" { - '\t' + Spanned { item: '\t', span } } else { let vec_s: Vec = s.chars().collect(); if vec_s.len() != 1 { @@ -89,13 +104,19 @@ fn to_csv( span, }); }; - vec_s[0] + Spanned { + item: vec_s[0], + span: head, + } } } - _ => ',', + _ => Spanned { + item: ',', + span: head, + }, }; - to_delimited_data(noheaders, sep, "CSV", input, head, config) + to_delimited_data(noheaders, sep, columns, "CSV", input, head, config) } #[cfg(test)] diff --git a/crates/nu-command/src/formats/to/delimited.rs b/crates/nu-command/src/formats/to/delimited.rs index 490983d67b..a7a2480a34 100644 --- a/crates/nu-command/src/formats/to/delimited.rs +++ b/crates/nu-command/src/formats/to/delimited.rs @@ -1,113 +1,31 @@ -use csv::{Writer, WriterBuilder}; +use csv::WriterBuilder; use nu_cmd_base::formats::to::delimited::merge_descriptors; -use nu_protocol::{Config, IntoPipelineData, PipelineData, Record, ShellError, Span, Value}; -use std::{collections::VecDeque, error::Error}; +use nu_protocol::{ + ByteStream, ByteStreamType, Config, PipelineData, ShellError, Span, Spanned, Value, +}; +use std::{iter, sync::Arc}; -fn from_value_to_delimited_string( - value: &Value, - separator: char, - config: &Config, - head: Span, -) -> Result { - let span = value.span(); - match value { - Value::Record { val, .. } => record_to_delimited(val, span, separator, config, head), - Value::List { vals, .. } => table_to_delimited(vals, span, separator, config, head), - // Propagate errors by explicitly matching them before the final case. - Value::Error { error, .. } => Err(*error.clone()), - v => Err(make_unsupported_input_error(v, head, v.span())), - } -} - -fn record_to_delimited( - record: &Record, - span: Span, - separator: char, - config: &Config, - head: Span, -) -> Result { - let mut wtr = WriterBuilder::new() - .delimiter(separator as u8) - .from_writer(vec![]); - let mut fields: VecDeque = VecDeque::new(); - let mut values: VecDeque = VecDeque::new(); - - for (k, v) in record { - fields.push_back(k.clone()); - - values.push_back(to_string_tagged_value(v, config, head, span)?); - } - - wtr.write_record(fields).expect("can not write."); - wtr.write_record(values).expect("can not write."); - - writer_to_string(wtr).map_err(|_| make_conversion_error("record", span)) -} - -fn table_to_delimited( - vals: &[Value], - span: Span, - separator: char, - config: &Config, - head: Span, -) -> Result { - if let Some(val) = find_non_record(vals) { - return Err(make_unsupported_input_error(val, head, span)); - } - - let mut wtr = WriterBuilder::new() - .delimiter(separator as u8) - .from_writer(vec![]); - - let merged_descriptors = merge_descriptors(vals); - - if merged_descriptors.is_empty() { - let vals = vals - .iter() - .map(|ele| { - to_string_tagged_value(ele, config, head, span).unwrap_or_else(|_| String::new()) - }) - .collect::>(); - wtr.write_record(vals).expect("can not write"); - } else { - wtr.write_record(merged_descriptors.iter().map(|item| &item[..])) - .expect("can not write."); - - for l in vals { - // should always be true because of `find_non_record` above - if let Value::Record { val: l, .. } = l { - let mut row = vec![]; - for desc in &merged_descriptors { - row.push(match l.get(desc) { - Some(s) => to_string_tagged_value(s, config, head, span)?, - None => String::new(), - }); - } - wtr.write_record(&row).expect("can not write"); - } +fn make_csv_error(error: csv::Error, format_name: &str, head: Span) -> ShellError { + if let csv::ErrorKind::Io(error) = error.kind() { + ShellError::IOErrorSpanned { + msg: error.to_string(), + span: head, + } + } else { + ShellError::GenericError { + error: format!("Failed to generate {format_name} data"), + msg: error.to_string(), + span: Some(head), + help: None, + inner: vec![], } - } - writer_to_string(wtr).map_err(|_| make_conversion_error("table", span)) -} - -fn writer_to_string(writer: Writer>) -> Result> { - Ok(String::from_utf8(writer.into_inner()?)?) -} - -fn make_conversion_error(type_from: &str, span: Span) -> ShellError { - ShellError::CantConvert { - to_type: type_from.to_string(), - from_type: "string".to_string(), - span, - help: None, } } fn to_string_tagged_value( v: &Value, config: &Config, - span: Span, - head: Span, + format_name: &'static str, ) -> Result { match &v { Value::String { .. } @@ -123,50 +41,124 @@ fn to_string_tagged_value( Value::Nothing { .. } => Ok(String::new()), // Propagate existing errors Value::Error { error, .. } => Err(*error.clone()), - _ => Err(make_unsupported_input_error(v, head, span)), + _ => Err(make_cant_convert_error(v, format_name)), } } -fn make_unsupported_input_error(value: &Value, head: Span, span: Span) -> ShellError { +fn make_unsupported_input_error( + r#type: impl std::fmt::Display, + head: Span, + span: Span, +) -> ShellError { ShellError::UnsupportedInput { - msg: "Unexpected type".to_string(), - input: format!("input type: {:?}", value.get_type()), + msg: "expected table or record".to_string(), + input: format!("input type: {}", r#type), msg_span: head, input_span: span, } } -pub fn find_non_record(values: &[Value]) -> Option<&Value> { - values - .iter() - .find(|val| !matches!(val, Value::Record { .. })) +fn make_cant_convert_error(value: &Value, format_name: &'static str) -> ShellError { + ShellError::CantConvert { + to_type: "string".into(), + from_type: value.get_type().to_string(), + span: value.span(), + help: Some(format!( + "only simple values are supported for {format_name} output" + )), + } } pub fn to_delimited_data( noheaders: bool, - sep: char, + separator: Spanned, + columns: Option>, format_name: &'static str, input: PipelineData, - span: Span, - config: &Config, + head: Span, + config: Arc, ) -> Result { - let value = input.into_value(span)?; - let output = match from_value_to_delimited_string(&value, sep, config, span) { - Ok(mut x) => { - if noheaders { - if let Some(second_line) = x.find('\n') { - let start = second_line + 1; - x.replace_range(0..start, ""); - } - } - Ok(x) + let mut input = input; + let span = input.span().unwrap_or(head); + let metadata = input.metadata(); + + let separator = u8::try_from(separator.item).map_err(|_| ShellError::IncorrectValue { + msg: "separator must be an ASCII character".into(), + val_span: separator.span, + call_span: head, + })?; + + // Check to ensure the input is likely one of our supported types first. We can't check a stream + // without consuming it though + match input { + PipelineData::Value(Value::List { .. } | Value::Record { .. }, _) => (), + PipelineData::Value(Value::Error { error, .. }, _) => return Err(*error), + PipelineData::Value(other, _) => { + return Err(make_unsupported_input_error(other.get_type(), head, span)) } - Err(_) => Err(ShellError::CantConvert { - to_type: format_name.into(), - from_type: value.get_type().to_string(), - span: value.span(), - help: None, - }), - }?; - Ok(Value::string(output, span).into_pipeline_data()) + PipelineData::ByteStream(..) => { + return Err(make_unsupported_input_error("byte stream", head, span)) + } + PipelineData::ListStream(..) => (), + PipelineData::Empty => (), + } + + // Determine the columns we'll use. This is necessary even if we don't write the header row, + // because we need to write consistent columns. + let columns = match columns { + Some(columns) => columns, + None => { + // The columns were not provided. We need to detect them, and in order to do so, we have + // to convert the input into a value first, so that we can find all of them + let value = input.into_value(span)?; + let columns = match &value { + Value::List { vals, .. } => merge_descriptors(vals), + Value::Record { val, .. } => val.columns().cloned().collect(), + _ => return Err(make_unsupported_input_error(value.get_type(), head, span)), + }; + input = PipelineData::Value(value, metadata.clone()); + columns + } + }; + + // Generate a byte stream of all of the values in the pipeline iterator, with a non-strict + // iterator so we can still accept plain records. + let mut iter = input.into_iter(); + + // If we're configured to generate a header, we generate it first, then set this false + let mut is_header = !noheaders; + + let stream = ByteStream::from_fn(head, None, ByteStreamType::String, move |buffer| { + let mut wtr = WriterBuilder::new() + .delimiter(separator) + .from_writer(buffer); + + if is_header { + // Unless we are configured not to write a header, we write the header row now, once, + // before everything else. + wtr.write_record(&columns) + .map_err(|err| make_csv_error(err, format_name, head))?; + is_header = false; + Ok(true) + } else if let Some(row) = iter.next() { + // Write each column of a normal row, in order + let record = row.into_record()?; + for column in &columns { + let field = record + .get(column) + .map(|v| to_string_tagged_value(v, &config, format_name)) + .unwrap_or(Ok(String::new()))?; + wtr.write_field(field) + .map_err(|err| make_csv_error(err, format_name, head))?; + } + // End the row + wtr.write_record(iter::empty::()) + .map_err(|err| make_csv_error(err, format_name, head))?; + Ok(true) + } else { + Ok(false) + } + }); + + Ok(PipelineData::ByteStream(stream, metadata)) } diff --git a/crates/nu-command/src/formats/to/tsv.rs b/crates/nu-command/src/formats/to/tsv.rs index eeaeb6b401..edc9592409 100644 --- a/crates/nu-command/src/formats/to/tsv.rs +++ b/crates/nu-command/src/formats/to/tsv.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::formats::to::delimited::to_delimited_data; use nu_engine::command_prelude::*; use nu_protocol::Config; @@ -21,6 +23,12 @@ impl Command for ToTsv { "do not output the column names as the first row", Some('n'), ) + .named( + "columns", + SyntaxShape::List(SyntaxShape::String.into()), + "the names (in order) of the columns to use", + None, + ) .category(Category::Formats) } @@ -31,15 +39,20 @@ impl Command for ToTsv { fn examples(&self) -> Vec { vec![ Example { - description: "Outputs an TSV string representing the contents of this table", + description: "Outputs a TSV string representing the contents of this table", example: "[[foo bar]; [1 2]] | to tsv", result: Some(Value::test_string("foo\tbar\n1\t2\n")), }, Example { - description: "Outputs an TSV string representing the contents of this record", + description: "Outputs a TSV string representing the contents of this record", example: "{a: 1 b: 2} | to tsv", result: Some(Value::test_string("a\tb\n1\t2\n")), }, + Example { + description: "Outputs a TSV stream with column names pre-determined", + example: "[[foo bar baz]; [1 2 3]] | to tsv --columns [baz foo]", + result: Some(Value::test_string("baz\tfoo\n3\t1\n")), + }, ] } @@ -52,18 +65,24 @@ impl Command for ToTsv { ) -> Result { let head = call.head; let noheaders = call.has_flag(engine_state, stack, "noheaders")?; - let config = engine_state.get_config(); - to_tsv(input, noheaders, head, config) + let columns: Option> = call.get_flag(engine_state, stack, "columns")?; + let config = engine_state.config.clone(); + to_tsv(input, noheaders, columns, head, config) } } fn to_tsv( input: PipelineData, noheaders: bool, + columns: Option>, head: Span, - config: &Config, + config: Arc, ) -> Result { - to_delimited_data(noheaders, '\t', "TSV", input, head, config) + let sep = Spanned { + item: '\t', + span: head, + }; + to_delimited_data(noheaders, sep, columns, "TSV", input, head, config) } #[cfg(test)]