From 2541a712e4091a334c66d750e7246c77413ce76d Mon Sep 17 00:00:00 2001 From: Jack Wright <56345+ayax79@users.noreply.github.com> Date: Mon, 23 Sep 2024 04:43:43 -0700 Subject: [PATCH] Added `polars concat` to allow concatenation of multiple dataframes (#13879) # Description Provides the ability to concatenate multiple dataframes together # User-Facing Changes - Introduces new command `polars concat` --- .../src/dataframe/command/data/concat.rs | 162 ++++++++++++++++++ .../src/dataframe/command/data/mod.rs | 2 + 2 files changed, 164 insertions(+) create mode 100644 crates/nu_plugin_polars/src/dataframe/command/data/concat.rs diff --git a/crates/nu_plugin_polars/src/dataframe/command/data/concat.rs b/crates/nu_plugin_polars/src/dataframe/command/data/concat.rs new file mode 100644 index 0000000000..afb2a1733d --- /dev/null +++ b/crates/nu_plugin_polars/src/dataframe/command/data/concat.rs @@ -0,0 +1,162 @@ +use crate::{ + values::{CustomValueSupport, NuLazyFrame}, + PolarsPlugin, +}; + +use crate::values::NuDataFrame; +use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand}; +use nu_protocol::{ + Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, + Value, +}; +use polars::{ + df, + prelude::{LazyFrame, UnionArgs}, +}; + +#[derive(Clone)] +pub struct ConcatDF; + +impl PluginCommand for ConcatDF { + type Plugin = PolarsPlugin; + + fn name(&self) -> &str { + "polars concat" + } + + fn description(&self) -> &str { + "Concatenate two or more dataframes." + } + + fn signature(&self) -> Signature { + Signature::build(self.name()) + .switch("no-parallel", "Disable parallel execution", None) + .switch("rechunk", "Rechunk the resulting dataframe", None) + .switch("to-supertypes", "Cast to supertypes", None) + .switch("diagonal", "Concatenate dataframes diagonally", None) + .switch( + "from-partitioned-ds", + "Concatenate dataframes from a partitioned dataset", + None, + ) + .rest( + "dataframes", + SyntaxShape::Any, + "The dataframes to concatenate", + ) + .input_output_type(Type::Any, Type::Custom("dataframe".into())) + .category(Category::Custom("dataframe".into())) + } + + fn examples(&self) -> Vec { + vec![ + Example { + description: "Concatenates two dataframes with the dataframe in the pipeline.", + example: "[[a b]; [1 2]] | polars into-df + | polars concat ([[a b]; [3 4]] | polars into-df) ([[a b]; [5 6]] | polars into-df) + | polars collect + | polars sort-by [a b]", + result: Some( + NuDataFrame::from( + df!( + "a" => [1, 3, 5], + "b" => [2, 4, 6], + ) + .expect("simple df for test should not fail"), + ) + .into_value(Span::test_data()), + ), + }, + Example { + description: "Concatenates three dataframes together", + example: "polars concat ([[a b]; [1 2]] | polars into-df) ([[a b]; [3 4]] | polars into-df) ([[a b]; [5 6]] | polars into-df) + | polars collect + | polars sort-by [a b]", + result: Some( + NuDataFrame::from( + df!( + "a" => [1, 3, 5], + "b" => [2, 4, 6], + ) + .expect("simple df for test should not fail"), + ) + .into_value(Span::test_data()), + ), + } + ] + } + + fn run( + &self, + plugin: &Self::Plugin, + engine: &EngineInterface, + call: &EvaluatedCall, + input: PipelineData, + ) -> Result { + let maybe_df = NuLazyFrame::try_from_pipeline_coerce(plugin, input, call.head).ok(); + command_lazy(plugin, engine, call, maybe_df).map_err(LabeledError::from) + } +} + +fn command_lazy( + plugin: &PolarsPlugin, + engine: &EngineInterface, + call: &EvaluatedCall, + maybe_lazy: Option, +) -> Result { + let parallel = !call.has_flag("no-parallel")?; + let rechunk = call.has_flag("rechunk")?; + let to_supertypes = call.has_flag("to-supertypes")?; + let diagonal = call.has_flag("diagonal")?; + let from_partitioned_ds = call.has_flag("from-partitioned-ds")?; + let mut dataframes = call + .rest::(0)? + .iter() + .map(|v| NuLazyFrame::try_from_value_coerce(plugin, v).map(|lazy| lazy.to_polars())) + .collect::, ShellError>>()?; + + if dataframes.is_empty() { + Err(ShellError::GenericError { + error: "At least one other dataframe must be provided".into(), + msg: "".into(), + span: Some(call.head), + help: None, + inner: vec![], + }) + } else { + if let Some(lazy) = maybe_lazy.as_ref() { + dataframes.insert(0, lazy.to_polars()); + } + let args = UnionArgs { + parallel, + rechunk, + to_supertypes, + diagonal, + from_partitioned_ds, + }; + + let res: NuLazyFrame = polars::prelude::concat(&dataframes, args) + .map_err(|e| ShellError::GenericError { + error: format!("Failed to concatenate dataframes: {e}"), + msg: "".into(), + span: Some(call.head), + help: None, + inner: vec![], + })? + .into(); + + res.to_pipeline_data(plugin, engine, call.head) + } +} + +#[cfg(test)] +mod test { + use crate::test::test_polars_plugin_command; + + use super::*; + + #[test] + fn test_examples() -> Result<(), ShellError> { + test_polars_plugin_command(&ConcatDF) + } +} diff --git a/crates/nu_plugin_polars/src/dataframe/command/data/mod.rs b/crates/nu_plugin_polars/src/dataframe/command/data/mod.rs index 302b5f4685..f11ac31fe4 100644 --- a/crates/nu_plugin_polars/src/dataframe/command/data/mod.rs +++ b/crates/nu_plugin_polars/src/dataframe/command/data/mod.rs @@ -4,6 +4,7 @@ mod arg_where; mod cast; mod col; mod collect; +mod concat; mod drop; mod drop_duplicates; mod drop_nulls; @@ -74,6 +75,7 @@ pub(crate) fn data_commands() -> Vec