From 8144926dc7f4cfcee05cf2450fb1b579fa03fd2d Mon Sep 17 00:00:00 2001
From: Jack Wright <56345+ayax79@users.noreply.github.com>
Date: Fri, 26 May 2023 14:32:37 -0700
Subject: [PATCH] Adding JSON lines file support to dataframes (#9291)

# Description

Provides the ability to read and write [JSON lines](https://jsonlines.org/) files. This is accomplished by exposing the support already present in Polars.

## Opening a JSON lines file

*(screenshot of opening a JSON lines file into a dataframe, 2023-05-25)*

## Saving a dataframe to a JSON lines file

*(screenshot of saving a dataframe to a JSON lines file, 2023-05-25)*
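Since the screenshots only survive as captions here, a rough sketch of the equivalent pipelines follows. The `dfr to-jsonl` call and its example come straight from the new command below; the `dfr open` name and the `--infer-schema` flag are assumed from the existing `OpenDataFrame` command that this patch extends (its `name()` and `signature()` are not part of these hunks).

```nu
# Save a dataframe as JSON lines (example taken from the new command's help text)
[[a b]; [1 2] [3 4]] | dfr into-df | dfr to-jsonl test.jsonl

# Read it back; the .jsonl extension routes to the new from_jsonl reader
dfr open test.jsonl

# Optionally cap schema inference at the first N rows (flag name taken from from_jsonl)
dfr open test.jsonl --infer-schema 10
```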
---
 .../src/dataframe/eager/mod.rs            |  3 +
 .../src/dataframe/eager/open.rs           | 45 ++++++++-
 .../src/dataframe/eager/to_json_lines.rs  | 97 +++++++++++++++++++
 3 files changed, 142 insertions(+), 3 deletions(-)
 create mode 100644 crates/nu-cmd-dataframe/src/dataframe/eager/to_json_lines.rs

diff --git a/crates/nu-cmd-dataframe/src/dataframe/eager/mod.rs b/crates/nu-cmd-dataframe/src/dataframe/eager/mod.rs
index 65c90038b..7091cc51f 100644
--- a/crates/nu-cmd-dataframe/src/dataframe/eager/mod.rs
+++ b/crates/nu-cmd-dataframe/src/dataframe/eager/mod.rs
@@ -24,6 +24,7 @@ mod take;
 mod to_arrow;
 mod to_csv;
 mod to_df;
+mod to_json_lines;
 mod to_nu;
 mod to_parquet;
 mod with_column;
@@ -56,6 +57,7 @@ pub use take::TakeDF;
 pub use to_arrow::ToArrow;
 pub use to_csv::ToCSV;
 pub use to_df::ToDataFrame;
+pub use to_json_lines::ToJsonLines;
 pub use to_nu::ToNu;
 pub use to_parquet::ToParquet;
 pub use with_column::WithColumn;
@@ -98,6 +100,7 @@ pub fn add_eager_decls(working_set: &mut StateWorkingSet) {
         ToDataFrame,
         ToNu,
         ToParquet,
+        ToJsonLines,
         WithColumn
     );
 }
diff --git a/crates/nu-cmd-dataframe/src/dataframe/eager/open.rs b/crates/nu-cmd-dataframe/src/dataframe/eager/open.rs
index fc083ce84..e3aaffa37 100644
--- a/crates/nu-cmd-dataframe/src/dataframe/eager/open.rs
+++ b/crates/nu-cmd-dataframe/src/dataframe/eager/open.rs
@@ -9,8 +9,8 @@ use nu_protocol::{
 use std::{fs::File, io::BufReader, path::PathBuf};

 use polars::prelude::{
-    CsvEncoding, CsvReader, IpcReader, JsonReader, LazyCsvReader, LazyFileListReader, LazyFrame,
-    ParallelStrategy, ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader,
+    CsvEncoding, CsvReader, IpcReader, JsonFormat, JsonReader, LazyCsvReader, LazyFileListReader,
+    LazyFrame, ParallelStrategy, ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader,
 };

 #[derive(Clone)]
@@ -22,7 +22,7 @@ impl Command for OpenDataFrame {
     }

     fn usage(&self) -> &str {
-        "Opens CSV, JSON, arrow, or parquet file to create dataframe."
+        "Opens CSV, JSON, JSON lines, arrow, or parquet file to create dataframe."
     }

     fn signature(&self) -> Signature {
@@ -118,6 +118,7 @@ fn command(
         "parquet" => from_parquet(engine_state, stack, call),
         "ipc" | "arrow" => from_ipc(engine_state, stack, call),
         "json" => from_json(engine_state, stack, call),
+        "jsonl" => from_jsonl(engine_state, stack, call),
         _ => Err(ShellError::FileNotFoundCustom(
             format!("{msg}. Supported values: csv, tsv, parquet, ipc, arrow, json"),
             blamed,
@@ -299,6 +300,44 @@ fn from_json(
     Ok(df.into_value(call.head))
 }

+fn from_jsonl(
+    engine_state: &EngineState,
+    stack: &mut Stack,
+    call: &Call,
+) -> Result<Value, ShellError> {
+    let infer_schema: Option<usize> = call.get_flag(engine_state, stack, "infer-schema")?;
+    let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
+    let file = File::open(&file.item).map_err(|e| {
+        ShellError::GenericError(
+            "Error opening file".into(),
+            e.to_string(),
+            Some(file.span),
+            None,
+            Vec::new(),
+        )
+    })?;
+
+    let buf_reader = BufReader::new(file);
+    let reader = JsonReader::new(buf_reader)
+        .with_json_format(JsonFormat::JsonLines)
+        .infer_schema_len(infer_schema);
+
+    let df: NuDataFrame = reader
+        .finish()
+        .map_err(|e| {
+            ShellError::GenericError(
+                "Json lines reader error".into(),
+                format!("{e:?}"),
+                Some(call.head),
+                None,
+                Vec::new(),
+            )
+        })?
+        .into();
+
+    Ok(df.into_value(call.head))
+}
+
 fn from_csv(
     engine_state: &EngineState,
     stack: &mut Stack,
diff --git a/crates/nu-cmd-dataframe/src/dataframe/eager/to_json_lines.rs b/crates/nu-cmd-dataframe/src/dataframe/eager/to_json_lines.rs
new file mode 100644
index 000000000..e25538510
--- /dev/null
+++ b/crates/nu-cmd-dataframe/src/dataframe/eager/to_json_lines.rs
@@ -0,0 +1,97 @@
+use std::{fs::File, io::BufWriter, path::PathBuf};
+
+use nu_engine::CallExt;
+use nu_protocol::{
+    ast::Call,
+    engine::{Command, EngineState, Stack},
+    Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Value,
+};
+use polars::prelude::{JsonWriter, SerWriter};
+
+use super::super::values::NuDataFrame;
+
+#[derive(Clone)]
+pub struct ToJsonLines;
+
+impl Command for ToJsonLines {
+    fn name(&self) -> &str {
+        "dfr to-jsonl"
+    }
+
+    fn usage(&self) -> &str {
+        "Saves dataframe to a JSON lines file."
+    }
+
+    fn signature(&self) -> Signature {
+        Signature::build(self.name())
+            .required("file", SyntaxShape::Filepath, "file path to save dataframe")
+            .input_type(Type::Custom("dataframe".into()))
+            .output_type(Type::Any)
+            .category(Category::Custom("dataframe".into()))
+    }
+
+    fn examples(&self) -> Vec<Example> {
+        vec![Example {
+            description: "Saves dataframe to JSON lines file",
+            example: "[[a b]; [1 2] [3 4]] | dfr into-df | dfr to-jsonl test.jsonl",
+            result: None,
+        }]
+    }
+
+    fn run(
+        &self,
+        engine_state: &EngineState,
+        stack: &mut Stack,
+        call: &Call,
+        input: PipelineData,
+    ) -> Result<PipelineData, ShellError> {
+        command(engine_state, stack, call, input)
+    }
+}
+
+fn command(
+    engine_state: &EngineState,
+    stack: &mut Stack,
+    call: &Call,
+    input: PipelineData,
+) -> Result<PipelineData, ShellError> {
+    let file_name: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
+
+    let mut df = NuDataFrame::try_from_pipeline(input, call.head)?;
+
+    let file = File::create(&file_name.item).map_err(|e| {
+        ShellError::GenericError(
+            "Error with file name".into(),
+            e.to_string(),
+            Some(file_name.span),
+            None,
+            Vec::new(),
+        )
+    })?;
+    let buf_writer = BufWriter::new(file);
+
+    JsonWriter::new(buf_writer)
+        .finish(df.as_mut())
+        .map_err(|e| {
+            ShellError::GenericError(
+                "Error saving file".into(),
+                e.to_string(),
+                Some(file_name.span),
+                None,
+                Vec::new(),
+            )
+        })?;
+
+    let file_value = Value::String {
+        val: format!("saved {:?}", &file_name.item),
+        span: file_name.span,
+    };
+
+    Ok(PipelineData::Value(
+        Value::List {
+            vals: vec![file_value],
+            span: call.head,
+        },
+        None,
+    ))
+}