From 7a123d3eb18b54350d627f013dc5160f2c7326c4 Mon Sep 17 00:00:00 2001 From: Jack Wright <56345+ayax79@users.noreply.github.com> Date: Tue, 15 Aug 2023 18:31:49 -0700 Subject: [PATCH] Expose polars avro support (#10019) # Description Exposes polars avro support via dfr open and dfr to-avro --------- Co-authored-by: Jack Wright --- Cargo.lock | 63 +++++++++ crates/nu-cmd-dataframe/Cargo.toml | 3 +- .../src/dataframe/eager/mod.rs | 3 + .../src/dataframe/eager/open.rs | 47 ++++++- .../src/dataframe/eager/to_avro.rs | 122 ++++++++++++++++++ 5 files changed, 235 insertions(+), 3 deletions(-) create mode 100644 crates/nu-cmd-dataframe/src/dataframe/eager/to_avro.rs diff --git a/Cargo.lock b/Cargo.lock index 8620949bb..0a67ea6c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "ahash" version = "0.8.3" @@ -166,6 +172,7 @@ checksum = "15ae0428d69ab31d7b2adad22a752d6f11fef2e901d2262d0cad4f5cb08b7093" dependencies = [ "ahash", "arrow-format", + "avro-schema", "base64", "bytemuck", "chrono", @@ -273,6 +280,20 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "avro-schema" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5281855b39aba9684d2f47bf96983fbfd8f1725f12fabb0513a8ab879647bbd" +dependencies = [ + "crc", + "fallible-streaming-iterator", + "libflate", + "serde", + "serde_json", + "snap", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -740,6 +761,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + [[package]] name = "crc32fast" version = "1.3.2" @@ -2022,6 +2058,26 @@ version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +[[package]] +name = "libflate" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ff4ae71b685bbad2f2f391fe74f6b7659a34871c08b210fdc039e43bee07d18" +dependencies = [ + "adler32", + "crc32fast", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf" +dependencies = [ + "rle-decode-fast", +] + [[package]] name = "libgit2-sys" version = "0.15.2+1.6.4" @@ -2604,6 +2660,7 @@ dependencies = [ "nu-test-support", "num 0.4.0", "polars", + "polars-io", "serde", "sqlparser", ] @@ -4189,6 +4246,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bf2521270932c3c7bed1a59151222bd7643c79310f2916f01925e1e16255698" +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rmp" version = "0.8.11" diff --git a/crates/nu-cmd-dataframe/Cargo.toml b/crates/nu-cmd-dataframe/Cargo.toml index e4f20cbb4..f4c179762 100644 --- a/crates/nu-cmd-dataframe/Cargo.toml +++ b/crates/nu-cmd-dataframe/Cargo.toml @@ -24,6 +24,7 @@ indexmap = { version = "2.0" } num = { version = "0.4", optional = true } serde = { version = "1.0", features = ["derive"] } sqlparser = { version = "0.34", features = ["serde"], optional = true } +polars-io = { version = "0.30.0", features = ["avro"] } [dependencies.polars] features = [ @@ -50,7 +51,7 @@ features = [ "serde", "serde-lazy", "strings", - "to_dummies", + "to_dummies" ] optional = true version = "0.30.0" diff --git a/crates/nu-cmd-dataframe/src/dataframe/eager/mod.rs b/crates/nu-cmd-dataframe/src/dataframe/eager/mod.rs index 7091cc51f..dddb23fc7 100644 --- a/crates/nu-cmd-dataframe/src/dataframe/eager/mod.rs +++ b/crates/nu-cmd-dataframe/src/dataframe/eager/mod.rs @@ -22,6 +22,7 @@ mod sql_expr; mod summary; mod take; mod to_arrow; +mod to_avro; mod to_csv; mod to_df; mod to_json_lines; @@ -55,6 +56,7 @@ pub use sql_expr::parse_sql_expr; pub use summary::Summary; pub use take::TakeDF; pub use to_arrow::ToArrow; +pub use to_avro::ToAvro; pub use to_csv::ToCSV; pub use to_df::ToDataFrame; pub use to_json_lines::ToJsonLines; @@ -96,6 +98,7 @@ pub fn add_eager_decls(working_set: &mut StateWorkingSet) { SliceDF, TakeDF, ToArrow, + ToAvro, ToCSV, ToDataFrame, ToNu, diff --git a/crates/nu-cmd-dataframe/src/dataframe/eager/open.rs b/crates/nu-cmd-dataframe/src/dataframe/eager/open.rs index 45a906d65..cd4c567a7 100644 --- a/crates/nu-cmd-dataframe/src/dataframe/eager/open.rs +++ b/crates/nu-cmd-dataframe/src/dataframe/eager/open.rs @@ -13,6 +13,8 @@ use polars::prelude::{ LazyFrame, ParallelStrategy, ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader, }; +use polars_io::avro::AvroReader; + #[derive(Clone)] pub struct OpenDataFrame; @@ -22,7 +24,7 @@ impl Command for OpenDataFrame { } fn usage(&self) -> &str { - "Opens CSV, JSON, JSON lines, arrow, or parquet file to create dataframe." + "Opens CSV, JSON, JSON lines, arrow, avro, or parquet file to create dataframe." } fn signature(&self) -> Signature { @@ -36,7 +38,7 @@ impl Command for OpenDataFrame { .named( "type", SyntaxShape::String, - "File type: csv, tsv, json, parquet, arrow. If omitted, derive from file extension", + "File type: csv, tsv, json, parquet, arrow, avro. If omitted, derive from file extension", Some('t'), ) .named( @@ -118,6 +120,7 @@ fn command( "ipc" | "arrow" => from_ipc(engine_state, stack, call), "json" => from_json(engine_state, stack, call), "jsonl" => from_jsonl(engine_state, stack, call), + "avro" => from_avro(engine_state, stack, call), _ => Err(ShellError::FileNotFoundCustom( format!("{msg}. Supported values: csv, tsv, parquet, ipc, arrow, json"), blamed, @@ -199,6 +202,46 @@ fn from_parquet( } } +fn from_avro( + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, +) -> Result { + let file: Spanned = call.req(engine_state, stack, 0)?; + let columns: Option> = call.get_flag(engine_state, stack, "columns")?; + + let r = File::open(&file.item).map_err(|e| { + ShellError::GenericError( + "Error opening file".into(), + e.to_string(), + Some(file.span), + None, + Vec::new(), + ) + })?; + let reader = AvroReader::new(r); + + let reader = match columns { + None => reader, + Some(columns) => reader.with_columns(Some(columns)), + }; + + let df: NuDataFrame = reader + .finish() + .map_err(|e| { + ShellError::GenericError( + "Avro reader error".into(), + format!("{e:?}"), + Some(call.head), + None, + Vec::new(), + ) + })? + .into(); + + Ok(df.into_value(call.head)) +} + fn from_ipc( engine_state: &EngineState, stack: &mut Stack, diff --git a/crates/nu-cmd-dataframe/src/dataframe/eager/to_avro.rs b/crates/nu-cmd-dataframe/src/dataframe/eager/to_avro.rs new file mode 100644 index 000000000..bdf415ee3 --- /dev/null +++ b/crates/nu-cmd-dataframe/src/dataframe/eager/to_avro.rs @@ -0,0 +1,122 @@ +use std::{fs::File, path::PathBuf}; + +use nu_engine::CallExt; +use nu_protocol::{ + ast::Call, + engine::{Command, EngineState, Stack}, + Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Value, +}; +use polars_io::avro::{AvroCompression, AvroWriter}; +use polars_io::SerWriter; + +use super::super::values::NuDataFrame; + +#[derive(Clone)] +pub struct ToAvro; + +impl Command for ToAvro { + fn name(&self) -> &str { + "dfr to-avro" + } + + fn usage(&self) -> &str { + "Saves dataframe to avro file." + } + + fn signature(&self) -> Signature { + Signature::build(self.name()) + .named( + "compression", + SyntaxShape::String, + "use compression, supports deflate or snappy", + Some('c'), + ) + .required("file", SyntaxShape::Filepath, "file path to save dataframe") + .input_output_type(Type::Custom("dataframe".into()), Type::Any) + .category(Category::Custom("dataframe".into())) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Saves dataframe to avro file", + example: "[[a b]; [1 2] [3 4]] | dfr into-df | dfr to-avro test.avro", + result: None, + }] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + command(engine_state, stack, call, input) + } +} + +fn get_compression(call: &Call) -> Result, ShellError> { + if let Some((compression, span)) = call + .get_flag_expr("compression") + .and_then(|e| e.as_string().map(|s| (s, e.span))) + { + match compression.as_ref() { + "snappy" => Ok(Some(AvroCompression::Snappy)), + "deflate" => Ok(Some(AvroCompression::Deflate)), + _ => Err(ShellError::IncorrectValue { + msg: "compression must be one of deflate or snappy".to_string(), + span, + }), + } + } else { + Ok(None) + } +} + +fn command( + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, +) -> Result { + let file_name: Spanned = call.req(engine_state, stack, 0)?; + let compression = get_compression(call)?; + + let mut df = NuDataFrame::try_from_pipeline(input, call.head)?; + + let file = File::create(&file_name.item).map_err(|e| { + ShellError::GenericError( + "Error with file name".into(), + e.to_string(), + Some(file_name.span), + None, + Vec::new(), + ) + })?; + + AvroWriter::new(file) + .with_compression(compression) + .finish(df.as_mut()) + .map_err(|e| { + ShellError::GenericError( + "Error saving file".into(), + e.to_string(), + Some(file_name.span), + None, + Vec::new(), + ) + })?; + + let file_value = Value::String { + val: format!("saved {:?}", &file_name.item), + span: file_name.span, + }; + + Ok(PipelineData::Value( + Value::List { + vals: vec![file_value], + span: call.head, + }, + None, + )) +}