Polars update (#4875)

* update to polars 0.20

* add to date parser for series
This commit is contained in:
Fernando Herrera
2022-03-19 11:13:34 +00:00
committed by GitHub
parent 3db608eb5c
commit d6669d3f33
17 changed files with 255 additions and 161 deletions

View File

@ -162,7 +162,7 @@ fn command(
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
let names = ChunkedArray::<Utf8Type>::new_from_opt_slice("descriptor", &labels).into_series();
let names = ChunkedArray::<Utf8Type>::from_slice_options("descriptor", &labels).into_series();
let head = std::iter::once(names);
@ -235,7 +235,7 @@ fn command(
descriptors.push(max);
let name = format!("{} ({})", col.name(), col.dtype());
ChunkedArray::<Float64Type>::new_from_opt_slice(&name, &descriptors).into_series()
ChunkedArray::<Float64Type>::from_slice_options(&name, &descriptors).into_series()
});
let res = head.chain(tail).collect::<Vec<Series>>();

View File

@ -4,6 +4,7 @@ use nu_protocol::{
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};
use polars::prelude::DistinctKeepStrategy;
use super::super::values::utils::convert_columns_string;
use super::super::values::{Column, NuDataFrame};
@ -28,6 +29,11 @@ impl Command for DropDuplicates {
"subset of columns to drop duplicates",
)
.switch("maintain", "maintain order", Some('m'))
.switch(
"last",
"keeps last duplicate value (by default keeps first)",
Some('l'),
)
.category(Category::Custom("dataframe".into()))
}
@ -82,8 +88,14 @@ fn command(
let subset_slice = subset.as_ref().map(|cols| &cols[..]);
let keep_strategy = if call.has_flag("last") {
DistinctKeepStrategy::Last
} else {
DistinctKeepStrategy::First
};
df.as_ref()
.drop_duplicates(call.has_flag("maintain"), subset_slice)
.distinct(subset_slice, keep_strategy)
.map_err(|e| {
ShellError::SpannedLabeledError(
"Error dropping duplicates".into(),

View File

@ -71,7 +71,7 @@ fn command(
let delimiter: Option<Spanned<String>> = call.get_flag(engine_state, stack, "delimiter")?;
let no_header: bool = call.has_flag("no-header");
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
let mut df = NuDataFrame::try_from_pipeline(input, call.head)?;
let mut file = File::create(&file_name.item).map_err(|e| {
ShellError::SpannedLabeledError(
@ -109,7 +109,7 @@ fn command(
}
};
writer.finish(df.as_ref()).map_err(|e| {
writer.finish(df.as_mut()).map_err(|e| {
ShellError::SpannedLabeledError(
"Error writing to file".into(),
e.to_string(),

View File

@ -55,7 +55,7 @@ fn command(
) -> Result<PipelineData, ShellError> {
let file_name: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
let mut df = NuDataFrame::try_from_pipeline(input, call.head)?;
let file = File::create(&file_name.item).map_err(|e| {
ShellError::SpannedLabeledError(
@ -65,7 +65,7 @@ fn command(
)
})?;
ParquetWriter::new(file).finish(df.as_ref()).map_err(|e| {
ParquetWriter::new(file).finish(df.as_mut()).map_err(|e| {
ShellError::SpannedLabeledError("Error saving file".into(), e.to_string(), file_name.span)
})?;

View File

@ -60,8 +60,8 @@ fn command(
let res = series.arg_max();
let chunked = match res {
Some(index) => UInt32Chunked::new_from_slice("arg_max", &[index as u32]),
None => UInt32Chunked::new_from_slice("arg_max", &[]),
Some(index) => UInt32Chunked::from_slice("arg_max", &[index as u32]),
None => UInt32Chunked::from_slice("arg_max", &[]),
};
let res = chunked.into_series();

View File

@ -60,8 +60,8 @@ fn command(
let res = series.arg_min();
let chunked = match res {
Some(index) => UInt32Chunked::new_from_slice("arg_min", &[index as u32]),
None => UInt32Chunked::new_from_slice("arg_min", &[]),
Some(index) => UInt32Chunked::from_slice("arg_min", &[index as u32]),
None => UInt32Chunked::from_slice("arg_min", &[]),
};
let res = chunked.into_series();

View File

@ -0,0 +1,87 @@
use super::super::super::values::NuDataFrame;
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, SyntaxShape,
};
use polars::prelude::IntoSeries;
#[derive(Clone)]
pub struct AsDate;
impl Command for AsDate {
fn name(&self) -> &str {
"dfr as-date"
}
fn usage(&self) -> &str {
r#"Converts string to date. Format example:
"%Y-%m-%d" => 2021-12-31
"%d-%m-%Y" => 31-12-2021
"%Y%m%d" => 2021319 (2021-03-19)"#
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.required("format", SyntaxShape::String, "formating date string")
.switch("not-exact", "the format string may be contained in the date (e.g. foo-2021-01-01-bar could match 2021-01-01)", Some('n'))
.category(Category::Custom("dataframe".into()))
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Converts string to date",
example: r#"["2021-12-30" "2021-12-31"] | dfr to-df | dfr as-datetime "%Y-%m-%d""#,
result: None,
}]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
command(engine_state, stack, call, input)
}
}
fn command(
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let format: String = call.req(engine_state, stack, 0)?;
let not_exact = call.has_flag("not-exact");
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
let series = df.as_series(call.head)?;
let casted = series.utf8().map_err(|e| {
ShellError::SpannedLabeledError("Error casting to string".into(), e.to_string(), call.head)
})?;
let res = if not_exact {
casted.as_date_not_exact(Some(format.as_str()))
} else {
casted.as_date(Some(format.as_str()))
};
let mut res = res
.map_err(|e| {
ShellError::SpannedLabeledError(
"Error creating datetime".into(),
e.to_string(),
call.head,
)
})?
.into_series();
res.rename("date");
NuDataFrame::try_from_series(vec![res], call.head)
.map(|df| PipelineData::Value(NuDataFrame::into_value(df, call.head), None))
}

View File

@ -34,7 +34,7 @@ impl Command for AsDateTime {
fn signature(&self) -> Signature {
Signature::build(self.name())
.required("format", SyntaxShape::String, "formating date string")
.required("format", SyntaxShape::String, "formating date time string")
.switch("not-exact", "the format string may be contained in the date (e.g. foo-2021-01-01-bar could match 2021-01-01)", Some('n'))
.category(Category::Custom("dataframe".into()))
}
@ -45,7 +45,7 @@ impl Command for AsDateTime {
example: r#"["2021-12-30 00:00:00" "2021-12-31 00:00:00"] | dfr to-df | dfr as-datetime "%Y-%m-%d %H:%M:%S""#,
result: Some(
NuDataFrame::try_from_columns(vec![Column::new(
"0".to_string(),
"datetime".to_string(),
vec![
Value::Date {
val: DateTime::parse_from_str(
@ -103,7 +103,7 @@ fn command(
casted.as_datetime(Some(format.as_str()), TimeUnit::Milliseconds)
};
let res = res
let mut res = res
.map_err(|e| {
ShellError::SpannedLabeledError(
"Error creating datetime".into(),
@ -113,6 +113,7 @@ fn command(
})?
.into_series();
res.rename("datetime");
NuDataFrame::try_from_series(vec![res], call.head)
.map(|df| PipelineData::Value(NuDataFrame::into_value(df, call.head), None))
}

View File

@ -1,3 +1,4 @@
mod as_date;
mod as_datetime;
mod get_day;
mod get_hour;
@ -10,6 +11,7 @@ mod get_week;
mod get_weekday;
mod get_year;
pub use as_date::AsDate;
pub use as_datetime::AsDateTime;
pub use get_day::GetDay;
pub use get_hour::GetHour;

View File

@ -5,7 +5,7 @@ use nu_protocol::{
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, Value,
};
use polars::prelude::IntoSeries;
use polars::prelude::{IntoSeries, SortOptions};
#[derive(Clone)]
pub struct ArgSort;
@ -22,6 +22,7 @@ impl Command for ArgSort {
fn signature(&self) -> Signature {
Signature::build(self.name())
.switch("reverse", "reverse order", Some('r'))
.switch("nulls-last", "nulls ordered last", Some('n'))
.category(Category::Custom("dataframe".into()))
}
@ -85,10 +86,12 @@ fn command(
) -> Result<PipelineData, ShellError> {
let df = NuDataFrame::try_from_pipeline(input, call.head)?;
let mut res = df
.as_series(call.head)?
.argsort(call.has_flag("reverse"))
.into_series();
let sort_options = SortOptions {
descending: call.has_flag("reverse"),
nulls_last: call.has_flag("nulls-last"),
};
let mut res = df.as_series(call.head)?.argsort(sort_options).into_series();
res.rename("arg_sort");
NuDataFrame::try_from_series(vec![res], call.head)

View File

@ -57,6 +57,7 @@ pub fn add_series_decls(working_set: &mut StateWorkingSet) {
ArgSort,
ArgTrue,
ArgUnique,
AsDate,
AsDateTime,
Concatenate,
Contains,

View File

@ -530,6 +530,25 @@ where
)),
}
}
DataType::Date => {
let to_i64 = series.cast(&DataType::Int64);
match to_i64 {
Ok(series) => {
let nanosecs_per_day: i64 = 24 * 60 * 60 * 1_000_000_000;
let casted = series
.i64()
.map(|chunked| chunked.mul(nanosecs_per_day))
.expect("already checked for casting");
compare_casted_i64(Ok(&casted), val, f, span)
}
Err(e) => Err(ShellError::SpannedLabeledError(
"Unable to cast to f64".into(),
e.to_string(),
span,
)),
}
}
DataType::Int64 => {
let casted = series.i64();
compare_casted_i64(casted, val, f, span)

View File

@ -430,7 +430,7 @@ pub fn create_column(
Ok(Column::new(casted.name().into(), values))
}
DataType::Time => {
let casted = series.time().map_err(|e| {
let casted = series.timestamp(TimeUnit::Nanoseconds).map_err(|e| {
ShellError::LabeledError("Error casting column to time".into(), e.to_string())
})?;
@ -596,7 +596,7 @@ pub fn from_parsed_columns(column_values: ColumnMap) -> Result<NuDataFrame, Shel
});
let res: DatetimeChunked =
ChunkedArray::<Int64Type>::new_from_opt_iter(&name, it)
ChunkedArray::<Int64Type>::from_iter_options(&name, it)
.into_datetime(TimeUnit::Milliseconds, None);
df_series.push(res.into_series())
@ -610,7 +610,7 @@ pub fn from_parsed_columns(column_values: ColumnMap) -> Result<NuDataFrame, Shel
}
});
let res = ChunkedArray::<Int64Type>::new_from_opt_iter(&name, it);
let res = ChunkedArray::<Int64Type>::from_iter_options(&name, it);
df_series.push(res.into_series())
}

View File

@ -394,7 +394,7 @@ impl NuDataFrame {
// Casting needed to compare other numeric types with nushell numeric type.
// In nushell we only have i64 integer numeric types and any array created
// with nushell untagged primitives will be of type i64
DataType::UInt32 => match self_series.cast(&DataType::Int64) {
DataType::UInt32 | DataType::Int32 => match self_series.cast(&DataType::Int64) {
Ok(series) => series,
Err(_) => return None,
},

View File

@ -2,26 +2,42 @@ mod custom_value;
use nu_protocol::{PipelineData, ShellError, Span, Value};
use polars::frame::groupby::{GroupBy, GroupsProxy};
use polars::prelude::DataFrame;
use polars::prelude::{DataFrame, GroupsIdx};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NuGroupsProxy {
Idx(Vec<(u32, Vec<u32>)>),
Idx {
sorted: bool,
all: Vec<(u32, Vec<u32>)>,
},
Slice(Vec<[u32; 2]>),
}
impl NuGroupsProxy {
fn from_polars(groups: &GroupsProxy) -> Self {
match groups {
GroupsProxy::Idx(indexes) => NuGroupsProxy::Idx(indexes.clone()),
GroupsProxy::Idx(indexes) => NuGroupsProxy::Idx {
sorted: indexes.is_sorted(),
all: indexes
.iter()
.map(|(index, values)| (index, values.clone()))
.collect(),
},
GroupsProxy::Slice(slice) => NuGroupsProxy::Slice(slice.clone()),
}
}
fn to_polars(&self) -> GroupsProxy {
match self {
Self::Idx(indexes) => GroupsProxy::Idx(indexes.clone()),
Self::Idx { sorted, all } => {
let mut groups: GroupsIdx = all.clone().into();
if *sorted {
groups.sort()
}
GroupsProxy::Idx(groups)
}
Self::Slice(slice) => GroupsProxy::Slice(slice.clone()),
}
}