Lazy dataframes (#5546)

* lazyframe definition

* expressions and lazy frames

* new alias expression

* more expression commands

* updated to polars main

* more expressions and groupby

* more expressions, fetch and sort-by

* csv reader

* removed open csv

* unique function

* joining functions

* join lazy frames commands with eager commands

* corrected tests

* Update .gitignore

* Update .gitignore

Co-authored-by: JT <547158+jntrnr@users.noreply.github.com>
This commit is contained in:
Fernando Herrera
2022-05-16 08:27:43 +01:00
committed by GitHub
parent 2062e33c37
commit 8bd68416e3
71 changed files with 3304 additions and 1364 deletions

View File

@@ -1,403 +0,0 @@
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
did_you_mean,
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape, Value,
};
use polars::{
frame::groupby::GroupBy,
prelude::{PolarsError, QuantileInterpolOptions},
};
use crate::dataframe::values::NuGroupBy;
use super::super::values::{Column, NuDataFrame};
/// Aggregation operations accepted by `dfr aggregate`.
///
/// Some variants are only meaningful for groupby aggregations
/// (`First`, `Last`, `Nunique`, `Count`); `perform_dataframe_aggregation`
/// rejects them for plain dataframes.
enum Operation {
    Mean,
    Sum,
    Min,
    Max,
    First,
    Last,
    Nunique,
    // Quantile carries the requested value, validated to be in [0.0, 1.0]
    // by `Operation::from_tagged`.
    Quantile(f64),
    Median,
    Var,
    Std,
    Count,
}
impl Operation {
    /// Parses an operation name (plus the optional `--quantile` value) into an
    /// `Operation`.
    ///
    /// Errors when the name is unknown (with a "did you mean" suggestion when
    /// possible) or when a quantile value is missing / outside [0.0, 1.0].
    fn from_tagged(
        name: &Spanned<String>,
        quantile: Option<Spanned<f64>>,
    ) -> Result<Operation, ShellError> {
        match name.item.as_ref() {
            "mean" => Ok(Operation::Mean),
            "sum" => Ok(Operation::Sum),
            "min" => Ok(Operation::Min),
            "max" => Ok(Operation::Max),
            "first" => Ok(Operation::First),
            "last" => Ok(Operation::Last),
            "nunique" => Ok(Operation::Nunique),
            "quantile" => match quantile {
                None => Err(ShellError::GenericError(
                    // typo fix: "fount" -> "found"
                    "Quantile value not found".into(),
                    "Quantile operation requires quantile value".into(),
                    Some(name.span),
                    None,
                    Vec::new(),
                )),
                Some(value) => {
                    // Use short-circuiting `||` (idiomatic for bools) instead
                    // of the bitwise `|` the original used.
                    if value.item < 0.0 || value.item > 1.0 {
                        Err(ShellError::GenericError(
                            "Inappropriate quantile".into(),
                            "Quantile value should be between 0.0 and 1.0".into(),
                            Some(value.span),
                            None,
                            Vec::new(),
                        ))
                    } else {
                        Ok(Operation::Quantile(value.item))
                    }
                }
            },
            "median" => Ok(Operation::Median),
            "var" => Ok(Operation::Var),
            "std" => Ok(Operation::Std),
            "count" => Ok(Operation::Count),
            selection => {
                let possibilities = [
                    "mean".to_string(),
                    "sum".to_string(),
                    "min".to_string(),
                    "max".to_string(),
                    "first".to_string(),
                    "last".to_string(),
                    "nunique".to_string(),
                    "quantile".to_string(),
                    "median".to_string(),
                    "var".to_string(),
                    "std".to_string(),
                    "count".to_string(),
                ];

                match did_you_mean(&possibilities, selection) {
                    Some(suggestion) => Err(ShellError::DidYouMean(suggestion, name.span)),
                    None => Err(ShellError::GenericError(
                        // typo fix: "fount" -> "found"
                        "Operation not found".into(),
                        "Operation does not exist".into(),
                        Some(name.span),
                        Some("Perhaps you want: mean, sum, min, max, first, last, nunique, quantile, median, var, std, or count".into()),
                        Vec::new(),
                    ))
                }
            }
        }
    }

    /// Returns the canonical name of the operation, mirroring the strings
    /// accepted by `from_tagged`.
    fn to_str(&self) -> &'static str {
        match self {
            Self::Mean => "mean",
            Self::Sum => "sum",
            Self::Min => "min",
            Self::Max => "max",
            Self::First => "first",
            Self::Last => "last",
            Self::Nunique => "nunique",
            Self::Quantile(_) => "quantile",
            Self::Median => "median",
            Self::Var => "var",
            Self::Std => "std",
            Self::Count => "count",
        }
    }
}
/// `dfr aggregate`: applies an aggregation operation to either a dataframe
/// or a groupby object received from the pipeline.
#[derive(Clone)]
pub struct Aggregate;

impl Command for Aggregate {
    fn name(&self) -> &str {
        "dfr aggregate"
    }

    fn usage(&self) -> &str {
        "Performs an aggregation operation on a dataframe and groupby object"
    }

    fn signature(&self) -> Signature {
        Signature::build(self.name())
            .required(
                "operation_name",
                SyntaxShape::String,
                // The accepted operations differ depending on whether the
                // pipeline input is a dataframe or a groupby object.
                "\n\tDataframes: mean, sum, min, max, quantile, median, var, std
\tGroupBy: mean, sum, min, max, first, last, nunique, quantile, median, var, std, count",
            )
            .named(
                "quantile",
                SyntaxShape::Number,
                "quantile value for quantile operation",
                Some('q'),
            )
            .switch(
                // Without this flag, groupby result columns are renamed back
                // to the original names (suffix stripped).
                "explicit",
                "returns explicit names for groupby aggregations",
                Some('e'),
            )
            .category(Category::Custom("dataframe".into()))
    }

    fn examples(&self) -> Vec<Example> {
        vec![
            Example {
                description: "Aggregate sum by grouping by column a and summing on col b",
                example:
                    "[[a b]; [one 1] [one 2]] | dfr to-df | dfr group-by a | dfr aggregate sum",
                result: Some(
                    NuDataFrame::try_from_columns(vec![
                        Column::new("a".to_string(), vec![Value::test_string("one")]),
                        Column::new("b".to_string(), vec![Value::test_int(3)]),
                    ])
                    .expect("simple df for test should not fail")
                    .into_value(Span::test_data()),
                ),
            },
            Example {
                description: "Aggregate sum in dataframe columns",
                example: "[[a b]; [4 1] [5 2]] | dfr to-df | dfr aggregate sum",
                result: Some(
                    NuDataFrame::try_from_columns(vec![
                        Column::new("a".to_string(), vec![Value::test_int(9)]),
                        Column::new("b".to_string(), vec![Value::test_int(3)]),
                    ])
                    .expect("simple df for test should not fail")
                    .into_value(Span::test_data()),
                ),
            },
            Example {
                description: "Aggregate sum in series",
                example: "[4 1 5 6] | dfr to-df | dfr aggregate sum",
                result: Some(
                    NuDataFrame::try_from_columns(vec![Column::new(
                        "0".to_string(),
                        vec![Value::test_int(16)],
                    )])
                    .expect("simple df for test should not fail")
                    .into_value(Span::test_data()),
                ),
            },
        ]
    }

    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        // Delegates all work to the free function below.
        command(engine_state, stack, call, input)
    }
}
/// Dispatches the requested aggregation to the dataframe or groupby
/// implementation, depending on the custom value found in the pipeline.
fn command(
    engine_state: &EngineState,
    stack: &mut Stack,
    call: &Call,
    input: PipelineData,
) -> Result<PipelineData, ShellError> {
    let operation: Spanned<String> = call.req(engine_state, stack, 0)?;
    let quantile: Option<Spanned<f64>> = call.get_flag(engine_state, stack, "quantile")?;
    let op = Operation::from_tagged(&operation, quantile)?;

    // Shared error for any input that is neither a dataframe nor a groupby.
    let wrong_input = || {
        ShellError::GenericError(
            "Incorrect datatype".into(),
            "no groupby or dataframe found in input stream".into(),
            Some(call.head),
            None,
            Vec::new(),
        )
    };

    match input {
        PipelineData::Value(Value::CustomValue { val, span }, _) => {
            let as_dataframe = val.as_any().downcast_ref::<NuDataFrame>();
            let as_groupby = val.as_any().downcast_ref::<NuGroupBy>();

            match (as_dataframe, as_groupby) {
                (Some(df), None) => {
                    let aggregated =
                        perform_dataframe_aggregation(df.as_ref(), op, operation.span)?;
                    Ok(PipelineData::Value(
                        NuDataFrame::dataframe_into_value(aggregated, span),
                        None,
                    ))
                }
                (None, Some(nu_groupby)) => {
                    let groupby = nu_groupby.to_groupby()?;
                    let aggregated = perform_groupby_aggregation(
                        groupby,
                        op,
                        operation.span,
                        call.head,
                        call.has_flag("explicit"),
                    )?;
                    Ok(PipelineData::Value(
                        NuDataFrame::dataframe_into_value(aggregated, span),
                        None,
                    ))
                }
                _ => Err(wrong_input()),
            }
        }
        _ => Err(wrong_input()),
    }
}
/// Runs `operation` on a polars `GroupBy`.
///
/// Unless `explicit` is set, the operation suffix polars appends to every
/// aggregated column (e.g. `b_sum`) is stripped so the result keeps the
/// original column names.
fn perform_groupby_aggregation(
    groupby: GroupBy,
    operation: Operation,
    operation_span: Span,
    agg_span: Span,
    explicit: bool,
) -> Result<polars::prelude::DataFrame, ShellError> {
    let mut res = match operation {
        Operation::Mean => groupby.mean(),
        Operation::Sum => groupby.sum(),
        Operation::Min => groupby.min(),
        Operation::Max => groupby.max(),
        Operation::First => groupby.first(),
        Operation::Last => groupby.last(),
        Operation::Nunique => groupby.n_unique(),
        Operation::Quantile(quantile) => {
            groupby.quantile(quantile, QuantileInterpolOptions::default())
        }
        Operation::Median => groupby.median(),
        Operation::Var => groupby.var(),
        Operation::Std => groupby.std(),
        Operation::Count => groupby.count(),
    }
    .map_err(|e| {
        // NotFound errors refer to the aggregation call site; everything
        // else points at the operation name the user typed.
        let span = match &e {
            PolarsError::NotFound(_) => agg_span,
            _ => operation_span,
        };

        ShellError::GenericError(
            "Error calculating aggregation".into(),
            e.to_string(),
            Some(span),
            None,
            Vec::new(),
        )
    })?;

    if !explicit {
        // Collect names up front: we cannot rename while borrowing the
        // column-name slice out of `res`.
        let col_names = res
            .get_column_names()
            .iter()
            .map(|name| name.to_string())
            .collect::<Vec<String>>();

        for col in col_names {
            let suffix = match operation {
                Operation::Mean => "_mean",
                Operation::Sum => "_sum",
                Operation::Min => "_min",
                Operation::Max => "_max",
                Operation::First => "_first",
                Operation::Last => "_last",
                Operation::Nunique => "_n_unique",
                Operation::Quantile(_) => "_quantile",
                Operation::Median => "_median",
                Operation::Var => "_agg_var",
                Operation::Std => "_agg_std",
                Operation::Count => "_count",
            };

            // Fix: only strip the suffix when it terminates the name.
            // The previous `col.find(suffix)` cut at the FIRST occurrence,
            // which corrupted any column whose own name contained the
            // suffix text (e.g. "a_sum_total" -> "a").
            let new_col = col.strip_suffix(suffix).unwrap_or(&col);

            res.rename(&col, new_col)
                .expect("Column is always there. Looping with known names");
        }
    }

    Ok(res)
}
/// Runs `operation` over every column of a plain dataframe.
///
/// Groupby-only operations (first, last, nunique, count) are rejected with a
/// "did you mean" style error listing the valid options.
fn perform_dataframe_aggregation(
    dataframe: &polars::prelude::DataFrame,
    operation: Operation,
    operation_span: Span,
) -> Result<polars::prelude::DataFrame, ShellError> {
    match operation {
        Operation::Mean => Ok(dataframe.mean()),
        Operation::Sum => Ok(dataframe.sum()),
        Operation::Min => Ok(dataframe.min()),
        Operation::Max => Ok(dataframe.max()),
        Operation::Quantile(quantile) => dataframe
            .quantile(quantile, QuantileInterpolOptions::default())
            .map_err(|e| {
                ShellError::GenericError(
                    "Error calculating quantile".into(),
                    e.to_string(),
                    Some(operation_span),
                    None,
                    Vec::new(),
                )
            }),
        Operation::Median => Ok(dataframe.median()),
        Operation::Var => Ok(dataframe.var()),
        Operation::Std => Ok(dataframe.std()),
        // Any remaining variant is valid only for groupby objects.
        operation => {
            let possibilities = [
                "mean".to_string(),
                "sum".to_string(),
                "min".to_string(),
                "max".to_string(),
                "quantile".to_string(),
                "median".to_string(),
                "var".to_string(),
                "std".to_string(),
            ];

            match did_you_mean(&possibilities, operation.to_str()) {
                Some(suggestion) => Err(ShellError::DidYouMean(suggestion, operation_span)),
                None => Err(ShellError::GenericError(
                    // typo fix: "fount" -> "found"
                    "Operation not found".into(),
                    "Operation does not exist".into(),
                    Some(operation_span),
                    Some(
                        "Perhaps you want: mean, sum, min, max, quantile, median, var, or std"
                            .into(),
                    ),
                    Vec::new(),
                )),
            }
        }
    }
}
#[cfg(test)]
mod test {
    use super::super::super::test_dataframe::test_dataframe;
    use super::super::CreateGroupBy;
    use super::*;

    // Runs every example declared by `examples()` for both commands and
    // compares the produced dataframes against the declared results.
    // CreateGroupBy is needed because the first example pipes through
    // `dfr group-by`.
    #[test]
    fn test_examples() {
        test_dataframe(vec![Box::new(Aggregate {}), Box::new(CreateGroupBy {})])
    }
}

View File

@@ -4,7 +4,7 @@ use nu_protocol::{
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
use polars::prelude::DistinctKeepStrategy;
use polars::prelude::UniqueKeepStrategy;
use super::super::values::utils::convert_columns_string;
use super::super::values::{Column, NuDataFrame};
@@ -89,13 +89,13 @@ fn command(
let subset_slice = subset.as_ref().map(|cols| &cols[..]);
let keep_strategy = if call.has_flag("last") {
DistinctKeepStrategy::Last
UniqueKeepStrategy::Last
} else {
DistinctKeepStrategy::First
UniqueKeepStrategy::First
};
df.as_ref()
.distinct(subset_slice, keep_strategy)
.unique(subset_slice, keep_strategy)
.map_err(|e| {
ShellError::GenericError(
"Error dropping duplicates".into(),

View File

@@ -1,10 +1,10 @@
use super::super::values::{Column, NuDataFrame};
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, Value,
};
use super::super::values::{Column, NuDataFrame};
use polars::prelude::DataFrameOps;
#[derive(Clone)]
pub struct Dummies;

View File

@@ -4,6 +4,9 @@ use nu_protocol::{
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
use polars::prelude::LazyFrame;
use crate::dataframe::values::{NuExpression, NuLazyFrame};
use super::super::values::{Column, NuDataFrame};
@@ -16,12 +19,16 @@ impl Command for FilterWith {
}
fn usage(&self) -> &str {
"Filters dataframe using a mask as reference"
"Filters dataframe using a mask or expression as reference"
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.required("mask", SyntaxShape::Any, "boolean mask used to filter data")
.required(
"mask or expression",
SyntaxShape::Any,
"boolean mask used to filter data",
)
.category(Category::Custom("dataframe".into()))
}
@@ -48,15 +55,30 @@ impl Command for FilterWith {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
command(engine_state, stack, call, input)
let value = input.into_value(call.head);
if NuLazyFrame::can_downcast(&value) {
let df = NuLazyFrame::try_from_value(value)?;
command_lazy(engine_state, stack, call, df)
} else if NuDataFrame::can_downcast(&value) {
let df = NuDataFrame::try_from_value(value)?;
command_eager(engine_state, stack, call, df)
} else {
Err(ShellError::CantConvert(
"expression or query".into(),
value.get_type().to_string(),
value.span()?,
None,
))
}
}
}
fn command(
fn command_eager(
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let mask_value: Value = call.req(engine_state, stack, 0)?;
@@ -72,8 +94,6 @@ fn command(
)
})?;
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
df.as_ref()
.filter(mask)
.map_err(|e| {
@@ -88,6 +108,23 @@ fn command(
.map(|df| PipelineData::Value(NuDataFrame::dataframe_into_value(df, call.head), None))
}
fn command_lazy(
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
lazy: NuLazyFrame,
) -> Result<PipelineData, ShellError> {
let expr: Value = call.req(engine_state, stack, 0)?;
let expr = NuExpression::try_from_value(expr)?;
let lazy = lazy.apply_with_expr(expr, LazyFrame::filter);
Ok(PipelineData::Value(
NuLazyFrame::into_value(lazy, call.head),
None,
))
}
#[cfg(test)]
mod test {
use super::super::super::test_dataframe::test_dataframe;

View File

@@ -1,3 +1,5 @@
use super::super::values::{utils::DEFAULT_ROWS, Column, NuDataFrame};
use crate::dataframe::values::NuExpression;
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
@@ -5,8 +7,6 @@ use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
use super::super::values::{utils::DEFAULT_ROWS, Column, NuDataFrame};
#[derive(Clone)]
pub struct FirstDF;
@@ -16,7 +16,7 @@ impl Command for FirstDF {
}
fn usage(&self) -> &str {
"Creates new dataframe with first rows"
"Creates new dataframe with first rows or creates a first expression"
}
fn signature(&self) -> Signature {
@@ -26,18 +26,25 @@ impl Command for FirstDF {
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Create new dataframe with head rows",
example: "[[a b]; [1 2] [3 4]] | dfr to-df | dfr first 1",
result: Some(
NuDataFrame::try_from_columns(vec![
Column::new("a".to_string(), vec![Value::test_int(1)]),
Column::new("b".to_string(), vec![Value::test_int(2)]),
])
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
}]
vec![
Example {
description: "Create new dataframe with head rows",
example: "[[a b]; [1 2] [3 4]] | dfr to-df | dfr first 1",
result: Some(
NuDataFrame::try_from_columns(vec![
Column::new("a".to_string(), vec![Value::test_int(1)]),
Column::new("b".to_string(), vec![Value::test_int(2)]),
])
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
Example {
description: "Creates a first expression from a column",
example: "dfr col a | dfr first",
result: None,
},
]
}
fn run(
@@ -47,7 +54,27 @@ impl Command for FirstDF {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
command(engine_state, stack, call, input)
let value = input.into_value(call.head);
if NuExpression::can_downcast(&value) {
let expr = NuExpression::try_from_value(value)?;
let expr: NuExpression = expr.into_polars().is_null().into();
Ok(PipelineData::Value(
NuExpression::into_value(expr, call.head),
None,
))
} else if NuDataFrame::can_downcast(&value) {
let df = NuDataFrame::try_from_value(value)?;
command(engine_state, stack, call, df)
} else {
Err(ShellError::CantConvert(
"expression or query".into(),
value.get_type().to_string(),
value.span()?,
None,
))
}
}
}
@@ -55,12 +82,11 @@ fn command(
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let rows: Option<usize> = call.opt(engine_state, stack, 0)?;
let rows = rows.unwrap_or(DEFAULT_ROWS);
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
let res = df.as_ref().head(Some(rows));
Ok(PipelineData::Value(
NuDataFrame::dataframe_into_value(res, call.head),

View File

@@ -1,77 +0,0 @@
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, SyntaxShape, Value,
};
use super::super::values::{utils::convert_columns_string, NuDataFrame, NuGroupBy};
/// `dfr group-by`: builds a groupby object from a dataframe so later
/// commands (e.g. `dfr aggregate`, `dfr pivot`) can reuse the groups.
#[derive(Clone)]
pub struct CreateGroupBy;

impl Command for CreateGroupBy {
    fn name(&self) -> &str {
        "dfr group-by"
    }

    fn usage(&self) -> &str {
        "Creates a groupby object that can be used for other aggregations"
    }

    fn signature(&self) -> Signature {
        Signature::build(self.name())
            // All positional arguments are treated as grouping columns.
            .rest("rest", SyntaxShape::Any, "groupby columns")
            .category(Category::Custom("dataframe".into()))
    }

    fn examples(&self) -> Vec<Example> {
        vec![Example {
            description: "Grouping by column a",
            example: "[[a b]; [one 1] [one 2]] | dfr to-df | dfr group-by a",
            result: None,
        }]
    }

    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        // Delegates all work to the free function below.
        command(engine_state, stack, call, input)
    }
}
/// Extracts the grouping columns, computes the groups on the input
/// dataframe and wraps everything into a `NuGroupBy` custom value.
fn command(
    engine_state: &EngineState,
    stack: &mut Stack,
    call: &Call,
    input: PipelineData,
) -> Result<PipelineData, ShellError> {
    // Column names that define the grouping keys.
    let column_values: Vec<Value> = call.rest(engine_state, stack, 0)?;
    let (column_names, column_span) = convert_columns_string(column_values, call.head)?;

    let df = NuDataFrame::try_from_pipeline(input, call.head)?;

    // Computing the group indices is the expensive part of the groupby;
    // they are stored inside the NuGroupBy so later aggregations can
    // reuse them without recomputing.
    let groupby = df.as_ref().groupby(&column_names).map_err(|e| {
        ShellError::GenericError(
            "Error creating groupby".into(),
            e.to_string(),
            Some(column_span),
            None,
            Vec::new(),
        )
    })?;

    let groups = groupby.get_groups();
    let nu_groupby = NuGroupBy::new(df.as_ref().clone(), column_names, groups);

    Ok(PipelineData::Value(nu_groupby.into_value(call.head), None))
}

View File

@@ -1,235 +0,0 @@
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape, Value,
};
use polars::prelude::JoinType;
use crate::dataframe::values::utils::convert_columns_string;
use super::super::values::{Column, NuDataFrame};
/// `dfr join`: joins the input dataframe with another dataframe using the
/// given left/right column lists as join keys.
#[derive(Clone)]
pub struct JoinDF;

impl Command for JoinDF {
    fn name(&self) -> &str {
        "dfr join"
    }

    fn usage(&self) -> &str {
        "Joins a dataframe using columns as reference"
    }

    fn signature(&self) -> Signature {
        Signature::build(self.name())
            .required("dataframe", SyntaxShape::Any, "right dataframe to join")
            // Both column lists are required named flags; the counts must
            // match (validated in check_column_datatypes).
            .required_named(
                "left",
                SyntaxShape::Table,
                "left column names to perform join",
                Some('l'),
            )
            .required_named(
                "right",
                SyntaxShape::Table,
                "right column names to perform join",
                Some('r'),
            )
            .named(
                "type",
                SyntaxShape::String,
                "type of join. Inner by default",
                Some('t'),
            )
            .named(
                "suffix",
                SyntaxShape::String,
                "suffix for the columns of the right dataframe",
                Some('s'),
            )
            .category(Category::Custom("dataframe".into()))
    }

    fn examples(&self) -> Vec<Example> {
        vec![Example {
            description: "inner join dataframe",
            example: r#"let right = ([[a b c]; [1 2 5] [3 4 5] [5 6 6]] | dfr to-df);
    $right | dfr join $right -l [a b] -r [a b]"#,
            result: Some(
                NuDataFrame::try_from_columns(vec![
                    Column::new(
                        "a".to_string(),
                        vec![Value::test_int(1), Value::test_int(3), Value::test_int(5)],
                    ),
                    Column::new(
                        "b".to_string(),
                        vec![Value::test_int(2), Value::test_int(4), Value::test_int(6)],
                    ),
                    Column::new(
                        "c".to_string(),
                        vec![Value::test_int(5), Value::test_int(5), Value::test_int(6)],
                    ),
                    // Non-key columns from the right side get a suffix to
                    // avoid clashing with left-side names.
                    Column::new(
                        "c_right".to_string(),
                        vec![Value::test_int(5), Value::test_int(5), Value::test_int(6)],
                    ),
                ])
                .expect("simple df for test should not fail")
                .into_value(Span::test_data()),
            ),
        }]
    }

    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        // Delegates all work to the free function below.
        command(engine_state, stack, call, input)
    }
}
/// Parses the join arguments, validates the key columns on both sides and
/// performs the polars join, returning the joined dataframe.
fn command(
    engine_state: &EngineState,
    stack: &mut Stack,
    call: &Call,
    input: PipelineData,
) -> Result<PipelineData, ShellError> {
    let r_df: Value = call.req(engine_state, stack, 0)?;
    let l_col: Vec<Value> = call
        .get_flag(engine_state, stack, "left")?
        .expect("required value in syntax");
    let r_col: Vec<Value> = call
        .get_flag(engine_state, stack, "right")?
        .expect("required value in syntax");
    let suffix: Option<String> = call.get_flag(engine_state, stack, "suffix")?;
    let join_type_op: Option<Spanned<String>> = call.get_flag(engine_state, stack, "type")?;

    // Default to an inner join when no --type flag is given.
    let join_type = if let Some(flag) = join_type_op {
        match flag.item.as_ref() {
            "inner" => JoinType::Inner,
            "outer" => JoinType::Outer,
            "left" => JoinType::Left,
            _ => {
                return Err(ShellError::GenericError(
                    "Incorrect join type".into(),
                    "Invalid join type".into(),
                    Some(flag.span),
                    Some("Options: inner, outer or left".into()),
                    Vec::new(),
                ))
            }
        }
    } else {
        JoinType::Inner
    };

    let (l_col_string, l_col_span) = convert_columns_string(l_col, call.head)?;
    let (r_col_string, r_col_span) = convert_columns_string(r_col, call.head)?;

    let df = NuDataFrame::try_from_pipeline(input, call.head)?;
    let r_df = NuDataFrame::try_from_value(r_df)?;

    // Fail early with a precise message if the key columns don't line up.
    check_column_datatypes(
        df.as_ref(),
        r_df.as_ref(),
        &l_col_string,
        l_col_span,
        &r_col_string,
        r_col_span,
    )?;

    let joined = df
        .as_ref()
        .join(
            r_df.as_ref(),
            &l_col_string,
            &r_col_string,
            join_type,
            suffix,
        )
        .map_err(|e| {
            ShellError::GenericError(
                "Error joining dataframes".into(),
                e.to_string(),
                Some(l_col_span),
                None,
                Vec::new(),
            )
        })?;

    Ok(PipelineData::Value(
        NuDataFrame::dataframe_into_value(joined, call.head),
        None,
    ))
}
/// Validates the join key columns: both sides must list the same number of
/// columns, every column must exist in its dataframe, and each left/right
/// pair must share the same datatype.
fn check_column_datatypes<T: AsRef<str>>(
    df_l: &polars::prelude::DataFrame,
    df_r: &polars::prelude::DataFrame,
    l_cols: &[T],
    l_col_span: Span,
    r_cols: &[T],
    r_col_span: Span,
) -> Result<(), ShellError> {
    if l_cols.len() != r_cols.len() {
        return Err(ShellError::GenericError(
            "Mismatched number of column names".into(),
            format!(
                "found {} left names vs {} right names",
                l_cols.len(),
                r_cols.len()
            ),
            Some(l_col_span),
            Some("perhaps you need to change the number of columns to join".into()),
            Vec::new(),
        ));
    }

    for (l, r) in l_cols.iter().zip(r_cols) {
        let l_series = df_l.column(l.as_ref()).map_err(|e| {
            ShellError::GenericError(
                "Error selecting the columns".into(),
                e.to_string(),
                Some(l_col_span),
                None,
                Vec::new(),
            )
        })?;

        let r_series = df_r.column(r.as_ref()).map_err(|e| {
            ShellError::GenericError(
                "Error selecting the columns".into(),
                e.to_string(),
                Some(r_col_span),
                None,
                Vec::new(),
            )
        })?;

        if l_series.dtype() != r_series.dtype() {
            return Err(ShellError::GenericError(
                "Mismatched datatypes".into(),
                // Fixed garbled user-facing message: it previously read
                // "left column type '{}' doesn't match '{}' right column match".
                format!(
                    "left column type '{}' doesn't match right column type '{}'",
                    l_series.dtype(),
                    r_series.dtype()
                ),
                Some(l_col_span),
                Some("perhaps you need to select other column to match".into()),
                Vec::new(),
            ));
        }
    }

    Ok(())
}
#[cfg(test)]
mod test {
    use super::super::super::test_dataframe::test_dataframe;
    use super::*;

    // Runs the example declared by `examples()` and compares the joined
    // dataframe against the declared result.
    #[test]
    fn test_examples() {
        test_dataframe(vec![Box::new(JoinDF {})])
    }
}

View File

@@ -1,3 +1,5 @@
use super::super::values::{utils::DEFAULT_ROWS, Column, NuDataFrame};
use crate::dataframe::values::NuExpression;
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
@@ -5,8 +7,6 @@ use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
use super::super::values::{utils::DEFAULT_ROWS, Column, NuDataFrame};
#[derive(Clone)]
pub struct LastDF;
@@ -16,7 +16,7 @@ impl Command for LastDF {
}
fn usage(&self) -> &str {
"Creates new dataframe with tail rows"
"Creates new dataframe with tail rows or creates a last expression"
}
fn signature(&self) -> Signature {
@@ -26,18 +26,25 @@ impl Command for LastDF {
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Create new dataframe with last rows",
example: "[[a b]; [1 2] [3 4]] | dfr to-df | dfr last 1",
result: Some(
NuDataFrame::try_from_columns(vec![
Column::new("a".to_string(), vec![Value::test_int(3)]),
Column::new("b".to_string(), vec![Value::test_int(4)]),
])
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
}]
vec![
Example {
description: "Create new dataframe with last rows",
example: "[[a b]; [1 2] [3 4]] | dfr to-df | dfr last 1",
result: Some(
NuDataFrame::try_from_columns(vec![
Column::new("a".to_string(), vec![Value::test_int(3)]),
Column::new("b".to_string(), vec![Value::test_int(4)]),
])
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
Example {
description: "Creates a last expression from a column",
example: "dfr col a | dfr last",
result: None,
},
]
}
fn run(
@@ -47,7 +54,27 @@ impl Command for LastDF {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
command(engine_state, stack, call, input)
let value = input.into_value(call.head);
if NuExpression::can_downcast(&value) {
let expr = NuExpression::try_from_value(value)?;
let expr: NuExpression = expr.into_polars().is_null().into();
Ok(PipelineData::Value(
NuExpression::into_value(expr, call.head),
None,
))
} else if NuDataFrame::can_downcast(&value) {
let df = NuDataFrame::try_from_value(value)?;
command(engine_state, stack, call, df)
} else {
Err(ShellError::CantConvert(
"expression or query".into(),
value.get_type().to_string(),
value.span()?,
None,
))
}
}
}
@@ -55,12 +82,11 @@ fn command(
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let rows: Option<usize> = call.opt(engine_state, stack, 0)?;
let rows = rows.unwrap_or(DEFAULT_ROWS);
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
let res = df.as_ref().tail(Some(rows));
Ok(PipelineData::Value(
NuDataFrame::dataframe_into_value(res, call.head),

View File

@@ -11,7 +11,7 @@ pub struct ListDF;
impl Command for ListDF {
fn name(&self) -> &str {
"dfr list"
"dfr ls"
}
fn usage(&self) -> &str {
@@ -26,7 +26,7 @@ impl Command for ListDF {
vec![Example {
description: "Creates a new dataframe and shows it in the dataframe list",
example: r#"let test = ([[a b];[1 2] [3 4]] | dfr to-df);
dfr list"#,
dfr ls"#,
result: None,
}]
}

View File

@@ -1,4 +1,3 @@
mod aggregate;
mod append;
mod column;
mod command;
@@ -11,13 +10,10 @@ mod dummies;
mod filter_with;
mod first;
mod get;
mod groupby;
mod join;
mod last;
mod list;
mod melt;
mod open;
mod pivot;
mod rename;
mod sample;
mod shape;
@@ -32,7 +28,6 @@ mod with_column;
use nu_protocol::engine::StateWorkingSet;
pub use aggregate::Aggregate;
pub use append::AppendDF;
pub use column::ColumnDF;
pub use command::Dataframe;
@@ -45,13 +40,10 @@ pub use dummies::Dummies;
pub use filter_with::FilterWith;
pub use first::FirstDF;
pub use get::GetDF;
pub use groupby::CreateGroupBy;
pub use join::JoinDF;
pub use last::LastDF;
pub use list::ListDF;
pub use melt::MeltDF;
pub use open::OpenDataFrame;
pub use pivot::PivotDF;
pub use rename::RenameDF;
pub use sample::SampleDF;
pub use shape::ShapeDF;
@@ -76,10 +68,8 @@ pub fn add_eager_decls(working_set: &mut StateWorkingSet) {
// Dataframe commands
bind_command!(
Aggregate,
AppendDF,
ColumnDF,
CreateGroupBy,
Dataframe,
DataTypes,
DescribeDF,
@@ -90,12 +80,10 @@ pub fn add_eager_decls(working_set: &mut StateWorkingSet) {
FilterWith,
FirstDF,
GetDF,
JoinDF,
LastDF,
ListDF,
MeltDF,
OpenDataFrame,
PivotDF,
RenameDF,
SampleDF,
ShapeDF,

View File

@@ -1,198 +0,0 @@
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
};
use polars::prelude::DataType;
use crate::dataframe::values::NuGroupBy;
use super::super::values::NuDataFrame;
/// Aggregation operations accepted by `dfr pivot`. This is a smaller set
/// than `dfr aggregate` because polars' pivot only supports these.
enum Operation {
    First,
    Sum,
    Min,
    Max,
    Mean,
    Median,
}
impl Operation {
    /// Parses the aggregation name supplied to `dfr pivot` into an
    /// `Operation`, listing the valid options on failure.
    fn from_tagged(name: Spanned<String>) -> Result<Operation, ShellError> {
        match name.item.as_ref() {
            "first" => Ok(Operation::First),
            "sum" => Ok(Operation::Sum),
            "min" => Ok(Operation::Min),
            "max" => Ok(Operation::Max),
            "mean" => Ok(Operation::Mean),
            "median" => Ok(Operation::Median),
            _ => Err(ShellError::GenericError(
                // typo fix: "fount" -> "found"
                "Operation not found".into(),
                "Operation does not exist for pivot".into(),
                Some(name.span),
                Some("Options: first, sum, min, max, mean, median".into()),
                Vec::new(),
            )),
        }
    }
}
/// `dfr pivot`: pivots a groupby object on one column, aggregating the
/// values of another column.
#[derive(Clone)]
pub struct PivotDF;

impl Command for PivotDF {
    fn name(&self) -> &str {
        "dfr pivot"
    }

    fn usage(&self) -> &str {
        "Performs a pivot operation on a groupby object"
    }

    fn signature(&self) -> Signature {
        Signature::build(self.name())
            .required(
                "pivot_column",
                SyntaxShape::String,
                "pivot column to perform pivot",
            )
            .required(
                "value_column",
                SyntaxShape::String,
                "value column to perform pivot",
            )
            // One of: first, sum, min, max, mean, median
            // (see Operation::from_tagged).
            .required("operation", SyntaxShape::String, "aggregate operation")
            .category(Category::Custom("dataframe".into()))
    }

    fn examples(&self) -> Vec<Example> {
        vec![Example {
            description: "Pivot a dataframe on b and aggregation on col c",
            example:
                "[[a b c]; [one x 1] [two y 2]] | dfr to-df | dfr group-by a | dfr pivot b c sum",
            result: None,
        }]
    }

    fn run(
        &self,
        engine_state: &EngineState,
        stack: &mut Stack,
        call: &Call,
        input: PipelineData,
    ) -> Result<PipelineData, ShellError> {
        // Delegates all work to the free function below.
        command(engine_state, stack, call, input)
    }
}
/// Validates the pivot/value columns, builds the polars pivot from the
/// incoming groupby object and applies the requested aggregation.
fn command(
    engine_state: &EngineState,
    stack: &mut Stack,
    call: &Call,
    input: PipelineData,
) -> Result<PipelineData, ShellError> {
    // Positional arguments: pivot column, value column, aggregation name.
    let pivot_col: Spanned<String> = call.req(engine_state, stack, 0)?;
    let value_col: Spanned<String> = call.req(engine_state, stack, 1)?;
    let operation: Spanned<String> = call.req(engine_state, stack, 2)?;
    let op = Operation::from_tagged(operation)?;

    let nu_groupby = NuGroupBy::try_from_pipeline(input, call.head)?;

    // Both columns must exist and have datatypes supported by the pivot.
    {
        let df_ref = nu_groupby.as_ref();
        check_pivot_column(df_ref, &pivot_col)?;
        check_value_column(df_ref, &value_col)?;
    }

    let mut groupby = nu_groupby.to_groupby()?;
    let pivot = groupby.pivot(vec![&pivot_col.item], vec![&value_col.item]);

    let pivoted = match op {
        Operation::Mean => pivot.mean(),
        Operation::Sum => pivot.sum(),
        Operation::Min => pivot.min(),
        Operation::Max => pivot.max(),
        Operation::First => pivot.first(),
        Operation::Median => pivot.median(),
    }
    .map_err(|e| {
        ShellError::GenericError(
            "Error creating pivot".into(),
            e.to_string(),
            Some(call.head),
            None,
            Vec::new(),
        )
    })?;

    Ok(PipelineData::Value(
        NuDataFrame::dataframe_into_value(pivoted, call.head),
        None,
    ))
}
/// Ensures the pivot column exists and has a datatype the pivot supports
/// (any integer type or Utf8).
fn check_pivot_column(
    df: &polars::prelude::DataFrame,
    col: &Spanned<String>,
) -> Result<(), ShellError> {
    let series = df.column(&col.item).map_err(|e| {
        ShellError::GenericError(
            "Column not found".into(),
            e.to_string(),
            Some(col.span),
            None,
            Vec::new(),
        )
    })?;

    let supported = matches!(
        series.dtype(),
        DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Utf8
    );

    if supported {
        Ok(())
    } else {
        Err(ShellError::GenericError(
            "Pivot error".into(),
            format!("Unsupported datatype {}", series.dtype()),
            Some(col.span),
            None,
            Vec::new(),
        ))
    }
}
/// Ensures the value column exists and has a numeric datatype (integer or
/// float) so the aggregation can be computed.
fn check_value_column(
    df: &polars::prelude::DataFrame,
    col: &Spanned<String>,
) -> Result<(), ShellError> {
    let series = df.column(&col.item).map_err(|e| {
        ShellError::GenericError(
            "Column not found".into(),
            e.to_string(),
            Some(col.span),
            None,
            Vec::new(),
        )
    })?;

    let supported = matches!(
        series.dtype(),
        DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Float32
            | DataType::Float64
    );

    if supported {
        Ok(())
    } else {
        Err(ShellError::GenericError(
            "Pivot error".into(),
            format!("Unsupported datatype {}", series.dtype()),
            Some(col.span),
            None,
            Vec::new(),
        ))
    }
}

View File

@@ -5,6 +5,8 @@ use nu_protocol::{
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
use crate::dataframe::{utils::extract_strings, values::NuLazyFrame};
use super::super::values::{Column, NuDataFrame};
#[derive(Clone)]
@@ -21,8 +23,16 @@ impl Command for RenameDF {
fn signature(&self) -> Signature {
Signature::build(self.name())
.required("from", SyntaxShape::String, "column name to be renamed")
.required("to", SyntaxShape::String, "new column name")
.required(
"columns",
SyntaxShape::Any,
"Column(s) to be renamed. A string or list of strings",
)
.required(
"new names",
SyntaxShape::Any,
"New names for the selected column(s). A string or list of strings",
)
.category(Category::Custom("dataframe".into()))
}
@@ -54,24 +64,39 @@ impl Command for RenameDF {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
command(engine_state, stack, call, input)
let value = input.into_value(call.head);
if NuLazyFrame::can_downcast(&value) {
let df = NuLazyFrame::try_from_value(value)?;
command_lazy(engine_state, stack, call, df)
} else if NuDataFrame::can_downcast(&value) {
let df = NuDataFrame::try_from_value(value)?;
command_eager(engine_state, stack, call, df)
} else {
Err(ShellError::CantConvert(
"expression or query".into(),
value.get_type().to_string(),
value.span()?,
None,
))
}
}
}
fn command(
fn command_eager(
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
mut df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let from: String = call.req(engine_state, stack, 0)?;
let to: String = call.req(engine_state, stack, 1)?;
let columns: Value = call.req(engine_state, stack, 0)?;
let columns = extract_strings(columns)?;
let mut df = NuDataFrame::try_from_pipeline(input, call.head)?;
let new_names: Value = call.req(engine_state, stack, 1)?;
let new_names = extract_strings(new_names)?;
df.as_mut()
.rename(&from, &to)
.map_err(|e| {
for (from, to) in columns.iter().zip(new_names.iter()) {
df.as_mut().rename(from, to).map_err(|e| {
ShellError::GenericError(
"Error renaming".into(),
e.to_string(),
@@ -79,13 +104,36 @@ fn command(
None,
Vec::new(),
)
})
.map(|df| {
PipelineData::Value(
NuDataFrame::dataframe_into_value(df.clone(), call.head),
None,
)
})
})?;
}
Ok(PipelineData::Value(df.into_value(call.head), None))
}
/// Lazy variant of the rename command: renames one or more columns on a
/// `NuLazyFrame` using polars' lazy `rename`.
///
/// Positional arg 0 is the column name(s), positional arg 1 the new name(s);
/// each may be a string or a list of strings. Errors when the two lists have
/// different lengths.
fn command_lazy(
    engine_state: &EngineState,
    stack: &mut Stack,
    call: &Call,
    lazy: NuLazyFrame,
) -> Result<PipelineData, ShellError> {
    let columns: Value = call.req(engine_state, stack, 0)?;
    let columns = extract_strings(columns)?;

    let new_names: Value = call.req(engine_state, stack, 1)?;
    // Capture the span before `extract_strings` consumes the value, so the
    // mismatch error below can point at the user's argument without
    // re-fetching it from the call.
    let new_names_span = new_names.span()?;
    let new_names = extract_strings(new_names)?;

    if columns.len() != new_names.len() {
        return Err(ShellError::IncompatibleParametersSingle(
            "New name list has different size to column list".into(),
            new_names_span,
        ));
    }

    let lazy = lazy.into_polars();
    let lazy: NuLazyFrame = lazy.rename(&columns, &new_names).into();

    Ok(PipelineData::Value(lazy.into_value(call.head), None))
}
#[cfg(test)]

View File

@@ -33,6 +33,12 @@ impl Command for SampleDF {
"fraction of dataframe to be taken",
Some('f'),
)
.named(
"seed",
SyntaxShape::Number,
"seed for the selection",
Some('s'),
)
.switch("replace", "sample with replace", Some('e'))
.category(Category::Custom("dataframe".into()))
}
@@ -71,12 +77,15 @@ fn command(
) -> Result<PipelineData, ShellError> {
let rows: Option<Spanned<usize>> = call.get_flag(engine_state, stack, "n-rows")?;
let fraction: Option<Spanned<f64>> = call.get_flag(engine_state, stack, "fraction")?;
let seed: Option<u64> = call
.get_flag::<i64>(engine_state, stack, "seed")?
.map(|val| val as u64);
let replace: bool = call.has_flag("replace");
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
match (rows, fraction) {
(Some(rows), None) => df.as_ref().sample_n(rows.item, replace, 0).map_err(|e| {
(Some(rows), None) => df.as_ref().sample_n(rows.item, replace, seed).map_err(|e| {
ShellError::GenericError(
"Error creating sample".into(),
e.to_string(),
@@ -85,15 +94,18 @@ fn command(
Vec::new(),
)
}),
(None, Some(frac)) => df.as_ref().sample_frac(frac.item, replace, 0).map_err(|e| {
ShellError::GenericError(
"Error creating sample".into(),
e.to_string(),
Some(frac.span),
None,
Vec::new(),
)
}),
(None, Some(frac)) => df
.as_ref()
.sample_frac(frac.item, replace, seed)
.map_err(|e| {
ShellError::GenericError(
"Error creating sample".into(),
e.to_string(),
Some(frac.span),
None,
Vec::new(),
)
}),
(Some(_), Some(_)) => Err(ShellError::GenericError(
"Incompatible flags".into(),
"Only one selection criterion allowed".into(),

View File

@@ -1,12 +1,12 @@
use super::super::values::{Column, NuDataFrame};
use crate::dataframe::values::{NuExpression, NuLazyFrame};
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape, Value,
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
use super::super::values::{Column, NuDataFrame};
#[derive(Clone)]
pub struct WithColumn;
@@ -21,35 +21,51 @@ impl Command for WithColumn {
fn signature(&self) -> Signature {
Signature::build(self.name())
.required("series", SyntaxShape::Any, "series to be added")
.required_named("name", SyntaxShape::String, "column name", Some('n'))
.named("name", SyntaxShape::String, "new column name", Some('n'))
.rest(
"series or expressions",
SyntaxShape::Any,
"series to be added or expressions used to define the new columns",
)
.category(Category::Custom("dataframe".into()))
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Adds a series to the dataframe",
example:
"[[a b]; [1 2] [3 4]] | dfr to-df | dfr with-column ([5 6] | dfr to-df) --name c",
result: Some(
NuDataFrame::try_from_columns(vec![
Column::new(
"a".to_string(),
vec![Value::test_int(1), Value::test_int(3)],
),
Column::new(
"b".to_string(),
vec![Value::test_int(2), Value::test_int(4)],
),
Column::new(
"c".to_string(),
vec![Value::test_int(5), Value::test_int(6)],
),
])
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
}]
vec![
Example {
description: "Adds a series to the dataframe",
example: r#"[[a b]; [1 2] [3 4]]
| dfr to-df
| dfr with-column ([5 6] | dfr to-df) --name c"#,
result: Some(
NuDataFrame::try_from_columns(vec![
Column::new(
"a".to_string(),
vec![Value::test_int(1), Value::test_int(3)],
),
Column::new(
"b".to_string(),
vec![Value::test_int(2), Value::test_int(4)],
),
Column::new(
"c".to_string(),
vec![Value::test_int(5), Value::test_int(6)],
),
])
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
Example {
description: "Adds a series to the dataframe",
example: r#"[[a b]; [1 2] [3 4]]
| dfr to-df
| dfr to-lazy
| dfr with-column ((dfr col a) * 2 | dfr as "c")
| dfr collect"#,
result: None,
},
]
}
fn run(
@@ -59,26 +75,41 @@ impl Command for WithColumn {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
command(engine_state, stack, call, input)
let value = input.into_value(call.head);
if NuLazyFrame::can_downcast(&value) {
let df = NuLazyFrame::try_from_value(value)?;
command_lazy(engine_state, stack, call, df)
} else if NuDataFrame::can_downcast(&value) {
let df = NuDataFrame::try_from_value(value)?;
command_eager(engine_state, stack, call, df)
} else {
Err(ShellError::CantConvert(
"expression or query".into(),
value.get_type().to_string(),
value.span()?,
None,
))
}
}
}
fn command(
fn command_eager(
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
mut df: NuDataFrame,
) -> Result<PipelineData, ShellError> {
let name: Spanned<String> = call
.get_flag(engine_state, stack, "name")?
.expect("required named value");
let other_value: Value = call.req(engine_state, stack, 0)?;
let other_span = other_value.span()?;
let mut other = NuDataFrame::try_from_value(other_value)?.as_series(other_span)?;
let series = other.rename(&name.item).clone();
let mut df = NuDataFrame::try_from_pipeline(input, call.head)?;
let name = match call.get_flag::<String>(engine_state, stack, "name")? {
Some(name) => name,
None => other.name().to_string(),
};
let series = other.rename(&name).clone();
df.as_mut()
.with_column(series)
@@ -99,6 +130,27 @@ fn command(
})
}
/// Lazy variant of the with-column command: evaluates the rest arguments as
/// expressions and appends the resulting columns to the lazy frame via
/// polars' `with_columns`.
fn command_lazy(
    engine_state: &EngineState,
    stack: &mut Stack,
    call: &Call,
    lazy: NuLazyFrame,
) -> Result<PipelineData, ShellError> {
    // Gather every rest argument into one list value so the expression
    // extractor can handle single values and lists uniformly.
    let rest_vals: Vec<Value> = call.rest(engine_state, stack, 0)?;
    let arg_list = Value::List {
        vals: rest_vals,
        span: call.head,
    };
    let expressions = NuExpression::extract_exprs(arg_list)?;

    // Apply the expressions lazily; the actual work happens on collect.
    let result: NuLazyFrame = lazy.into_polars().with_columns(&expressions).into();

    Ok(PipelineData::Value(result.into_value(call.head), None))
}
#[cfg(test)]
mod test {
use super::super::super::test_dataframe::test_dataframe;