diff --git a/Cargo.lock b/Cargo.lock index 5c5b83df8..0ddec0d76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,12 +167,14 @@ dependencies = [ "indexmap", "json-deserializer", "lexical-core", + "lz4", "multiversion", "num-traits", "parquet2", "simdutf8", "streaming-iterator", "strength_reduce", + "zstd", ] [[package]] diff --git a/crates/nu-command/Cargo.toml b/crates/nu-command/Cargo.toml index 9915f3e71..6b1c1179e 100644 --- a/crates/nu-command/Cargo.toml +++ b/crates/nu-command/Cargo.toml @@ -115,6 +115,7 @@ features = [ "dtype-struct", "dtype-categorical", "dynamic_groupby", + "ipc", "is_in", "json", "lazy", diff --git a/crates/nu-command/src/dataframe/eager/mod.rs b/crates/nu-command/src/dataframe/eager/mod.rs index 1b7442ac3..bbeb55103 100644 --- a/crates/nu-command/src/dataframe/eager/mod.rs +++ b/crates/nu-command/src/dataframe/eager/mod.rs @@ -18,6 +18,7 @@ mod sample; mod shape; mod slice; mod take; +mod to_arrow; mod to_csv; mod to_df; mod to_nu; @@ -46,6 +47,7 @@ pub use sample::SampleDF; pub use shape::ShapeDF; pub use slice::SliceDF; pub use take::TakeDF; +pub use to_arrow::ToArrow; pub use to_csv::ToCSV; pub use to_df::ToDataFrame; pub use to_nu::ToNu; @@ -84,6 +86,7 @@ pub fn add_eager_decls(working_set: &mut StateWorkingSet) { ShapeDF, SliceDF, TakeDF, + ToArrow, ToCSV, ToDataFrame, ToNu, diff --git a/crates/nu-command/src/dataframe/eager/open.rs b/crates/nu-command/src/dataframe/eager/open.rs index de2004800..4c9d4e286 100644 --- a/crates/nu-command/src/dataframe/eager/open.rs +++ b/crates/nu-command/src/dataframe/eager/open.rs @@ -9,8 +9,8 @@ use nu_protocol::{ use std::{fs::File, io::BufReader, path::PathBuf}; use polars::prelude::{ - CsvEncoding, CsvReader, JsonReader, LazyCsvReader, LazyFrame, ParallelStrategy, ParquetReader, - ScanArgsParquet, SerReader, + CsvEncoding, CsvReader, IpcReader, JsonReader, LazyCsvReader, LazyFrame, ParallelStrategy, + ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader, }; #[derive(Clone)] @@ -22,7 +22,7 @@ impl Command for OpenDataFrame { } fn usage(&self) -> &str { - "Opens csv, json or parquet file to create dataframe" + "Opens csv, json, arrow, or parquet file to create dataframe" } fn signature(&self) -> Signature { @@ -33,6 +33,12 @@ impl Command for OpenDataFrame { "file path to load values from", ) .switch("lazy", "creates a lazy dataframe", Some('l')) + .named( + "type", + SyntaxShape::String, + "File type: csv, tsv, json, parquet, arrow. If omitted, derive from file extension", + Some('t'), + ) .named( "delimiter", SyntaxShape::String, @@ -93,15 +99,33 @@ fn command( ) -> Result { let file: Spanned = call.req(engine_state, stack, 0)?; - match file.item.extension() { - Some(e) => match e.to_str() { - Some("csv") | Some("tsv") => from_csv(engine_state, stack, call), - Some("parquet") => from_parquet(engine_state, stack, call), - Some("json") => from_json(engine_state, stack, call), - _ => Err(ShellError::FileNotFoundCustom( - "Not a csv, tsv, parquet or json file".into(), + let type_option: Option> = call.get_flag(engine_state, stack, "type")?; + + let type_id = match &type_option { + Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)), + None => match file.item.extension() { + Some(e) => Some(( + e.to_string_lossy().into_owned(), + "Invalid extension", file.span, )), + None => None, + }, + }; + + match type_id { + Some((e, msg, blamed)) => match e.as_str() { + "csv" | "tsv" => from_csv(engine_state, stack, call), + "parquet" => from_parquet(engine_state, stack, call), + "ipc" | "arrow" => from_ipc(engine_state, stack, call), + "json" => from_json(engine_state, stack, call), + _ => Err(ShellError::FileNotFoundCustom( + format!( + "{}. Supported values: csv, tsv, parquet, ipc, arrow, json", + msg + ), + blamed, + )), }, None => Err(ShellError::FileNotFoundCustom( "File without extension".into(), @@ -177,6 +201,70 @@ fn from_parquet( } } +fn from_ipc( + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, +) -> Result { + if call.has_flag("lazy") { + let file: String = call.req(engine_state, stack, 0)?; + let args = ScanArgsIpc { + n_rows: None, + cache: true, + rechunk: false, + row_count: None, + }; + + let df: NuLazyFrame = LazyFrame::scan_ipc(file, args) + .map_err(|e| { + ShellError::GenericError( + "IPC reader error".into(), + format!("{:?}", e), + Some(call.head), + None, + Vec::new(), + ) + })? + .into(); + + df.into_value(call.head) + } else { + let file: Spanned = call.req(engine_state, stack, 0)?; + let columns: Option> = call.get_flag(engine_state, stack, "columns")?; + + let r = File::open(&file.item).map_err(|e| { + ShellError::GenericError( + "Error opening file".into(), + e.to_string(), + Some(file.span), + None, + Vec::new(), + ) + })?; + let reader = IpcReader::new(r); + + let reader = match columns { + None => reader, + Some(columns) => reader.with_columns(Some(columns)), + }; + + let df: NuDataFrame = reader + .finish() + .map_err(|e| { + ShellError::GenericError( + "IPC reader error".into(), + format!("{:?}", e), + Some(call.head), + None, + Vec::new(), + ) + })? + .into(); + + Ok(df.into_value(call.head)) + } +} + fn from_json( engine_state: &EngineState, stack: &mut Stack, diff --git a/crates/nu-command/src/dataframe/eager/to_arrow.rs b/crates/nu-command/src/dataframe/eager/to_arrow.rs new file mode 100644 index 000000000..f3c52c5dc --- /dev/null +++ b/crates/nu-command/src/dataframe/eager/to_arrow.rs @@ -0,0 +1,94 @@ +use std::{fs::File, path::PathBuf}; + +use nu_engine::CallExt; +use nu_protocol::{ + ast::Call, + engine::{Command, EngineState, Stack}, + Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Value, +}; +use polars::prelude::{IpcWriter, SerWriter}; + +use super::super::values::NuDataFrame; + +#[derive(Clone)] +pub struct ToArrow; + +impl Command for ToArrow { + fn name(&self) -> &str { + "to arrow" + } + + fn usage(&self) -> &str { + "Saves dataframe to arrow file" + } + + fn signature(&self) -> Signature { + Signature::build(self.name()) + .required("file", SyntaxShape::Filepath, "file path to save dataframe") + .input_type(Type::Custom("dataframe".into())) + .output_type(Type::Any) + .category(Category::Custom("dataframe".into())) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Saves dataframe to arrow file", + example: "[[a b]; [1 2] [3 4]] | into df | to arrow test.arrow", + result: None, + }] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + command(engine_state, stack, call, input) + } +} + +fn command( + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, +) -> Result { + let file_name: Spanned = call.req(engine_state, stack, 0)?; + + let mut df = NuDataFrame::try_from_pipeline(input, call.head)?; + + let mut file = File::create(&file_name.item).map_err(|e| { + ShellError::GenericError( + "Error with file name".into(), + e.to_string(), + Some(file_name.span), + None, + Vec::new(), + ) + })?; + + IpcWriter::new(&mut file).finish(df.as_mut()).map_err(|e| { + ShellError::GenericError( + "Error saving file".into(), + e.to_string(), + Some(file_name.span), + None, + Vec::new(), + ) + })?; + + let file_value = Value::String { + val: format!("saved {:?}", &file_name.item), + span: file_name.span, + }; + + Ok(PipelineData::Value( + Value::List { + vals: vec![file_value], + span: call.head, + }, + None, + )) +} diff --git a/crates/nu-command/tests/commands/open.rs b/crates/nu-command/tests/commands/open.rs index a6996da05..c0a1fd0cf 100644 --- a/crates/nu-command/tests/commands/open.rs +++ b/crates/nu-command/tests/commands/open.rs @@ -208,6 +208,22 @@ fn parses_utf16_ini() { assert_eq!(actual.out, "-236") } +#[cfg(feature = "database")] +#[test] +fn parses_arrow_ipc() { + let actual = nu!( + cwd: "tests/fixtures/formats", pipeline( + r#" + open-df caco3_plastics.arrow + | into nu + | first 1 + | get origin + "# + )); + + assert_eq!(actual.out, "SPAIN") +} + #[test] fn errors_if_file_not_found() { let actual = nu!( diff --git a/tests/fixtures/formats/caco3_plastics.arrow b/tests/fixtures/formats/caco3_plastics.arrow new file mode 100644 index 000000000..13b336a8e Binary files /dev/null and b/tests/fixtures/formats/caco3_plastics.arrow differ