use crate::dataframe::values::{Column, NuDataFrame, NuExpression, NuLazyFrame, NuLazyGroupBy}; use nu_engine::CallExt; use nu_protocol::{ ast::Call, engine::{Command, EngineState, Stack}, Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value, }; use polars::{datatypes::DataType, prelude::Expr}; #[derive(Clone)] pub struct LazyAggregate; impl Command for LazyAggregate { fn name(&self) -> &str { "dfr agg" } fn usage(&self) -> &str { "Performs a series of aggregations from a group-by." } fn signature(&self) -> Signature { Signature::build(self.name()) .rest( "Group-by expressions", SyntaxShape::Any, "Expression(s) that define the aggregations to be applied", ) .input_output_type( Type::Custom("dataframe".into()), Type::Custom("dataframe".into()), ) .category(Category::Custom("lazyframe".into())) } fn examples(&self) -> Vec { vec![ Example { description: "Group by and perform an aggregation", example: r#"[[a b]; [1 2] [1 4] [2 6] [2 4]] | dfr into-df | dfr group-by a | dfr agg [ (dfr col b | dfr min | dfr as "b_min") (dfr col b | dfr max | dfr as "b_max") (dfr col b | dfr sum | dfr as "b_sum") ]"#, result: Some( NuDataFrame::try_from_columns(vec![ Column::new( "a".to_string(), vec![Value::test_int(1), Value::test_int(2)], ), Column::new( "b_min".to_string(), vec![Value::test_int(2), Value::test_int(4)], ), Column::new( "b_max".to_string(), vec![Value::test_int(4), Value::test_int(6)], ), Column::new( "b_sum".to_string(), vec![Value::test_int(6), Value::test_int(10)], ), ]) .expect("simple df for test should not fail") .into_value(Span::test_data()), ), }, Example { description: "Group by and perform an aggregation", example: r#"[[a b]; [1 2] [1 4] [2 6] [2 4]] | dfr into-lazy | dfr group-by a | dfr agg [ (dfr col b | dfr min | dfr as "b_min") (dfr col b | dfr max | dfr as "b_max") (dfr col b | dfr sum | dfr as "b_sum") ] | dfr collect"#, result: Some( NuDataFrame::try_from_columns(vec![ Column::new( "a".to_string(), vec![Value::test_int(1), Value::test_int(2)], ), Column::new( "b_min".to_string(), vec![Value::test_int(2), Value::test_int(4)], ), Column::new( "b_max".to_string(), vec![Value::test_int(4), Value::test_int(6)], ), Column::new( "b_sum".to_string(), vec![Value::test_int(6), Value::test_int(10)], ), ]) .expect("simple df for test should not fail") .into_value(Span::test_data()), ), }, ] } fn run( &self, engine_state: &EngineState, stack: &mut Stack, call: &Call, input: PipelineData, ) -> Result { let vals: Vec = call.rest(engine_state, stack, 0)?; let value = Value::list(vals, call.head); let expressions = NuExpression::extract_exprs(value)?; let group_by = NuLazyGroupBy::try_from_pipeline(input, call.head)?; if let Some(schema) = &group_by.schema { for expr in expressions.iter() { if let Some(name) = get_col_name(expr) { let dtype = schema.get(name.as_str()); if matches!(dtype, Some(DataType::Object(..))) { return Err(ShellError::GenericError { error: "Object type column not supported for aggregation".into(), msg: format!("Column '{name}' is type Object"), span: Some(call.head), help: Some("Aggregations cannot be performed on Object type columns. Use dtype command to check column types".into()), inner: vec![], }); } } } } let lazy = NuLazyFrame { from_eager: group_by.from_eager, lazy: Some(group_by.into_polars().agg(&expressions)), schema: None, }; let res = lazy.into_value(call.head)?; Ok(PipelineData::Value(res, None)) } } fn get_col_name(expr: &Expr) -> Option { match expr { Expr::Column(column) => Some(column.to_string()), Expr::Agg(agg) => match agg { polars::prelude::AggExpr::Min { input: e, .. } | polars::prelude::AggExpr::Max { input: e, .. } | polars::prelude::AggExpr::Median(e) | polars::prelude::AggExpr::NUnique(e) | polars::prelude::AggExpr::First(e) | polars::prelude::AggExpr::Last(e) | polars::prelude::AggExpr::Mean(e) | polars::prelude::AggExpr::Implode(e) | polars::prelude::AggExpr::Count(e, _) | polars::prelude::AggExpr::Sum(e) | polars::prelude::AggExpr::AggGroups(e) | polars::prelude::AggExpr::Std(e, _) | polars::prelude::AggExpr::Var(e, _) => get_col_name(e.as_ref()), polars::prelude::AggExpr::Quantile { expr, .. } => get_col_name(expr.as_ref()), }, Expr::Filter { input: expr, .. } | Expr::Slice { input: expr, .. } | Expr::Cast { expr, .. } | Expr::Sort { expr, .. } | Expr::Gather { expr, .. } | Expr::SortBy { expr, .. } | Expr::Exclude(expr, _) | Expr::Alias(expr, _) | Expr::KeepName(expr) | Expr::Explode(expr) => get_col_name(expr.as_ref()), Expr::Ternary { .. } | Expr::AnonymousFunction { .. } | Expr::Function { .. } | Expr::Columns(_) | Expr::DtypeColumn(_) | Expr::Literal(_) | Expr::BinaryExpr { .. } | Expr::Window { .. } | Expr::Wildcard | Expr::RenameAlias { .. } | Expr::Count | Expr::Nth(_) | Expr::SubPlan(_, _) | Expr::Selector(_) => None, } } #[cfg(test)] mod test { use super::super::super::test_dataframe::test_dataframe; use super::*; use crate::dataframe::expressions::{ExprAlias, ExprMax, ExprMin, ExprSum}; use crate::dataframe::lazy::groupby::ToLazyGroupBy; #[test] fn test_examples() { test_dataframe(vec![ Box::new(LazyAggregate {}), Box::new(ToLazyGroupBy {}), Box::new(ExprAlias {}), Box::new(ExprMin {}), Box::new(ExprMax {}), Box::new(ExprSum {}), ]) } }