From 3a5ee1aed03d24701f2cff3681fec54564a0333b Mon Sep 17 00:00:00 2001 From: Fernando Herrera Date: Thu, 27 May 2021 06:09:48 +0100 Subject: [PATCH] Dataframe commands (#3498) * Sample command * Join command with checks * More dataframes commands * Groupby and aggregate commands * Missing feature dataframe flag * Renamed file --- Cargo.lock | 42 ++- crates/nu-command/Cargo.toml | 5 +- crates/nu-command/src/commands.rs | 6 +- .../src/commands/autoview/command.rs | 21 +- .../src/commands/dataframe/aggregate.rs | 202 +++++++++++++++ .../src/commands/dataframe/command.rs | 26 +- .../src/commands/dataframe/convert.rs | 43 ++++ .../nu-command/src/commands/dataframe/drop.rs | 97 +++++++ .../src/commands/dataframe/dtypes.rs | 81 ++++++ .../src/commands/dataframe/groupby.rs | 242 +++--------------- .../nu-command/src/commands/dataframe/join.rs | 205 +++++++++++++++ .../nu-command/src/commands/dataframe/list.rs | 51 ++-- .../nu-command/src/commands/dataframe/load.rs | 23 +- .../nu-command/src/commands/dataframe/mod.rs | 17 ++ .../src/commands/dataframe/sample.rs | 117 +++++++++ .../src/commands/dataframe/select.rs | 84 ++++++ .../nu-command/src/commands/dataframe/show.rs | 78 ++++++ .../src/commands/dataframe/utils.rs | 42 +++ .../src/commands/default_context.rs | 17 ++ crates/nu-data/src/config.rs | 2 +- crates/nu-protocol/Cargo.toml | 4 +- crates/nu-protocol/src/dataframe/mod.rs | 9 + .../nu-protocol/src/dataframe/nu_dataframe.rs | 35 ++- .../nu-protocol/src/dataframe/nu_groupby.rs | 54 ++++ crates/nu-protocol/src/value.rs | 4 +- crates/nu-protocol/src/value/primitive.rs | 4 +- 26 files changed, 1221 insertions(+), 290 deletions(-) create mode 100644 crates/nu-command/src/commands/dataframe/aggregate.rs create mode 100644 crates/nu-command/src/commands/dataframe/convert.rs create mode 100644 crates/nu-command/src/commands/dataframe/drop.rs create mode 100644 crates/nu-command/src/commands/dataframe/dtypes.rs create mode 100644 crates/nu-command/src/commands/dataframe/join.rs create mode 100644 crates/nu-command/src/commands/dataframe/sample.rs create mode 100644 crates/nu-command/src/commands/dataframe/select.rs create mode 100644 crates/nu-command/src/commands/dataframe/show.rs create mode 100644 crates/nu-command/src/commands/dataframe/utils.rs create mode 100644 crates/nu-protocol/src/dataframe/nu_groupby.rs diff --git a/Cargo.lock b/Cargo.lock index 6b533731a2..d73f6435f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2843,6 +2843,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libm" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" + [[package]] name = "libnghttp2-sys" version = "0.1.6+1.43.0" @@ -3760,9 +3766,6 @@ dependencies = [ "polars", "serde 1.0.125", "serde_bytes", - "serde_json", - "serde_yaml", - "toml", ] [[package]] @@ -4253,6 +4256,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -4685,9 +4689,9 @@ dependencies = [ [[package]] name = "polars" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc4e2e025126632e8e19d53cd9b655da344bd4942ba603ad246c7776b6401844" +checksum = "c406ce46726b7d33b05a343d9c1317c0803a419d50bb45275de3f366410e9a80" dependencies = [ "polars-core", "polars-io", @@ -4696,9 +4700,9 @@ dependencies = [ [[package]] name = "polars-arrow" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3534c76a7bafaca9c783506a1f331ad746621d3808ab2407c02ffadd9e99326" +checksum = "53b2d5fb400345c7977e4e728a10be382476f2f9d2caf6b57cd60e97ea17d364" dependencies = [ "arrow", "num 0.4.0", @@ -4707,9 +4711,9 @@ dependencies = [ [[package]] name = "polars-core" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad76c4d55017da2d0f8930b0caa327d12286c1e4407469f361e84fad176f9601" +checksum = "88561e850748c507f0fc7835b35e795e770597ceecb14e0a8f7d8abf8346645d" dependencies = [ "ahash 0.7.2", "anyhow", @@ -4723,6 +4727,8 @@ dependencies = [ "parquet", "polars-arrow", "prettytable-rs", + "rand 0.7.3", + "rand_distr", "rayon", "regex 1.5.3", "thiserror", @@ -4731,9 +4737,9 @@ dependencies = [ [[package]] name = "polars-io" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07f20f27363d85f847a2b7e9d1bfd426bff18680691dd42ff17ca91893f12f89" +checksum = "27388810ec5f3346838725aa0aa49343802c1344b96fe82229ae781c62c98bc7" dependencies = [ "ahash 0.7.2", "anyhow", @@ -4755,9 +4761,9 @@ dependencies = [ [[package]] name = "polars-lazy" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "595906f951bacf223625ed6b0e4e73153eb9e251850bb2f9c36d78828334f32b" +checksum = "0e7f83284970a9db7d0b6a56d6f944c3988587429c124c1d087188e9d2c7ad7c" dependencies = [ "ahash 0.7.2", "itertools", @@ -5089,6 +5095,16 @@ dependencies = [ "getrandom 0.2.2", ] +[[package]] +name = "rand_distr" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9532ada3929fb8b2e9dbe28d1e06c9b2cc65813f074fcb6bd5fbefeff9d56" +dependencies = [ + "num-traits 0.2.14", + "rand 0.7.3", +] + [[package]] name = "rand_hc" version = "0.2.0" diff --git a/crates/nu-command/Cargo.toml b/crates/nu-command/Cargo.toml index 67656fb415..00b22ae65d 100644 --- a/crates/nu-command/Cargo.toml +++ b/crates/nu-command/Cargo.toml @@ -99,7 +99,10 @@ uuid_crate = { package = "uuid", version = "0.8.2", features = ["v4"], optional which = { version = "4.1.0", optional = true } zip = { version = "0.5.9", optional = true } -polars = { version = "0.13.3",optional = true, features = ["parquet", "json"] } +[dependencies.polars] +version = "0.13.4" +optional = true +features = ["parquet", "json", "random"] [target.'cfg(unix)'.dependencies] umask = "1.0.0" diff --git a/crates/nu-command/src/commands.rs b/crates/nu-command/src/commands.rs index bf02d3b812..086ebf947b 100644 --- a/crates/nu-command/src/commands.rs +++ b/crates/nu-command/src/commands.rs @@ -188,7 +188,11 @@ pub(crate) mod touch; pub(crate) use all::Command as All; pub(crate) use any::Command as Any; #[cfg(feature = "dataframe")] -pub(crate) use dataframe::{DataFrame, DataFrameGroupBy, DataFrameList, DataFrameLoad}; +pub(crate) use dataframe::{ + DataFrame, DataFrameAggregate, DataFrameConvert, DataFrameDTypes, DataFrameDrop, + DataFrameGroupBy, DataFrameJoin, DataFrameList, DataFrameLoad, DataFrameSample, + DataFrameSelect, DataFrameShow, +}; pub(crate) use enter::Enter; pub(crate) use every::Every; pub(crate) use exec::Exec; diff --git a/crates/nu-command/src/commands/autoview/command.rs b/crates/nu-command/src/commands/autoview/command.rs index ed943e1c59..7ac29f1bfc 100644 --- a/crates/nu-command/src/commands/autoview/command.rs +++ b/crates/nu-command/src/commands/autoview/command.rs @@ -8,6 +8,9 @@ use nu_protocol::hir::{self, Expression, ExternalRedirection, Literal, SpannedEx use nu_protocol::{Primitive, Signature, UntaggedValue, Value}; use nu_table::TextStyle; +#[cfg(feature = "dataframe")] +use nu_protocol::dataframe::PolarsData; + pub struct Command; impl WholeStreamCommand for Command { @@ -236,8 +239,8 @@ pub fn autoview(args: CommandArgs) -> Result { } #[cfg(feature = "dataframe")] Value { - value: UntaggedValue::DataFrame(df), - .. + value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)), + tag, } => { if let Some(table) = table { // TODO. Configure the parameter rows from file. It can be @@ -248,6 +251,20 @@ pub fn autoview(args: CommandArgs) -> Result { let _ = result.collect::>(); } } + #[cfg(feature = "dataframe")] + Value { + value: UntaggedValue::DataFrame(PolarsData::GroupBy(groupby)), + tag, + } => { + if let Some(table) = table { + // TODO. Configure the parameter rows from file. It can be + // adjusted to see a certain amount of values in the head + let command_args = + create_default_command_args(&context, groupby.print()?.into(), tag); + let result = table.run(command_args)?; + let _ = result.collect::>(); + } + } Value { value: UntaggedValue::Primitive(Primitive::Nothing), .. diff --git a/crates/nu-command/src/commands/dataframe/aggregate.rs b/crates/nu-command/src/commands/dataframe/aggregate.rs new file mode 100644 index 0000000000..75165862af --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/aggregate.rs @@ -0,0 +1,202 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, SyntaxShape, UntaggedValue, Value, +}; +use nu_source::Tagged; +use polars::frame::groupby::GroupBy; + +use super::utils::convert_columns; + +enum Operation { + Mean, + Sum, + Min, + Max, + First, + Last, + Nunique, + Quantile(f64), + Median, + Var, + Std, + Count, +} + +impl Operation { + fn from_tagged( + name: &Tagged, + quantile: Option>, + ) -> Result { + match name.item.as_ref() { + "mean" => Ok(Operation::Mean), + "sum" => Ok(Operation::Sum), + "min" => Ok(Operation::Min), + "max" => Ok(Operation::Max), + "first" => Ok(Operation::First), + "last" => Ok(Operation::Last), + "nunique" => Ok(Operation::Nunique), + "quantile" => { + match quantile { + None => Err(ShellError::labeled_error( + "Quantile value not fount", + "Quantile operation requires quantile value", + &name.tag, + )), + Some(value ) => { + if (value.item < 0.0) | (value.item > 1.0) { + Err(ShellError::labeled_error( + "Inappropriate quantile", + "Quantile value should be between 0.0 and 1.0", + &value.tag, + )) + } else { + Ok(Operation::Quantile(value.item)) + } + } + } + } + "median" => Ok(Operation::Median), + "var" => Ok(Operation::Var), + "std" => Ok(Operation::Std), + "count" => Ok(Operation::Count), + _ => Err(ShellError::labeled_error_with_secondary( + "Operation not fount", + "Operation does not exist", + &name.tag, + "Perhaps you want: mean, sum, min, max, first, last, nunique, quantile, median, count", + &name.tag, + )), + } + } +} + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls aggregate" + } + + fn usage(&self) -> &str { + "Performs an aggregation operation on a groupby object" + } + + fn signature(&self) -> Signature { + Signature::build("pls aggregate") + .required("operation", SyntaxShape::String, "aggregate operation") + .optional( + "selection", + SyntaxShape::Table, + "columns to perform aggregation", + ) + .named( + "quantile", + SyntaxShape::Number, + "quantile value for quantile operation", + Some('q'), + ) + } + + fn run(&self, args: CommandArgs) -> Result { + aggregate(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Aggregate sum by grouping by column a and summing on col b", + example: + "echo [[a b]; [one 1] [one 2]] | pls convert | pls groupby [a] | pls aggregate sum", + result: None, + }] + } +} + +fn aggregate(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + let quantile: Option> = args.get_flag("quantile")?; + let operation: Tagged = args.req(0)?; + let op = Operation::from_tagged(&operation, quantile)?; + + // Extracting the selection columns of the columns to perform the aggregation + let agg_cols: Option> = args.opt(1)?; + let (selection, agg_span) = match agg_cols { + Some(cols) => { + let (agg_string, agg_span) = convert_columns(&cols, &tag)?; + (Some(agg_string), agg_span) + } + None => (None, Span::unknown()), + }; + + // The operation is only done in one dataframe. Only one input is + // expected from the InputStream + match args.input.next() { + None => Err(ShellError::labeled_error( + "No input received", + "missing dataframe input from stream", + &tag, + )), + Some(value) => { + if let UntaggedValue::DataFrame(PolarsData::GroupBy(nu_groupby)) = value.value { + let groupby = nu_groupby.to_groupby()?; + + let groupby = match &selection { + Some(cols) => groupby.select(cols), + None => groupby, + }; + + let res = perform_aggregation(groupby, op, &operation.tag, &agg_span)?; + + let final_df = Value { + tag, + value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( + res, + ))), + }; + + Ok(OutputStream::one(final_df)) + } else { + Err(ShellError::labeled_error( + "No groupby in stream", + "no groupby found in input stream", + &tag, + )) + } + } + } +} + +fn perform_aggregation( + groupby: GroupBy, + operation: Operation, + operation_tag: &Tag, + agg_span: &Span, +) -> Result { + match operation { + Operation::Mean => groupby.mean(), + Operation::Sum => groupby.sum(), + Operation::Min => groupby.min(), + Operation::Max => groupby.max(), + Operation::First => groupby.first(), + Operation::Last => groupby.last(), + Operation::Nunique => groupby.n_unique(), + Operation::Quantile(quantile) => groupby.quantile(quantile), + Operation::Median => groupby.median(), + Operation::Var => groupby.var(), + Operation::Std => groupby.std(), + Operation::Count => groupby.count(), + } + .map_err(|e| { + let span = if e.to_string().contains("Not found") { + agg_span + } else { + &operation_tag.span + }; + + ShellError::labeled_error("Aggregation error", format!("{}", e), span) + }) +} diff --git a/crates/nu-command/src/commands/dataframe/command.rs b/crates/nu-command/src/commands/dataframe/command.rs index 909114a2ae..4e3bdb4d02 100644 --- a/crates/nu-command/src/commands/dataframe/command.rs +++ b/crates/nu-command/src/commands/dataframe/command.rs @@ -1,38 +1,26 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{dataframe::NuDataFrame, Signature, UntaggedValue}; +use nu_protocol::{Signature, UntaggedValue}; pub struct Command; impl WholeStreamCommand for Command { fn name(&self) -> &str { - "dataframe" + "pls" } fn usage(&self) -> &str { - "Creates a dataframe from pipelined Table or List " + "Commands to work with polars dataframes" } fn signature(&self) -> Signature { - Signature::build("dataframe") + Signature::build("pls") } fn run(&self, args: CommandArgs) -> Result { - let tag = args.call_info.name_tag.clone(); - let args = args.evaluate_once()?; - - let df = NuDataFrame::try_from_iter(args.input, &tag)?; - let init = InputStream::one(UntaggedValue::DataFrame(df).into_value(&tag)); - - Ok(init.to_output_stream()) - } - - fn examples(&self) -> Vec { - vec![Example { - description: "Takes an input stream and converts it to a dataframe", - example: "echo [[a b];[1 2] [3 4]] | dataframe", - result: None, - }] + Ok(OutputStream::one( + UntaggedValue::string(get_full_help(&Command, args.scope())).into_value(Tag::unknown()), + )) } } diff --git a/crates/nu-command/src/commands/dataframe/convert.rs b/crates/nu-command/src/commands/dataframe/convert.rs new file mode 100644 index 0000000000..ed447ee95b --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/convert.rs @@ -0,0 +1,43 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, UntaggedValue, +}; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls convert" + } + + fn usage(&self) -> &str { + "Converts a pipelined Table or List into a polars dataframe" + } + + fn signature(&self) -> Signature { + Signature::build("pls convert") + } + + fn run(&self, args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let args = args.evaluate_once()?; + + let df = NuDataFrame::try_from_iter(args.input, &tag)?; + let init = InputStream::one( + UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)).into_value(&tag), + ); + + Ok(init.to_output_stream()) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Takes an input stream and converts it to a polars dataframe", + example: "echo [[a b];[1 2] [3 4]] | pls convert", + result: None, + }] + } +} diff --git a/crates/nu-command/src/commands/dataframe/drop.rs b/crates/nu-command/src/commands/dataframe/drop.rs new file mode 100644 index 0000000000..b3bd9d2c8f --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/drop.rs @@ -0,0 +1,97 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, SyntaxShape, UntaggedValue, Value, +}; + +use super::utils::convert_columns; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls drop" + } + + fn usage(&self) -> &str { + "Creates a new dataframe by dropping the selected columns" + } + + fn signature(&self) -> Signature { + Signature::build("pls drop").required( + "columns", + SyntaxShape::Table, + "column names to be dropped", + ) + } + + fn run(&self, args: CommandArgs) -> Result { + drop(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "drop column a", + example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls drop [a]", + result: None, + }] + } +} + +fn drop(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + let columns: Vec = args.req(0)?; + + let (col_string, col_span) = convert_columns(&columns, &tag)?; + + match args.input.next() { + None => Err(ShellError::labeled_error( + "No input received", + "missing dataframe input from stream", + &tag, + )), + Some(value) => { + if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { + dataframe: Some(ref df), + .. + })) = value.value + { + let new_df = match col_string.iter().next() { + Some(col) => df.drop(col).map_err(|e| { + ShellError::labeled_error("Join error", format!("{}", e), &col_span) + }), + None => Err(ShellError::labeled_error( + "Empty names list", + "No column names where found", + &col_span, + )), + }?; + + let res = col_string.iter().skip(1).try_fold(new_df, |new_df, col| { + new_df.drop(col).map_err(|e| { + ShellError::labeled_error("Drop error", format!("{}", e), &col_span) + }) + })?; + + let value = Value { + value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( + res, + ))), + tag: tag.clone(), + }; + + Ok(OutputStream::one(value)) + } else { + Err(ShellError::labeled_error( + "No dataframe in stream", + "no dataframe found in input stream", + &tag, + )) + } + } + } +} diff --git a/crates/nu-command/src/commands/dataframe/dtypes.rs b/crates/nu-command/src/commands/dataframe/dtypes.rs new file mode 100644 index 0000000000..8248eb8222 --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/dtypes.rs @@ -0,0 +1,81 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, TaggedDictBuilder, UntaggedValue, +}; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls dtypes" + } + + fn usage(&self) -> &str { + "Show dataframe data types" + } + + fn signature(&self) -> Signature { + Signature::build("pls dtypes") + } + + fn run(&self, args: CommandArgs) -> Result { + dtypes(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "drop column a", + example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls dtypes", + result: None, + }] + } +} + +fn dtypes(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + match args.input.next() { + None => Err(ShellError::labeled_error( + "No input received", + "missing dataframe input from stream", + &tag, + )), + Some(value) => { + if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { + dataframe: Some(df), + .. + })) = value.value + { + let col_names = df + .get_column_names() + .iter() + .map(|v| v.to_string()) + .collect::>(); + + let values = + df.dtypes() + .into_iter() + .zip(col_names.into_iter()) + .map(move |(dtype, name)| { + let mut data = TaggedDictBuilder::new(tag.clone()); + data.insert_value("column", name.as_ref()); + data.insert_value("dtype", format!("{}", dtype)); + + data.into_value() + }); + + Ok(OutputStream::from_stream(values)) + } else { + Err(ShellError::labeled_error( + "No dataframe in stream", + "no dataframe found in input stream", + &tag, + )) + } + } + } +} diff --git a/crates/nu-command/src/commands/dataframe/groupby.rs b/crates/nu-command/src/commands/dataframe/groupby.rs index 2e2ba5168f..32cf16cd38 100644 --- a/crates/nu-command/src/commands/dataframe/groupby.rs +++ b/crates/nu-command/src/commands/dataframe/groupby.rs @@ -2,100 +2,29 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; use nu_protocol::{ - dataframe::NuDataFrame, Primitive, Signature, SyntaxShape, UntaggedValue, Value, + dataframe::{NuDataFrame, NuGroupBy, PolarsData}, + Signature, SyntaxShape, UntaggedValue, Value, }; -use nu_source::Tagged; -use polars::frame::groupby::GroupBy; -enum Operation { - Mean, - Sum, - Min, - Max, - First, - Last, - Nunique, - Quantile(f64), - Median, - //Var, - //Std, - Count, -} - -impl Operation { - fn from_tagged( - name: &Tagged, - quantile: Option>, - ) -> Result { - match name.item.as_ref() { - "mean" => Ok(Operation::Mean), - "sum" => Ok(Operation::Sum), - "min" => Ok(Operation::Min), - "max" => Ok(Operation::Max), - "first" => Ok(Operation::First), - "last" => Ok(Operation::Last), - "nunique" => Ok(Operation::Nunique), - "quantile" => { - match quantile { - None => Err(ShellError::labeled_error( - "Quantile value not fount", - "Quantile operation requires quantile value", - &name.tag, - )), - Some(value ) => { - if (value.item < 0.0) | (value.item > 1.0) { - Err(ShellError::labeled_error( - "Inappropriate quantile", - "Quantile value should be between 0.0 and 1.0", - &value.tag, - )) - } else { - Ok(Operation::Quantile(value.item)) - } - } - } - } - "median" => Ok(Operation::Median), - //"var" => Ok(Operation::Var), - //"std" => Ok(Operation::Std), - "count" => Ok(Operation::Count), - _ => Err(ShellError::labeled_error_with_secondary( - "Operation not fount", - "Operation does not exist", - &name.tag, - "Perhaps you want: mean, sum, min, max, first, last, nunique, quantile, median, count", - &name.tag, - )), - } - } -} +use super::utils::convert_columns; pub struct DataFrame; impl WholeStreamCommand for DataFrame { fn name(&self) -> &str { - "dataframe groupby" + "pls groupby" } fn usage(&self) -> &str { - "Creates a groupby operation on a dataframe" + "Creates a groupby object that can be used for other aggregations" } fn signature(&self) -> Signature { - Signature::build("dataframe groupby") - .required("columns", SyntaxShape::Table, "groupby columns") - .required( - "aggregation columns", - SyntaxShape::Table, - "columns to perform aggregation", - ) - .required("operation", SyntaxShape::String, "aggregate operation") - .named( - "quantile", - SyntaxShape::Number, - "auantile value for quantile operation", - Some('q'), - ) + Signature::build("pls groupby").required( + "by columns", + SyntaxShape::Table, + "groupby columns", + ) } fn run(&self, args: CommandArgs) -> Result { @@ -104,8 +33,8 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { - description: "", - example: "", + description: "Grouping by column a", + example: "echo [[a b]; [one 1] [one 2]] | pls convert | pls groupby [a]", result: None, }] } @@ -115,77 +44,9 @@ fn groupby(args: CommandArgs) -> Result { let tag = args.call_info.name_tag.clone(); let mut args = args.evaluate_once()?; - let quantile: Option> = args.get_flag("quantile")?; - let operation: Tagged = args.req(2)?; - let op = Operation::from_tagged(&operation, quantile)?; - // Extracting the names of the columns to perform the groupby - let columns: Vec = args.req(0)?; - - // Extracting the first tag from the groupby column names - let mut col_span = match columns - .iter() - .nth(0) - .map(|v| Span::new(v.tag.span.start(), v.tag.span.end())) - { - Some(span) => span, - None => { - return Err(ShellError::labeled_error( - "Empty groupby names list", - "Empty list for groupby column names", - &tag, - )) - } - }; - - let columns_string = columns - .into_iter() - .map(|value| match value.value { - UntaggedValue::Primitive(Primitive::String(s)) => { - col_span = col_span.until(value.tag.span); - Ok(s) - } - _ => Err(ShellError::labeled_error( - "Incorrect column format", - "Only string as column name", - &value.tag, - )), - }) - .collect::, _>>()?; - - // Extracting the names of the columns to perform the aggregation - let agg_cols: Vec = args.req(1)?; - - // Extracting the first tag from the aggregation column names - let mut agg_span = match agg_cols - .iter() - .nth(0) - .map(|v| Span::new(v.tag.span.start(), v.tag.span.end())) - { - Some(span) => span, - None => { - return Err(ShellError::labeled_error( - "Empty aggregation names list", - "Empty list for aggregation column names", - &tag, - )) - } - }; - - let aggregation_string = agg_cols - .into_iter() - .map(|value| match value.value { - UntaggedValue::Primitive(Primitive::String(s)) => { - agg_span = agg_span.until(value.tag.span); - Ok(s) - } - _ => Err(ShellError::labeled_error( - "Incorrect column format", - "Only string as column name", - value.tag, - )), - }) - .collect::, _>>()?; + let by_columns: Vec = args.req(0)?; + let (columns_string, col_span) = convert_columns(&by_columns, &tag)?; // The operation is only done in one dataframe. Only one input is // expected from the InputStream @@ -196,29 +57,31 @@ fn groupby(args: CommandArgs) -> Result { &tag, )), Some(value) => { - if let UntaggedValue::DataFrame(NuDataFrame { - dataframe: Some(df), - .. - }) = value.value - { - let groupby = df - .groupby(&columns_string) - .map_err(|e| { - ShellError::labeled_error("Groupby error", format!("{}", e), col_span) - })? - .select(&aggregation_string); - - let res = perform_aggregation(groupby, op, &operation.tag, &agg_span)?; - - let final_df = Value { - tag, - value: UntaggedValue::DataFrame(NuDataFrame { - dataframe: Some(res), - name: "agg result".to_string(), - }), + if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(nu_df)) = value.value { + let df = match nu_df.dataframe { + Some(df) => df, + None => unreachable!("No dataframe in nu_dataframe"), }; - Ok(OutputStream::one(final_df)) + // This is the expensive part of the groupby; to create the + // groups that will be used for grouping the data in the + // dataframe. Once it has been done these values can be stored + // in the NuGroupBy + let groupby = df.groupby(&columns_string).map_err(|e| { + ShellError::labeled_error("Groupby error", format!("{}", e), col_span) + })?; + + let groups = groupby.get_groups().to_vec(); + let groupby = Value { + tag: value.tag, + value: UntaggedValue::DataFrame(PolarsData::GroupBy(NuGroupBy::new( + NuDataFrame::new_with_name(df, nu_df.name), + columns_string, + groups, + ))), + }; + + Ok(OutputStream::one(groupby)) } else { Err(ShellError::labeled_error( "No dataframe in stream", @@ -229,34 +92,3 @@ fn groupby(args: CommandArgs) -> Result { } } } - -fn perform_aggregation( - groupby: GroupBy, - operation: Operation, - operation_tag: &Tag, - agg_span: &Span, -) -> Result { - match operation { - Operation::Mean => groupby.mean(), - Operation::Sum => groupby.sum(), - Operation::Min => groupby.min(), - Operation::Max => groupby.max(), - Operation::First => groupby.first(), - Operation::Last => groupby.last(), - Operation::Nunique => groupby.n_unique(), - Operation::Quantile(quantile) => groupby.quantile(quantile), - Operation::Median => groupby.median(), - //Operation::Var => groupby.var(), - //Operation::Std => groupby.std(), - Operation::Count => groupby.count(), - } - .map_err(|e| { - let span = if e.to_string().contains("Not found") { - agg_span - } else { - &operation_tag.span - }; - - ShellError::labeled_error("Aggregation error", format!("{}", e), span) - }) -} diff --git a/crates/nu-command/src/commands/dataframe/join.rs b/crates/nu-command/src/commands/dataframe/join.rs new file mode 100644 index 0000000000..b0a34f835e --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/join.rs @@ -0,0 +1,205 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, SyntaxShape, UntaggedValue, Value, +}; + +use super::utils::convert_columns; + +use polars::prelude::JoinType; + +use nu_source::Tagged; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls join" + } + + fn usage(&self) -> &str { + "Joins a dataframe using columns as reference" + } + + fn signature(&self) -> Signature { + Signature::build("pls join") + .required("dataframe", SyntaxShape::Any, "right dataframe to join") + .required( + "l_columns", + SyntaxShape::Table, + "left column names to perform join", + ) + .required( + "r_columns", + SyntaxShape::Table, + "right column names to perform join", + ) + .named( + "type", + SyntaxShape::String, + "type of join. Inner by default", + Some('t'), + ) + } + + fn run(&self, args: CommandArgs) -> Result { + join(args) + } + + fn examples(&self) -> Vec { + vec![ + Example { + description: "inner join dataframe", + example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls join $right [a] [a]", + result: None, + }, + Example { + description: "right join dataframe", + example: + "echo [[a b]; [1 2] [3 4] [5 6]] | pls convert | pls join $right [b] [b] -t right", + result: None, + }, + ] + } +} + +fn join(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + let r_df: Value = args.req(0)?; + let l_col: Vec = args.req(1)?; + let r_col: Vec = args.req(2)?; + let join_type_op: Option> = args.get_flag("type")?; + + let join_type = match join_type_op { + None => JoinType::Inner, + Some(val) => match val.item.as_ref() { + "inner" => JoinType::Inner, + "outer" => JoinType::Outer, + "left" => JoinType::Left, + _ => { + return Err(ShellError::labeled_error_with_secondary( + "Incorrect join type", + "Invalid join type", + &val.tag, + "Perhaps you mean: inner, outer or left", + &val.tag, + )) + } + }, + }; + + let (l_col_string, l_col_span) = convert_columns(&l_col, &tag)?; + let (r_col_string, r_col_span) = convert_columns(&r_col, &tag)?; + + match args.input.next() { + None => Err(ShellError::labeled_error( + "No input received", + "missing dataframe input from stream", + &tag, + )), + Some(value) => { + if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { + dataframe: Some(ref df), + .. + })) = value.value + { + let res = match r_df.value { + UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { + dataframe: Some(r_df), + .. + })) => { + // Checking the column types before performing the join + check_column_datatypes( + df, + &l_col_string, + &l_col_span, + &r_col_string, + &r_col_span, + )?; + + df.join(&r_df, &l_col_string, &r_col_string, join_type) + .map_err(|e| { + ShellError::labeled_error( + "Join error", + format!("{}", e), + &l_col_span, + ) + }) + } + _ => Err(ShellError::labeled_error( + "Not a dataframe", + "not a dataframe type value", + &r_df.tag, + )), + }?; + + let value = Value { + value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( + res, + ))), + tag: tag.clone(), + }; + + Ok(OutputStream::one(value)) + } else { + Err(ShellError::labeled_error( + "No dataframe in stream", + "no dataframe found in input stream", + &tag, + )) + } + } + } +} + +fn check_column_datatypes>( + df: &polars::prelude::DataFrame, + l_cols: &[T], + l_col_span: &Span, + r_cols: &[T], + r_col_span: &Span, +) -> Result<(), ShellError> { + if l_cols.len() != r_cols.len() { + return Err(ShellError::labeled_error_with_secondary( + "Mismatched number of column names", + format!( + "found {} left names vs {} right names", + l_cols.len(), + r_cols.len() + ), + l_col_span, + "perhaps you need to change the number of columns to join", + r_col_span, + )); + } + + for (l, r) in l_cols.iter().zip(r_cols.iter()) { + let l_series = df + .column(l.as_ref()) + .map_err(|e| ShellError::labeled_error("Join error", format!("{}", e), l_col_span))?; + + let r_series = df + .column(r.as_ref()) + .map_err(|e| ShellError::labeled_error("Join error", format!("{}", e), r_col_span))?; + + if l_series.dtype() != r_series.dtype() { + return Err(ShellError::labeled_error_with_secondary( + "Mismatched datatypes", + format!( + "left column type '{}' doesn't match '{}' right column match", + l_series.dtype(), + r_series.dtype() + ), + l_col_span, + "perhaps you need to select other column to match", + r_col_span, + )); + } + } + + Ok(()) +} diff --git a/crates/nu-command/src/commands/dataframe/list.rs b/crates/nu-command/src/commands/dataframe/list.rs index 3f80f408df..9ab54c8c60 100644 --- a/crates/nu-command/src/commands/dataframe/list.rs +++ b/crates/nu-command/src/commands/dataframe/list.rs @@ -1,13 +1,16 @@ use crate::prelude::*; use nu_engine::WholeStreamCommand; use nu_errors::ShellError; -use nu_protocol::{Signature, TaggedDictBuilder, UntaggedValue, Value}; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, TaggedDictBuilder, UntaggedValue, +}; pub struct DataFrame; impl WholeStreamCommand for DataFrame { fn name(&self) -> &str { - "dataframe list" + "pls list" } fn usage(&self) -> &str { @@ -15,38 +18,46 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("dataframe list") + Signature::build("pls list") } fn run(&self, args: CommandArgs) -> Result { let args = args.evaluate_once()?; - let mut dataframes: Vec = Vec::new(); - for (name, value) in args.context.scope.get_vars() { - if let UntaggedValue::DataFrame(df) = value.value { - let mut data = TaggedDictBuilder::new(value.tag); + let values = args + .context + .scope + .get_vars() + .into_iter() + .filter_map(|(name, value)| { + if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { + dataframe: Some(df), + name: file_name, + })) = &value.value + { + let mut data = TaggedDictBuilder::new(value.tag.clone()); - let polars_df = df.dataframe.unwrap(); + let rows = df.height(); + let cols = df.width(); - let rows = polars_df.height(); - let cols = polars_df.width(); + data.insert_value("name", name.as_ref()); + data.insert_value("file", file_name.as_ref()); + data.insert_value("rows", format!("{}", rows)); + data.insert_value("columns", format!("{}", cols)); - data.insert_value("name", name); - data.insert_value("file", df.name); - data.insert_value("rows", format!("{}", rows)); - data.insert_value("columns", format!("{}", cols)); + Some(data.into_value()) + } else { + None + } + }); - dataframes.push(data.into_value()); - } - } - - Ok(OutputStream::from_stream(dataframes.into_iter())) + Ok(OutputStream::from_stream(values)) } fn examples(&self) -> Vec { vec![Example { description: "Lists loaded dataframes in current scope", - example: "dataframe list", + example: "pls list", result: None, }] } diff --git a/crates/nu-command/src/commands/dataframe/load.rs b/crates/nu-command/src/commands/dataframe/load.rs index fac5b55ee3..67bf90fd02 100644 --- a/crates/nu-command/src/commands/dataframe/load.rs +++ b/crates/nu-command/src/commands/dataframe/load.rs @@ -4,7 +4,8 @@ use crate::prelude::*; use nu_engine::{EvaluatedCommandArgs, WholeStreamCommand}; use nu_errors::ShellError; use nu_protocol::{ - dataframe::NuDataFrame, Primitive, Signature, SyntaxShape, UntaggedValue, Value, + dataframe::{NuDataFrame, PolarsData}, + Primitive, Signature, SyntaxShape, UntaggedValue, Value, }; use nu_source::Tagged; @@ -15,7 +16,7 @@ pub struct DataFrame; impl WholeStreamCommand for DataFrame { fn name(&self) -> &str { - "dataframe load" + "pls load" } fn usage(&self) -> &str { @@ -23,7 +24,7 @@ impl WholeStreamCommand for DataFrame { } fn signature(&self) -> Signature { - Signature::build("dataframe load") + Signature::build("pls load") .required( "file", SyntaxShape::FilePath, @@ -67,7 +68,7 @@ impl WholeStreamCommand for DataFrame { fn examples(&self) -> Vec { vec![Example { description: "Takes a file name and creates a dataframe", - example: "dataframe load test.csv", + example: "pls load test.csv", result: None, }] } @@ -85,7 +86,7 @@ fn create_from_file(args: CommandArgs) -> Result { Some("json") => from_json(args), _ => Err(ShellError::labeled_error( "Error with file", - "Not a csv or parquet file", + "Not a csv, parquet or json file", &file.tag, )), }, @@ -107,12 +108,12 @@ fn create_from_file(args: CommandArgs) -> Result { } }; - let nu_dataframe = NuDataFrame { - dataframe: Some(df), - name: file_name, - }; - - let init = InputStream::one(UntaggedValue::DataFrame(nu_dataframe).into_value(&tag)); + let init = InputStream::one( + UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new_with_name( + df, file_name, + ))) + .into_value(&tag), + ); Ok(init.to_output_stream()) } diff --git a/crates/nu-command/src/commands/dataframe/mod.rs b/crates/nu-command/src/commands/dataframe/mod.rs index dcb60b916b..1653ed2ca1 100644 --- a/crates/nu-command/src/commands/dataframe/mod.rs +++ b/crates/nu-command/src/commands/dataframe/mod.rs @@ -1,9 +1,26 @@ +pub mod aggregate; pub mod command; +pub mod convert; +pub mod drop; +pub mod dtypes; pub mod groupby; +pub mod join; pub mod list; pub mod load; +pub mod sample; +pub mod select; +pub mod show; +pub(crate) mod utils; +pub use aggregate::DataFrame as DataFrameAggregate; pub use command::Command as DataFrame; +pub use convert::DataFrame as DataFrameConvert; +pub use drop::DataFrame as DataFrameDrop; +pub use dtypes::DataFrame as DataFrameDTypes; pub use groupby::DataFrame as DataFrameGroupBy; +pub use join::DataFrame as DataFrameJoin; pub use list::DataFrame as DataFrameList; pub use load::DataFrame as DataFrameLoad; +pub use sample::DataFrame as DataFrameSample; +pub use select::DataFrame as DataFrameSelect; +pub use show::DataFrame as DataFrameShow; diff --git a/crates/nu-command/src/commands/dataframe/sample.rs b/crates/nu-command/src/commands/dataframe/sample.rs new file mode 100644 index 0000000000..bdb4ec8008 --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/sample.rs @@ -0,0 +1,117 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, SyntaxShape, UntaggedValue, Value, +}; + +use nu_source::Tagged; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls sample" + } + + fn usage(&self) -> &str { + "Create sample dataframe" + } + + fn signature(&self) -> Signature { + Signature::build("pls load") + .named( + "n_rows", + SyntaxShape::Number, + "number of rows to be taken from dataframe", + Some('n'), + ) + .named( + "fraction", + SyntaxShape::Number, + "fraction of dataframe to be taken", + Some('f'), + ) + .switch("replace", "sample with replace", Some('e')) + } + + fn run(&self, args: CommandArgs) -> Result { + sample(args) + } + + fn examples(&self) -> Vec { + vec![ + Example { + description: "Sample rows from dataframe", + example: "echo [[a b]; [1 2] [3 4]] | pls load | pls sample -r 1", + result: None, + }, + Example { + description: "Shows sample row using fraction and replace", + example: "echo [[a b]; [1 2] [3 4] [5 6]] | pls load | pls sample -f 0.5 -e", + result: None, + }, + ] + } +} + +fn sample(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + let rows: Option> = args.get_flag("n_rows")?; + let fraction: Option> = args.get_flag("fraction")?; + let replace: bool = args.has_flag("replace"); + + match args.input.next() { + None => Err(ShellError::labeled_error( + "No input received", + "missing dataframe input from stream", + &tag, + )), + Some(value) => { + if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { + dataframe: Some(ref df), + .. + })) = value.value + { + let res = match (rows, fraction) { + (Some(rows), None) => df.sample_n(rows.item, replace).map_err(|e| { + ShellError::labeled_error("Polars error", format!("{}", e), &rows.tag) + }), + (None, Some(frac)) => df.sample_frac(frac.item, replace).map_err(|e| { + ShellError::labeled_error("Polars error", format!("{}", e), &frac.tag) + }), + (Some(_), Some(_)) => Err(ShellError::labeled_error( + "Incompatible flags", + "Only one selection criterion allowed", + &tag, + )), + (None, None) => Err(ShellError::labeled_error_with_secondary( + "No selection", + "No selection criterion was found", + &tag, + "Perhaps you want to use the flag -n or -f", + &tag, + )), + }?; + + let value = Value { + value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( + res, + ))), + tag: tag.clone(), + }; + + Ok(OutputStream::one(value)) + } else { + Err(ShellError::labeled_error( + "No dataframe in stream", + "no dataframe found in input stream", + &tag, + )) + } + } + } +} diff --git a/crates/nu-command/src/commands/dataframe/select.rs b/crates/nu-command/src/commands/dataframe/select.rs new file mode 100644 index 0000000000..c16149ebcd --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/select.rs @@ -0,0 +1,84 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{ + dataframe::{NuDataFrame, PolarsData}, + Signature, SyntaxShape, UntaggedValue, Value, +}; + +use super::utils::convert_columns; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls select" + } + + fn usage(&self) -> &str { + "Creates a new dataframe with the selected columns" + } + + fn signature(&self) -> Signature { + Signature::build("pls select").required( + "columns", + SyntaxShape::Table, + "selected column names", + ) + } + + fn run(&self, args: CommandArgs) -> Result { + select(args) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Create new dataframe with column a", + example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls select [a]", + result: None, + }] + } +} + +fn select(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + let columns: Vec = args.req(0)?; + + let (col_string, col_span) = convert_columns(&columns, &tag)?; + + match args.input.next() { + None => Err(ShellError::labeled_error( + "No input received", + "missing dataframe input from stream", + &tag, + )), + Some(value) => { + if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { + dataframe: Some(ref df), + .. + })) = value.value + { + let res = df.select(&col_string).map_err(|e| { + ShellError::labeled_error("Drop error", format!("{}", e), &col_span) + })?; + + let value = Value { + value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( + res, + ))), + tag: tag.clone(), + }; + + Ok(OutputStream::one(value)) + } else { + Err(ShellError::labeled_error( + "No dataframe in stream", + "no dataframe found in input stream", + &tag, + )) + } + } + } +} diff --git a/crates/nu-command/src/commands/dataframe/show.rs b/crates/nu-command/src/commands/dataframe/show.rs new file mode 100644 index 0000000000..ff8acb2967 --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/show.rs @@ -0,0 +1,78 @@ +use crate::prelude::*; +use nu_engine::WholeStreamCommand; +use nu_errors::ShellError; +use nu_protocol::{dataframe::PolarsData, Signature, SyntaxShape, UntaggedValue}; + +use nu_source::Tagged; + +pub struct DataFrame; + +impl WholeStreamCommand for DataFrame { + fn name(&self) -> &str { + "pls show" + } + + fn usage(&self) -> &str { + "Show dataframe" + } + + fn signature(&self) -> Signature { + Signature::build("pls show") + .named( + "n_rows", + SyntaxShape::Number, + "number of rows to be shown", + Some('n'), + ) + .switch("tail", "shows tail rows", Some('t')) + } + + fn run(&self, args: CommandArgs) -> Result { + show(args) + } + + fn examples(&self) -> Vec { + vec![ + Example { + description: "Shows head rows from dataframe", + example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls show", + result: None, + }, + Example { + description: "Shows tail rows from dataframe", + example: "echo [[a b]; [1 2] [3 4] [5 6]] | pls convert | pls show -t -n 1", + result: None, + }, + ] + } +} + +fn show(args: CommandArgs) -> Result { + let tag = args.call_info.name_tag.clone(); + let mut args = args.evaluate_once()?; + + let rows: Option> = args.get_flag("rows")?; + let tail: bool = args.has_flag("tail"); + + match args.input.next() { + None => Err(ShellError::labeled_error( + "No input received", + "missing dataframe input from stream", + &tag, + )), + Some(value) => { + if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value { + let rows = rows.map(|v| v.item); + let values = if tail { df.tail(rows)? } else { df.head(rows)? }; + + Ok(OutputStream::from_stream(values.into_iter())) + } else { + Err(ShellError::labeled_error( + "No dataframe in stream", + "no dataframe found in input stream", + &tag, + )) + } + } + } +} diff --git a/crates/nu-command/src/commands/dataframe/utils.rs b/crates/nu-command/src/commands/dataframe/utils.rs new file mode 100644 index 0000000000..38d198edad --- /dev/null +++ b/crates/nu-command/src/commands/dataframe/utils.rs @@ -0,0 +1,42 @@ +use crate::prelude::*; +use nu_errors::ShellError; +use nu_protocol::{Primitive, UntaggedValue, Value}; + +// Converts a Vec to a Vec with a Span marking the whole +// location of the columns for error referencing +pub(crate) fn convert_columns<'columns>( + columns: &'columns [Value], + tag: &Tag, +) -> Result<(Vec, Span), ShellError> { + let mut col_span = match columns + .iter() + .nth(0) + .map(|v| Span::new(v.tag.span.start(), v.tag.span.end())) + { + Some(span) => span, + None => { + return Err(ShellError::labeled_error( + "Empty column list", + "Empty list found for command", + tag, + )) + } + }; + + let res = columns + .iter() + .map(|value| match &value.value { + UntaggedValue::Primitive(Primitive::String(s)) => { + col_span = col_span.until(value.tag.span); + Ok(s.clone()) + } + _ => Err(ShellError::labeled_error( + "Incorrect column format", + "Only string as column name", + &value.tag, + )), + }) + .collect::, _>>()?; + + Ok((res, col_span)) +} diff --git a/crates/nu-command/src/commands/default_context.rs b/crates/nu-command/src/commands/default_context.rs index cfe39a11f3..8623db4422 100644 --- a/crates/nu-command/src/commands/default_context.rs +++ b/crates/nu-command/src/commands/default_context.rs @@ -253,14 +253,31 @@ pub fn create_default_context(interactive: bool) -> Result Result { UntaggedValue::Error(e) => return Err(e.clone()), UntaggedValue::Block(_) => toml::Value::String("".to_string()), #[cfg(feature = "dataframe")] - UntaggedValue::DataFrame(_) => toml::Value::String("".to_string()), + UntaggedValue::DataFrame(_) => toml::Value::String("".to_string()), UntaggedValue::Primitive(Primitive::Range(_)) => toml::Value::String("".to_string()), UntaggedValue::Primitive(Primitive::Binary(b)) => { toml::Value::Array(b.iter().map(|x| toml::Value::Integer(*x as i64)).collect()) diff --git a/crates/nu-protocol/Cargo.toml b/crates/nu-protocol/Cargo.toml index d8e80e3bbc..e51db62cc2 100644 --- a/crates/nu-protocol/Cargo.toml +++ b/crates/nu-protocol/Cargo.toml @@ -25,7 +25,9 @@ num-traits = "0.2.14" serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11.5" -polars = {version="0.13.3", optional = true} +[dependencies.polars] +version = "0.13.4" +optional = true # implement conversions serde_json = "1.0" diff --git a/crates/nu-protocol/src/dataframe/mod.rs b/crates/nu-protocol/src/dataframe/mod.rs index 2b1699b7bf..47767847c3 100644 --- a/crates/nu-protocol/src/dataframe/mod.rs +++ b/crates/nu-protocol/src/dataframe/mod.rs @@ -1,3 +1,12 @@ pub mod nu_dataframe; +pub mod nu_groupby; pub use nu_dataframe::NuDataFrame; +pub use nu_groupby::NuGroupBy; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] +pub enum PolarsData { + EagerDataFrame(NuDataFrame), + GroupBy(NuGroupBy), +} diff --git a/crates/nu-protocol/src/dataframe/nu_dataframe.rs b/crates/nu-protocol/src/dataframe/nu_dataframe.rs index 512631281e..d42aa68ade 100644 --- a/crates/nu-protocol/src/dataframe/nu_dataframe.rs +++ b/crates/nu-protocol/src/dataframe/nu_dataframe.rs @@ -59,8 +59,18 @@ impl Default for NuDataFrame { } impl NuDataFrame { - fn new() -> Self { - Self::default() + pub fn new(df: polars::prelude::DataFrame) -> Self { + NuDataFrame { + dataframe: Some(df), + name: String::from("dataframe"), + } + } + + pub fn new_with_name(df: polars::prelude::DataFrame, name: String) -> Self { + NuDataFrame { + dataframe: Some(df), + name, + } } } @@ -103,7 +113,7 @@ impl<'de> Deserialize<'de> for NuDataFrame { where D: Deserializer<'de>, { - deserializer.deserialize_i32(NuDataFrame::new()) + deserializer.deserialize_i32(NuDataFrame::default()) } } @@ -137,22 +147,23 @@ impl NuDataFrame { // Print is made out a head and if the dataframe is too large, then a tail pub fn print(&self) -> Result, ShellError> { if let Some(df) = &self.dataframe { - let size: usize = 5; - let mut values = self.head(Some(size))?; + let size: usize = 20; if df.height() > size { + let sample_size = size / 2; + let mut values = self.head(Some(sample_size))?; add_separator(&mut values, df); - - let remaining = df.height() - size; - let tail_size = remaining.min(size); + let remaining = df.height() - sample_size; + let tail_size = remaining.min(sample_size); let mut tail_values = self.tail(Some(tail_size))?; - values.append(&mut tail_values); - } - Ok(values) + Ok(values) + } else { + Ok(self.head(Some(size))?) + } } else { - unreachable!() + unreachable!("No dataframe found in print command") } } diff --git a/crates/nu-protocol/src/dataframe/nu_groupby.rs b/crates/nu-protocol/src/dataframe/nu_groupby.rs new file mode 100644 index 0000000000..53db811790 --- /dev/null +++ b/crates/nu-protocol/src/dataframe/nu_groupby.rs @@ -0,0 +1,54 @@ +use nu_source::Tag; +use polars::frame::groupby::{GroupBy, GroupTuples}; +use serde::{Deserialize, Serialize}; + +use super::NuDataFrame; +use nu_errors::ShellError; + +use crate::{TaggedDictBuilder, Value}; + +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] +pub struct NuGroupBy { + dataframe: NuDataFrame, + by: Vec, + groups: GroupTuples, +} + +impl NuGroupBy { + pub fn new(dataframe: NuDataFrame, by: Vec, groups: GroupTuples) -> Self { + NuGroupBy { + dataframe, + by, + groups, + } + } + + pub fn to_groupby(&self) -> Result { + let df = match &self.dataframe.dataframe { + Some(df) => df, + None => unreachable!("No dataframe in nu_dataframe"), + }; + + let by = df.select_series(&self.by).map_err(|e| { + ShellError::labeled_error("Error creating groupby", format!("{}", e), Tag::unknown()) + })?; + + Ok(GroupBy::new(df, by, self.groups.clone(), None)) + } + + pub fn print(&self) -> Result, ShellError> { + let mut values: Vec = Vec::new(); + + let mut data = TaggedDictBuilder::new(Tag::unknown()); + data.insert_value("property", "dataframe"); + data.insert_value("value", self.dataframe.name.as_ref()); + values.push(data.into_value()); + + let mut data = TaggedDictBuilder::new(Tag::unknown()); + data.insert_value("property", "group by"); + data.insert_value("value", self.by.join(", ")); + values.push(data.into_value()); + + Ok(values) + } +} diff --git a/crates/nu-protocol/src/value.rs b/crates/nu-protocol/src/value.rs index 64e650bacd..7c847a93c2 100644 --- a/crates/nu-protocol/src/value.rs +++ b/crates/nu-protocol/src/value.rs @@ -31,7 +31,7 @@ use std::path::PathBuf; use std::time::SystemTime; #[cfg(feature = "dataframe")] -use crate::dataframe::NuDataFrame; +use crate::dataframe::PolarsData; /// The core structured values that flow through a pipeline #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] @@ -54,7 +54,7 @@ pub enum UntaggedValue { /// Data option that holds the polars structs required to to data /// manipulation and operations using polars dataframes #[cfg(feature = "dataframe")] - DataFrame(NuDataFrame), + DataFrame(PolarsData), } impl UntaggedValue { diff --git a/crates/nu-protocol/src/value/primitive.rs b/crates/nu-protocol/src/value/primitive.rs index e1b2e20c57..4136971c3b 100644 --- a/crates/nu-protocol/src/value/primitive.rs +++ b/crates/nu-protocol/src/value/primitive.rs @@ -364,8 +364,8 @@ macro_rules! from_native_to_primitive { ($native_type:ty, $primitive_type:expr, $converter: expr) => { // e.g. from u32 -> Primitive impl From<$native_type> for Primitive { - fn from(int: $native_type) -> Primitive { - if let Some(i) = $converter(int) { + fn from(value: $native_type) -> Primitive { + if let Some(i) = $converter(value) { $primitive_type(i) } else { unreachable!("Internal error: protocol did not use compatible decimal")