Polars upgrade (#4665)

* polars upgrade

* Update describe.rs

Co-authored-by: JT <547158+jntrnr@users.noreply.github.com>
Author: Fernando Herrera
Date: 2022-02-27 16:10:29 +00:00
Committed by: GitHub
Parent: 10ceac998e
Commit: 4ebbe07d27

13 changed files with 236 additions and 128 deletions

View File

@@ -85,12 +85,12 @@ umask = "1.0.0"
 users = "0.11.0"

 [dependencies.polars]
-version = "0.18.0"
+version = "0.19.1"
 optional = true
 features = [
     "default", "parquet", "json", "serde", "object",
     "checked_arithmetic", "strings", "cum_agg", "is_in",
-    "rolling_window", "strings", "pivot", "random"
+    "rolling_window", "strings", "rows", "random"
 ]

 [features]

View File

@@ -5,7 +5,10 @@ use nu_protocol::{
     engine::{Command, EngineState, Stack},
     Category, Example, PipelineData, ShellError, Signature, Span, Spanned, SyntaxShape, Value,
 };
-use polars::{frame::groupby::GroupBy, prelude::PolarsError};
+use polars::{
+    frame::groupby::GroupBy,
+    prelude::{PolarsError, QuantileInterpolOptions},
+};

 use crate::dataframe::values::NuGroupBy;
@@ -266,7 +269,9 @@ fn perform_groupby_aggregation(
         Operation::First => groupby.first(),
         Operation::Last => groupby.last(),
         Operation::Nunique => groupby.n_unique(),
-        Operation::Quantile(quantile) => groupby.quantile(quantile),
+        Operation::Quantile(quantile) => {
+            groupby.quantile(quantile, QuantileInterpolOptions::default())
+        }
         Operation::Median => groupby.median(),
         Operation::Var => groupby.var(),
         Operation::Std => groupby.std(),
@@ -327,13 +332,15 @@ fn perform_dataframe_aggregation(
         Operation::Sum => Ok(dataframe.sum()),
         Operation::Min => Ok(dataframe.min()),
         Operation::Max => Ok(dataframe.max()),
-        Operation::Quantile(quantile) => dataframe.quantile(quantile).map_err(|e| {
-            ShellError::SpannedLabeledError(
-                "Error calculating quantile".into(),
-                e.to_string(),
-                operation_span,
-            )
-        }),
+        Operation::Quantile(quantile) => dataframe
+            .quantile(quantile, QuantileInterpolOptions::default())
+            .map_err(|e| {
+                ShellError::SpannedLabeledError(
+                    "Error calculating quantile".into(),
+                    e.to_string(),
+                    operation_span,
+                )
+            }),
         Operation::Median => Ok(dataframe.median()),
         Operation::Var => Ok(dataframe.var()),
         Operation::Std => Ok(dataframe.std()),
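
Both call sites above reflect the same signature change: in polars 0.19 the quantile methods take an interpolation strategy next to the quantile itself. A minimal sketch of the new call shape (the DataFrame and its use here are illustrative, not from this PR):

    use polars::prelude::*;

    // quantile(q, interpolation): QuantileInterpolOptions::default() keeps
    // the upgrade behavior-neutral relative to the old one-argument call.
    fn median_via_quantile(df: &DataFrame) -> Result<DataFrame> {
        df.quantile(0.5, QuantileInterpolOptions::default())
    }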

View File

@@ -1,14 +1,16 @@
 use super::super::values::{Column, NuDataFrame};
+use nu_engine::CallExt;
 use nu_protocol::{
     ast::Call,
     engine::{Command, EngineState, Stack},
-    Category, Example, PipelineData, ShellError, Signature, Span, Value,
+    Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
 };
 use polars::{
     chunked_array::ChunkedArray,
     prelude::{
-        AnyValue, DataFrame, DataType, Float64Type, IntoSeries, NewChunkedArray, Series, Utf8Type,
+        AnyValue, DataFrame, DataType, Float64Type, IntoSeries, NewChunkedArray,
+        QuantileInterpolOptions, Series, Utf8Type,
     },
 };
@@ -25,7 +27,14 @@ impl Command for DescribeDF {
     }

     fn signature(&self) -> Signature {
-        Signature::build(self.name()).category(Category::Custom("dataframe".into()))
+        Signature::build(self.name())
+            .category(Category::Custom("dataframe".into()))
+            .named(
+                "quantiles",
+                SyntaxShape::Table,
+                "optional quantiles for describe",
+                Some('q'),
+            )
     }

     fn examples(&self) -> Vec<Example> {
@ -98,29 +107,62 @@ impl Command for DescribeDF {
}
fn command(
_engine_state: &EngineState,
_stack: &mut Stack,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let quantiles: Option<Vec<Value>> = call.get_flag(engine_state, stack, "quantiles")?;
let quantiles = quantiles.map(|values| {
values
.iter()
.map(|value| match value {
Value::Float { val, span } => {
if (&0.0..=&1.0).contains(&val) {
Ok(*val)
} else {
Err(ShellError::SpannedLabeledError(
"Incorrect value for quantile".to_string(),
"value should be between 0 and 1".to_string(),
*span,
))
}
}
_ => match value.span() {
Ok(span) => Err(ShellError::SpannedLabeledError(
"Incorrect value for quantile".to_string(),
"value should be a float".to_string(),
span,
)),
Err(e) => Err(e),
},
})
.collect::<Result<Vec<f64>, ShellError>>()
});
let quantiles = match quantiles {
Some(quantiles) => quantiles?,
None => vec![0.25, 0.50, 0.75],
};
let mut quantiles_labels = quantiles
.iter()
.map(|q| Some(format!("{}%", q * 100.0)))
.collect::<Vec<Option<String>>>();
let mut labels = vec![
Some("count".to_string()),
Some("sum".to_string()),
Some("mean".to_string()),
Some("median".to_string()),
Some("std".to_string()),
Some("min".to_string()),
];
labels.append(&mut quantiles_labels);
labels.push(Some("max".to_string()));
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
let names = ChunkedArray::<Utf8Type>::new_from_opt_slice(
"descriptor",
&[
Some("count"),
Some("sum"),
Some("mean"),
Some("median"),
Some("std"),
Some("min"),
Some("25%"),
Some("50%"),
Some("75%"),
Some("max"),
],
)
.into_series();
let names = ChunkedArray::<Utf8Type>::new_from_opt_slice("descriptor", &labels).into_series();
let head = std::iter::once(names);
@@ -165,32 +207,19 @@ fn command(
                 _ => None,
             });

-        let q_25 = col
-            .quantile_as_series(0.25)
-            .ok()
-            .and_then(|ca| ca.cast(&DataType::Float64).ok())
-            .and_then(|ca| match ca.get(0) {
-                AnyValue::Float64(v) => Some(v),
-                _ => None,
-            });
-
-        let q_50 = col
-            .quantile_as_series(0.50)
-            .ok()
-            .and_then(|ca| ca.cast(&DataType::Float64).ok())
-            .and_then(|ca| match ca.get(0) {
-                AnyValue::Float64(v) => Some(v),
-                _ => None,
-            });
-
-        let q_75 = col
-            .quantile_as_series(0.75)
-            .ok()
-            .and_then(|ca| ca.cast(&DataType::Float64).ok())
-            .and_then(|ca| match ca.get(0) {
-                AnyValue::Float64(v) => Some(v),
-                _ => None,
-            });
+        let mut quantiles = quantiles
+            .clone()
+            .into_iter()
+            .map(|q| {
+                col.quantile_as_series(q, QuantileInterpolOptions::default())
+                    .ok()
+                    .and_then(|ca| ca.cast(&DataType::Float64).ok())
+                    .and_then(|ca| match ca.get(0) {
+                        AnyValue::Float64(v) => Some(v),
+                        _ => None,
+                    })
+            })
+            .collect::<Vec<Option<f64>>>();

         let max = col
             .max_as_series()
@@ -201,23 +230,12 @@ fn command(
                 _ => None,
            });

+        let mut descriptors = vec![Some(count), sum, mean, median, std, min];
+        descriptors.append(&mut quantiles);
+        descriptors.push(max);
+
         let name = format!("{} ({})", col.name(), col.dtype());
-
-        ChunkedArray::<Float64Type>::new_from_opt_slice(
-            &name,
-            &[
-                Some(count),
-                sum,
-                mean,
-                median,
-                std,
-                min,
-                q_25,
-                q_50,
-                q_75,
-                max,
-            ],
-        )
-        .into_series()
+        ChunkedArray::<Float64Type>::new_from_opt_slice(&name, &descriptors).into_series()
     });

     let res = head.chain(tail).collect::<Vec<Series>>();
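
The per-column loop above leans on quantile_as_series, whose 0.19 form also takes the interpolation option. A self-contained sketch of that pattern, mirroring the describe code (the helper name is ours):

    use polars::prelude::*;

    // Compute one quantile of a Series and extract it as f64, as the
    // describe command does per column.
    fn series_quantile(s: &Series, q: f64) -> Option<f64> {
        s.quantile_as_series(q, QuantileInterpolOptions::default())
            .ok()
            .and_then(|s| s.cast(&DataType::Float64).ok())
            .and_then(|s| match s.get(0) {
                AnyValue::Float64(v) => Some(v),
                _ => None,
            })
    }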

View File

@@ -64,7 +64,7 @@ fn command(
         ShellError::SpannedLabeledError("Error creating groupby".into(), e.to_string(), col_span)
     })?;

-    let groups = groupby.get_groups().to_vec();
+    let groups = groupby.get_groups();
     let groupby = NuGroupBy::new(df.as_ref().clone(), col_string, groups);

     Ok(PipelineData::Value(groupby.into_value(call.head), None))

View File

@@ -5,7 +5,8 @@ use nu_protocol::{
     engine::{Command, EngineState, Stack},
     Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape,
 };
-use std::{fs::File, path::PathBuf};
+use std::{fs::File, io::BufReader, path::PathBuf};

 use polars::prelude::{CsvEncoding, CsvReader, JsonReader, ParquetReader, SerReader};
@@ -138,12 +139,12 @@ fn from_json(
     call: &Call,
 ) -> Result<polars::prelude::DataFrame, ShellError> {
     let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
-    let r = File::open(&file.item).map_err(|e| {
+    let mut file = File::open(&file.item).map_err(|e| {
         ShellError::SpannedLabeledError("Error opening file".into(), e.to_string(), file.span)
     })?;

-    let reader = JsonReader::new(r);
+    let buf_reader = BufReader::new(&mut file);
+    let reader = JsonReader::new(buf_reader);

     reader.finish().map_err(|e| {
         ShellError::SpannedLabeledError("Json reader error".into(), format!("{:?}", e), call.head)
View File

@@ -105,7 +105,7 @@ fn command(
     let mut groupby = nu_groupby.to_groupby()?;

-    let pivot = groupby.pivot(&pivot_col.item, &value_col.item);
+    let pivot = groupby.pivot(vec![&pivot_col.item], vec![&value_col.item]);

     match op {
         Operation::Mean => pivot.mean(),
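
GroupBy::pivot now accepts multiple pivot and value columns, so single columns get wrapped in vectors. A sketch under assumed column names ("group", "date", "amount" are hypothetical):

    use polars::prelude::*;

    fn pivot_mean(df: &DataFrame) -> Result<DataFrame> {
        // pivot takes vectors of column names in 0.19; mean() aggregates the
        // pivoted values as in the command above.
        let mut gb = df.groupby("group")?;
        gb.pivot(vec!["date"], vec!["amount"]).mean()
    }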

View File

@@ -76,14 +76,14 @@ fn command(
     let df = NuDataFrame::try_from_pipeline(input, call.head)?;

     match (rows, fraction) {
-        (Some(rows), None) => df.as_ref().sample_n(rows.item, replace).map_err(|e| {
+        (Some(rows), None) => df.as_ref().sample_n(rows.item, replace, 0).map_err(|e| {
             ShellError::SpannedLabeledError(
                 "Error creating sample".into(),
                 e.to_string(),
                 rows.span,
             )
         }),
-        (None, Some(frac)) => df.as_ref().sample_frac(frac.item, replace).map_err(|e| {
+        (None, Some(frac)) => df.as_ref().sample_frac(frac.item, replace, 0).map_err(|e| {
             ShellError::SpannedLabeledError(
                 "Error creating sample".into(),
                 e.to_string(),
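
sample_n and sample_frac gained a third parameter, an RNG seed, in polars 0.19; this code passes 0. A sketch with illustrative values:

    use polars::prelude::*;

    fn take_samples(df: &DataFrame) -> Result<(DataFrame, DataFrame)> {
        let by_count = df.sample_n(5, false, 0)?; // 5 rows, no replacement, seed 0
        let by_frac = df.sample_frac(0.1, false, 0)?; // ~10% of rows, seed 0
        Ok((by_count, by_frac))
    }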

View File

@@ -82,7 +82,7 @@ fn command(
     })?;

     let value = Value::Bool {
-        val: bool.all_false(),
+        val: !bool.any(),
        span: call.head,
     };

View File

@@ -82,7 +82,7 @@ fn command(
     })?;

     let value = Value::Bool {
-        val: bool.all_true(),
+        val: bool.all(),
         span: call.head,
     };
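
These two hunks are the same rename: polars 0.19 drops all_true/all_false on boolean chunked arrays in favor of all/any, so "all false" becomes !any(). A small sketch (data illustrative):

    use polars::prelude::*;

    fn main() {
        let mask = BooleanChunked::new_from_slice("mask", &[false, false]);
        assert!(!mask.any()); // previously mask.all_false()
        assert!(!mask.all()); // previously !mask.all_true()
    }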

View File

@@ -7,7 +7,7 @@ use polars::chunked_array::object::builder::ObjectChunkedBuilder;
 use polars::chunked_array::ChunkedArray;
 use polars::prelude::{
     DataFrame, DataType, DatetimeChunked, Int64Type, IntoSeries, NamedFrom, NewChunkedArray,
-    ObjectType, Series,
+    ObjectType, Series, TimeUnit,
 };
 use std::ops::{Deref, DerefMut};
@@ -399,7 +399,7 @@ pub fn create_column(
             Ok(Column::new(casted.name().into(), values))
         }
-        DataType::Datetime => {
+        DataType::Datetime(_, _) => {
             let casted = series.datetime().map_err(|e| {
                 ShellError::LabeledError("Error casting column to datetime".into(), e.to_string())
             })?;
@@ -596,7 +596,8 @@ pub fn from_parsed_columns(column_values: ColumnMap) -> Result<NuDataFrame, ShellError>
             });

             let res: DatetimeChunked =
-                ChunkedArray::<Int64Type>::new_from_opt_iter(&name, it).into();
+                ChunkedArray::<Int64Type>::new_from_opt_iter(&name, it)
+                    .into_datetime(TimeUnit::Milliseconds, None);

             df_series.push(res.into_series())
         }
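
In 0.19 the Datetime logical type carries a time unit and an optional time zone, and an Int64 array is promoted explicitly with into_datetime instead of a blanket .into(). A sketch (timestamps illustrative):

    use polars::prelude::*;

    fn main() {
        let millis = ChunkedArray::<Int64Type>::new_from_opt_slice("ts", &[Some(0i64), None]);
        let dt: DatetimeChunked = millis.into_datetime(TimeUnit::Milliseconds, None);
        println!("{:?}", dt.into_series());
    }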

View File

@@ -369,12 +369,12 @@ impl NuDataFrame {
             .expect("already checked that dataframe is different than 0");

         // if unable to sort, then unable to compare
-        let lhs = match self.as_ref().sort(*first_col, false) {
+        let lhs = match self.as_ref().sort(vec![*first_col], false) {
             Ok(df) => df,
             Err(_) => return None,
         };

-        let rhs = match other.as_ref().sort(*first_col, false) {
+        let rhs = match other.as_ref().sort(vec![*first_col], false) {
             Ok(df) => df,
             Err(_) => return None,
         };
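
DataFrame::sort likewise takes a collection of column names now, which allows multi-column sorts; a single key is wrapped in a vector. A sketch (column "a" is hypothetical):

    use polars::prelude::*;

    fn sort_ascending(df: &DataFrame) -> Result<DataFrame> {
        df.sort(vec!["a"], false) // false = not reversed, i.e. ascending
    }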

View File

@@ -1,23 +1,45 @@
 mod custom_value;

 use nu_protocol::{PipelineData, ShellError, Span, Value};
-use polars::frame::groupby::{GroupBy, GroupTuples};
+use polars::frame::groupby::{GroupBy, GroupsProxy};
 use polars::prelude::DataFrame;
 use serde::{Deserialize, Serialize};

+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum NuGroupsProxy {
+    Idx(Vec<(u32, Vec<u32>)>),
+    Slice(Vec<[u32; 2]>),
+}
+
+impl NuGroupsProxy {
+    fn from_polars(groups: &GroupsProxy) -> Self {
+        match groups {
+            GroupsProxy::Idx(indexes) => NuGroupsProxy::Idx(indexes.clone()),
+            GroupsProxy::Slice(slice) => NuGroupsProxy::Slice(slice.clone()),
+        }
+    }
+
+    fn to_polars(&self) -> GroupsProxy {
+        match self {
+            Self::Idx(indexes) => GroupsProxy::Idx(indexes.clone()),
+            Self::Slice(slice) => GroupsProxy::Slice(slice.clone()),
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct NuGroupBy {
     dataframe: DataFrame,
     by: Vec<String>,
-    groups: GroupTuples,
+    groups: NuGroupsProxy,
 }

 impl NuGroupBy {
-    pub fn new(dataframe: DataFrame, by: Vec<String>, groups: GroupTuples) -> Self {
+    pub fn new(dataframe: DataFrame, by: Vec<String>, groups: &GroupsProxy) -> Self {
         NuGroupBy {
             dataframe,
             by,
-            groups,
+            groups: NuGroupsProxy::from_polars(groups),
         }
     }
@@ -60,7 +82,12 @@ impl NuGroupBy {
             ShellError::LabeledError("Error creating groupby".into(), e.to_string())
         })?;

-        Ok(GroupBy::new(&self.dataframe, by, self.groups.clone(), None))
+        Ok(GroupBy::new(
+            &self.dataframe,
+            by,
+            self.groups.to_polars(),
+            None,
+        ))
     }

     pub fn print(&self, span: Span) -> Result<Vec<Value>, ShellError> {
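
GroupsProxy replaces GroupTuples in polars 0.19 and does not implement serde's traits, which is why NuGroupBy stores the NuGroupsProxy mirror and converts at the boundaries. Within this module the conversion round-trips cleanly; a sketch of intent, not code added by the PR:

    use polars::frame::groupby::GroupsProxy;

    // Serialize-friendly detour: polars -> mirror -> polars.
    fn roundtrip(groups: &GroupsProxy) -> GroupsProxy {
        NuGroupsProxy::from_polars(groups).to_polars()
    }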