Support Arrow IPC file format with dataframes (#6548)

* Add support for Arrow IPC file format

Add support for the Arrow IPC file format to the dataframe commands.
Support opening Arrow IPC-format files with the '.arrow' or '.ipc'
extension in the open-df command. Add a 'to arrow' command to write a
dataframe to Arrow IPC format.
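
For example (illustrative; test.arrow stands in for any Arrow IPC file):

    > [[a b]; [1 2] [3 4]] | into df | to arrow test.arrow
    > open-df test.arrow
    > open-df test.arrow --lazy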

* Add unit test for open-df on Arrow

* Add -t flag to open-df command

Add a `--type`/`-t` flag to the `open-df` command, to explicitly specify
the type of file being opened. Allowed values are the same as the set of
allowed file extensions.
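
For example, to read an Arrow IPC file whose name lacks a recognized
extension (data.dat is an illustrative filename):

    > open-df data.dat --type arrow
    > open-df data.dat -t arrow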
Aron Nopanen, 2022-09-12 16:30:20 -07:00, committed by GitHub
parent 4490e97a13, commit d08212409f
7 changed files with 214 additions and 10 deletions

Cargo.lock (generated)

@@ -167,12 +167,14 @@ dependencies = [
  "indexmap",
  "json-deserializer",
  "lexical-core",
+ "lz4",
  "multiversion",
  "num-traits",
  "parquet2",
  "simdutf8",
  "streaming-iterator",
  "strength_reduce",
+ "zstd",
 ]
 
 [[package]]


@@ -115,6 +115,7 @@ features = [
     "dtype-struct",
     "dtype-categorical",
     "dynamic_groupby",
+    "ipc",
     "is_in",
     "json",
     "lazy",


@@ -18,6 +18,7 @@ mod sample;
 mod shape;
 mod slice;
 mod take;
+mod to_arrow;
 mod to_csv;
 mod to_df;
 mod to_nu;
@@ -46,6 +47,7 @@ pub use sample::SampleDF;
 pub use shape::ShapeDF;
 pub use slice::SliceDF;
 pub use take::TakeDF;
+pub use to_arrow::ToArrow;
 pub use to_csv::ToCSV;
 pub use to_df::ToDataFrame;
 pub use to_nu::ToNu;
@@ -84,6 +86,7 @@ pub fn add_eager_decls(working_set: &mut StateWorkingSet) {
         ShapeDF,
         SliceDF,
         TakeDF,
+        ToArrow,
         ToCSV,
         ToDataFrame,
         ToNu,


@@ -9,8 +9,8 @@ use nu_protocol::{
 use std::{fs::File, io::BufReader, path::PathBuf};
 
 use polars::prelude::{
-    CsvEncoding, CsvReader, JsonReader, LazyCsvReader, LazyFrame, ParallelStrategy, ParquetReader,
-    ScanArgsParquet, SerReader,
+    CsvEncoding, CsvReader, IpcReader, JsonReader, LazyCsvReader, LazyFrame, ParallelStrategy,
+    ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader,
 };
 
 #[derive(Clone)]
@@ -22,7 +22,7 @@ impl Command for OpenDataFrame {
     }
 
     fn usage(&self) -> &str {
-        "Opens csv, json or parquet file to create dataframe"
+        "Opens csv, json, arrow, or parquet file to create dataframe"
     }
 
     fn signature(&self) -> Signature {
@@ -33,6 +33,12 @@ impl Command for OpenDataFrame {
                 "file path to load values from",
             )
             .switch("lazy", "creates a lazy dataframe", Some('l'))
+            .named(
+                "type",
+                SyntaxShape::String,
+                "File type: csv, tsv, json, parquet, arrow. If omitted, derive from file extension",
+                Some('t'),
+            )
             .named(
                 "delimiter",
                 SyntaxShape::String,
@@ -93,15 +99,33 @@ fn command(
 ) -> Result<PipelineData, ShellError> {
     let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
 
-    match file.item.extension() {
-        Some(e) => match e.to_str() {
-            Some("csv") | Some("tsv") => from_csv(engine_state, stack, call),
-            Some("parquet") => from_parquet(engine_state, stack, call),
-            Some("json") => from_json(engine_state, stack, call),
-            _ => Err(ShellError::FileNotFoundCustom(
-                "Not a csv, tsv, parquet or json file".into(),
-                file.span,
+    let type_option: Option<Spanned<String>> = call.get_flag(engine_state, stack, "type")?;
+    let type_id = match &type_option {
+        Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)),
+        None => match file.item.extension() {
+            Some(e) => Some((
+                e.to_string_lossy().into_owned(),
+                "Invalid extension",
+                file.span,
+            )),
+            None => None,
+        },
+    };
+
+    match type_id {
+        Some((e, msg, blamed)) => match e.as_str() {
+            "csv" | "tsv" => from_csv(engine_state, stack, call),
+            "parquet" => from_parquet(engine_state, stack, call),
+            "ipc" | "arrow" => from_ipc(engine_state, stack, call),
+            "json" => from_json(engine_state, stack, call),
+            _ => Err(ShellError::FileNotFoundCustom(
+                format!(
+                    "{}. Supported values: csv, tsv, parquet, ipc, arrow, json",
+                    msg
+                ),
+                blamed,
             )),
         },
         None => Err(ShellError::FileNotFoundCustom(
             "File without extension".into(),
@@ -177,6 +201,70 @@ fn from_parquet(
     }
 }
 
+fn from_ipc(
+    engine_state: &EngineState,
+    stack: &mut Stack,
+    call: &Call,
+) -> Result<Value, ShellError> {
+    if call.has_flag("lazy") {
+        let file: String = call.req(engine_state, stack, 0)?;
+        let args = ScanArgsIpc {
+            n_rows: None,
+            cache: true,
+            rechunk: false,
+            row_count: None,
+        };
+
+        let df: NuLazyFrame = LazyFrame::scan_ipc(file, args)
+            .map_err(|e| {
+                ShellError::GenericError(
+                    "IPC reader error".into(),
+                    format!("{:?}", e),
+                    Some(call.head),
+                    None,
+                    Vec::new(),
+                )
+            })?
+            .into();
+
+        df.into_value(call.head)
+    } else {
+        let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
+        let columns: Option<Vec<String>> = call.get_flag(engine_state, stack, "columns")?;
+
+        let r = File::open(&file.item).map_err(|e| {
+            ShellError::GenericError(
+                "Error opening file".into(),
+                e.to_string(),
+                Some(file.span),
+                None,
+                Vec::new(),
+            )
+        })?;
+        let reader = IpcReader::new(r);
+
+        let reader = match columns {
+            None => reader,
+            Some(columns) => reader.with_columns(Some(columns)),
+        };
+
+        let df: NuDataFrame = reader
+            .finish()
+            .map_err(|e| {
+                ShellError::GenericError(
+                    "IPC reader error".into(),
+                    format!("{:?}", e),
+                    Some(call.head),
+                    None,
+                    Vec::new(),
+                )
+            })?
+            .into();
+
+        Ok(df.into_value(call.head))
+    }
+}
+
 fn from_json(
     engine_state: &EngineState,
     stack: &mut Stack,


@@ -0,0 +1,94 @@
+use std::{fs::File, path::PathBuf};
+
+use nu_engine::CallExt;
+use nu_protocol::{
+    ast::Call,
+    engine::{Command, EngineState, Stack},
+    Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Value,
+};
+use polars::prelude::{IpcWriter, SerWriter};
+
+use super::super::values::NuDataFrame;
+
+#[derive(Clone)]
+pub struct ToArrow;
+
+impl Command for ToArrow {
+    fn name(&self) -> &str {
+        "to arrow"
+    }
+
+    fn usage(&self) -> &str {
+        "Saves dataframe to arrow file"
+    }
+
+    fn signature(&self) -> Signature {
+        Signature::build(self.name())
+            .required("file", SyntaxShape::Filepath, "file path to save dataframe")
+            .input_type(Type::Custom("dataframe".into()))
+            .output_type(Type::Any)
+            .category(Category::Custom("dataframe".into()))
+    }
+
+    fn examples(&self) -> Vec<Example> {
+        vec![Example {
+            description: "Saves dataframe to arrow file",
+            example: "[[a b]; [1 2] [3 4]] | into df | to arrow test.arrow",
+            result: None,
+        }]
+    }
+
+    fn run(
+        &self,
+        engine_state: &EngineState,
+        stack: &mut Stack,
+        call: &Call,
+        input: PipelineData,
+    ) -> Result<PipelineData, ShellError> {
+        command(engine_state, stack, call, input)
+    }
+}
+
+fn command(
+    engine_state: &EngineState,
+    stack: &mut Stack,
+    call: &Call,
+    input: PipelineData,
+) -> Result<PipelineData, ShellError> {
+    let file_name: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
+    let mut df = NuDataFrame::try_from_pipeline(input, call.head)?;
+
+    let mut file = File::create(&file_name.item).map_err(|e| {
+        ShellError::GenericError(
+            "Error with file name".into(),
+            e.to_string(),
+            Some(file_name.span),
+            None,
+            Vec::new(),
+        )
+    })?;
+
+    IpcWriter::new(&mut file).finish(df.as_mut()).map_err(|e| {
+        ShellError::GenericError(
+            "Error saving file".into(),
+            e.to_string(),
+            Some(file_name.span),
+            None,
+            Vec::new(),
+        )
+    })?;
+
+    let file_value = Value::String {
+        val: format!("saved {:?}", &file_name.item),
+        span: file_name.span,
+    };
+
+    Ok(PipelineData::Value(
+        Value::List {
+            vals: vec![file_value],
+            span: call.head,
+        },
+        None,
+    ))
+}


@@ -208,6 +208,22 @@ fn parses_utf16_ini() {
     assert_eq!(actual.out, "-236")
 }
 
+#[cfg(feature = "dataframe")]
+#[test]
+fn parses_arrow_ipc() {
+    let actual = nu!(
+        cwd: "tests/fixtures/formats", pipeline(
+        r#"
+        open-df caco3_plastics.arrow
+        | into nu
+        | first 1
+        | get origin
+        "#
+    ));
+
+    assert_eq!(actual.out, "SPAIN")
+}
+
 #[test]
 fn errors_if_file_not_found() {
     let actual = nu!(

Binary file not shown (the caco3_plastics.arrow test fixture read by parses_arrow_ipc).
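
For reference, a minimal standalone sketch of the polars API surface this
change wraps: IpcReader/IpcWriter for the eager paths and
LazyFrame::scan_ipc for --lazy. This assumes the polars version pinned by
this commit; file names are illustrative.

    use std::fs::File;

    use polars::prelude::{IpcReader, IpcWriter, LazyFrame, ScanArgsIpc, SerReader, SerWriter};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Eager read: load the whole IPC file into memory, as open-df
        // does without --lazy.
        let mut df = IpcReader::new(File::open("test.arrow")?).finish()?;

        // Lazy read: build a scan and defer work until collect(), as
        // open-df does with --lazy.
        let args = ScanArgsIpc {
            n_rows: None,
            cache: true,
            rechunk: false,
            row_count: None,
        };
        let preview = LazyFrame::scan_ipc("test.arrow".to_string(), args)?.collect()?;
        println!("eager: {:?}, lazy: {:?}", df.shape(), preview.shape());

        // Write: serialize the DataFrame back out, as `to arrow` does.
        let mut out = File::create("copy.arrow")?;
        IpcWriter::new(&mut out).finish(&mut df)?;
        Ok(())
    }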