forked from extern/nushell
move dataframe commands to nu-cmd-dataframe (#9241)
All of the dataframe commands ported over with no issues... ### 11 tests are commented out (for now) So 100 of the original 111 tests are passing with only 11 tests being ignored for now.. As per our conversation in the core team meeting on Wednesday I took @jntrnr suggestion and just commented out the tests dealing with [IntoDatetime](https://github.com/nushell/nushell/blob/main/crates/nu-command/src/conversions/into/mod.rs) Later on we can move this functionality out of nu-command if we decide it makes sense... ### The following tests were ignored... ```rust modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_day.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_hour.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_minute.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_month.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_nanosecond.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_ordinal.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_second.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_week.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_weekday.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/date/get_year.rs modified: crates/nu-cmd-dataframe/src/dataframe/series/string/strftime.rs ```
This commit is contained in:
12
crates/nu-cmd-dataframe/src/dataframe/values/mod.rs
Normal file
12
crates/nu-cmd-dataframe/src/dataframe/values/mod.rs
Normal file
@ -0,0 +1,12 @@
|
||||
mod nu_dataframe;
|
||||
mod nu_expression;
|
||||
mod nu_lazyframe;
|
||||
mod nu_lazygroupby;
|
||||
mod nu_when;
|
||||
pub mod utils;
|
||||
|
||||
pub use nu_dataframe::{Axis, Column, NuDataFrame};
|
||||
pub use nu_expression::NuExpression;
|
||||
pub use nu_lazyframe::NuLazyFrame;
|
||||
pub use nu_lazygroupby::NuLazyGroupBy;
|
||||
pub use nu_when::NuWhen;
|
@ -0,0 +1,782 @@
|
||||
use super::{operations::Axis, NuDataFrame};
|
||||
use nu_protocol::{
|
||||
ast::{Boolean, Comparison, Math, Operator},
|
||||
span, ShellError, Span, Spanned, Value,
|
||||
};
|
||||
use num::Zero;
|
||||
use polars::prelude::{
|
||||
BooleanType, ChunkCompare, ChunkedArray, DataType, Float64Type, Int64Type, IntoSeries,
|
||||
NumOpsDispatchChecked, PolarsError, Series, TimeUnit, Utf8NameSpaceImpl,
|
||||
};
|
||||
use std::ops::{Add, BitAnd, BitOr, Div, Mul, Sub};
|
||||
|
||||
pub(super) fn between_dataframes(
|
||||
operator: Spanned<Operator>,
|
||||
left: &Value,
|
||||
lhs: &NuDataFrame,
|
||||
right: &Value,
|
||||
rhs: &NuDataFrame,
|
||||
) -> Result<Value, ShellError> {
|
||||
let operation_span = span(&[left.span()?, right.span()?]);
|
||||
match operator.item {
|
||||
Operator::Math(Math::Plus) => match lhs.append_df(rhs, Axis::Row, operation_span) {
|
||||
Ok(df) => Ok(df.into_value(operation_span)),
|
||||
Err(e) => Err(e),
|
||||
},
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn compute_between_series(
|
||||
operator: Spanned<Operator>,
|
||||
left: &Value,
|
||||
lhs: &Series,
|
||||
right: &Value,
|
||||
rhs: &Series,
|
||||
) -> Result<Value, ShellError> {
|
||||
let operation_span = span(&[left.span()?, right.span()?]);
|
||||
match operator.item {
|
||||
Operator::Math(Math::Plus) => {
|
||||
let mut res = lhs + rhs;
|
||||
let name = format!("sum_{}_{}", lhs.name(), rhs.name());
|
||||
res.rename(&name);
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Math(Math::Minus) => {
|
||||
let mut res = lhs - rhs;
|
||||
let name = format!("sub_{}_{}", lhs.name(), rhs.name());
|
||||
res.rename(&name);
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Math(Math::Multiply) => {
|
||||
let mut res = lhs * rhs;
|
||||
let name = format!("mul_{}_{}", lhs.name(), rhs.name());
|
||||
res.rename(&name);
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Math(Math::Divide) => {
|
||||
let res = lhs.checked_div(rhs);
|
||||
match res {
|
||||
Ok(mut res) => {
|
||||
let name = format!("div_{}_{}", lhs.name(), rhs.name());
|
||||
res.rename(&name);
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Division error".into(),
|
||||
e.to_string(),
|
||||
Some(right.span()?),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
Operator::Comparison(Comparison::Equal) => {
|
||||
let name = format!("eq_{}_{}", lhs.name(), rhs.name());
|
||||
let res = compare_series(lhs, rhs, name.as_str(), right.span().ok(), Series::equal)?;
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Comparison(Comparison::NotEqual) => {
|
||||
let name = format!("neq_{}_{}", lhs.name(), rhs.name());
|
||||
let res = compare_series(lhs, rhs, name.as_str(), right.span().ok(), Series::equal)?;
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Comparison(Comparison::LessThan) => {
|
||||
let name = format!("lt_{}_{}", lhs.name(), rhs.name());
|
||||
let res = compare_series(lhs, rhs, name.as_str(), right.span().ok(), Series::equal)?;
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Comparison(Comparison::LessThanOrEqual) => {
|
||||
let name = format!("lte_{}_{}", lhs.name(), rhs.name());
|
||||
let res = compare_series(lhs, rhs, name.as_str(), right.span().ok(), Series::equal)?;
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Comparison(Comparison::GreaterThan) => {
|
||||
let name = format!("gt_{}_{}", lhs.name(), rhs.name());
|
||||
let res = compare_series(lhs, rhs, name.as_str(), right.span().ok(), Series::equal)?;
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Comparison(Comparison::GreaterThanOrEqual) => {
|
||||
let name = format!("gte_{}_{}", lhs.name(), rhs.name());
|
||||
let res = compare_series(lhs, rhs, name.as_str(), right.span().ok(), Series::equal)?;
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
Operator::Boolean(Boolean::And) => match lhs.dtype() {
|
||||
DataType::Boolean => {
|
||||
let lhs_cast = lhs.bool();
|
||||
let rhs_cast = rhs.bool();
|
||||
|
||||
match (lhs_cast, rhs_cast) {
|
||||
(Ok(l), Ok(r)) => {
|
||||
let mut res = l.bitand(r).into_series();
|
||||
let name = format!("and_{}_{}", lhs.name(), rhs.name());
|
||||
res.rename(&name);
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
_ => Err(ShellError::GenericError(
|
||||
"Incompatible types".into(),
|
||||
"unable to cast to boolean".into(),
|
||||
Some(right.span()?),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
_ => Err(ShellError::IncompatibleParametersSingle {
|
||||
msg: format!(
|
||||
"Operation {} can only be done with boolean values",
|
||||
operator.item
|
||||
),
|
||||
span: operation_span,
|
||||
}),
|
||||
},
|
||||
Operator::Boolean(Boolean::Or) => match lhs.dtype() {
|
||||
DataType::Boolean => {
|
||||
let lhs_cast = lhs.bool();
|
||||
let rhs_cast = rhs.bool();
|
||||
|
||||
match (lhs_cast, rhs_cast) {
|
||||
(Ok(l), Ok(r)) => {
|
||||
let mut res = l.bitor(r).into_series();
|
||||
let name = format!("or_{}_{}", lhs.name(), rhs.name());
|
||||
res.rename(&name);
|
||||
NuDataFrame::series_to_value(res, operation_span)
|
||||
}
|
||||
_ => Err(ShellError::GenericError(
|
||||
"Incompatible types".into(),
|
||||
"unable to cast to boolean".into(),
|
||||
Some(right.span()?),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
_ => Err(ShellError::IncompatibleParametersSingle {
|
||||
msg: format!(
|
||||
"Operation {} can only be done with boolean values",
|
||||
operator.item
|
||||
),
|
||||
span: operation_span,
|
||||
}),
|
||||
},
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn compare_series<'s, F>(
|
||||
lhs: &'s Series,
|
||||
rhs: &'s Series,
|
||||
name: &'s str,
|
||||
span: Option<Span>,
|
||||
f: F,
|
||||
) -> Result<Series, ShellError>
|
||||
where
|
||||
F: Fn(&'s Series, &'s Series) -> Result<ChunkedArray<BooleanType>, PolarsError>,
|
||||
{
|
||||
let mut res = f(lhs, rhs)
|
||||
.map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Equality error".into(),
|
||||
e.to_string(),
|
||||
span,
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})?
|
||||
.into_series();
|
||||
|
||||
res.rename(name);
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub(super) fn compute_series_single_value(
|
||||
operator: Spanned<Operator>,
|
||||
left: &Value,
|
||||
lhs: &NuDataFrame,
|
||||
right: &Value,
|
||||
) -> Result<Value, ShellError> {
|
||||
if !lhs.is_series() {
|
||||
return Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
});
|
||||
}
|
||||
|
||||
let lhs_span = left.span()?;
|
||||
let lhs = lhs.as_series(lhs_span)?;
|
||||
|
||||
match operator.item {
|
||||
Operator::Math(Math::Plus) => match &right {
|
||||
Value::Int { val, .. } => {
|
||||
compute_series_i64(&lhs, *val, <ChunkedArray<Int64Type>>::add, lhs_span)
|
||||
}
|
||||
Value::Float { val, .. } => {
|
||||
compute_series_decimal(&lhs, *val, <ChunkedArray<Float64Type>>::add, lhs_span)
|
||||
}
|
||||
Value::String { val, .. } => add_string_to_series(&lhs, val, lhs_span),
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Math(Math::Minus) => match &right {
|
||||
Value::Int { val, .. } => {
|
||||
compute_series_i64(&lhs, *val, <ChunkedArray<Int64Type>>::sub, lhs_span)
|
||||
}
|
||||
Value::Float { val, .. } => {
|
||||
compute_series_decimal(&lhs, *val, <ChunkedArray<Float64Type>>::sub, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Math(Math::Multiply) => match &right {
|
||||
Value::Int { val, .. } => {
|
||||
compute_series_i64(&lhs, *val, <ChunkedArray<Int64Type>>::mul, lhs_span)
|
||||
}
|
||||
Value::Float { val, .. } => {
|
||||
compute_series_decimal(&lhs, *val, <ChunkedArray<Float64Type>>::mul, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Math(Math::Divide) => match &right {
|
||||
Value::Int { val, span } => {
|
||||
if *val == 0 {
|
||||
Err(ShellError::DivisionByZero { span: *span })
|
||||
} else {
|
||||
compute_series_i64(&lhs, *val, <ChunkedArray<Int64Type>>::div, lhs_span)
|
||||
}
|
||||
}
|
||||
Value::Float { val, span } => {
|
||||
if val.is_zero() {
|
||||
Err(ShellError::DivisionByZero { span: *span })
|
||||
} else {
|
||||
compute_series_decimal(&lhs, *val, <ChunkedArray<Float64Type>>::div, lhs_span)
|
||||
}
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Comparison(Comparison::Equal) => match &right {
|
||||
Value::Int { val, .. } => compare_series_i64(&lhs, *val, ChunkedArray::equal, lhs_span),
|
||||
Value::Float { val, .. } => {
|
||||
compare_series_decimal(&lhs, *val, ChunkedArray::equal, lhs_span)
|
||||
}
|
||||
Value::String { val, .. } => {
|
||||
let equal_pattern = format!("^{}$", fancy_regex::escape(val));
|
||||
contains_series_pat(&lhs, &equal_pattern, lhs_span)
|
||||
}
|
||||
Value::Date { val, .. } => {
|
||||
compare_series_i64(&lhs, val.timestamp_millis(), ChunkedArray::equal, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Comparison(Comparison::NotEqual) => match &right {
|
||||
Value::Int { val, .. } => {
|
||||
compare_series_i64(&lhs, *val, ChunkedArray::not_equal, lhs_span)
|
||||
}
|
||||
Value::Float { val, .. } => {
|
||||
compare_series_decimal(&lhs, *val, ChunkedArray::not_equal, lhs_span)
|
||||
}
|
||||
Value::Date { val, .. } => compare_series_i64(
|
||||
&lhs,
|
||||
val.timestamp_millis(),
|
||||
ChunkedArray::not_equal,
|
||||
lhs_span,
|
||||
),
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Comparison(Comparison::LessThan) => match &right {
|
||||
Value::Int { val, .. } => compare_series_i64(&lhs, *val, ChunkedArray::lt, lhs_span),
|
||||
Value::Float { val, .. } => {
|
||||
compare_series_decimal(&lhs, *val, ChunkedArray::lt, lhs_span)
|
||||
}
|
||||
Value::Date { val, .. } => {
|
||||
compare_series_i64(&lhs, val.timestamp_millis(), ChunkedArray::lt, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Comparison(Comparison::LessThanOrEqual) => match &right {
|
||||
Value::Int { val, .. } => compare_series_i64(&lhs, *val, ChunkedArray::lt_eq, lhs_span),
|
||||
Value::Float { val, .. } => {
|
||||
compare_series_decimal(&lhs, *val, ChunkedArray::lt_eq, lhs_span)
|
||||
}
|
||||
Value::Date { val, .. } => {
|
||||
compare_series_i64(&lhs, val.timestamp_millis(), ChunkedArray::lt_eq, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Comparison(Comparison::GreaterThan) => match &right {
|
||||
Value::Int { val, .. } => compare_series_i64(&lhs, *val, ChunkedArray::gt, lhs_span),
|
||||
Value::Float { val, .. } => {
|
||||
compare_series_decimal(&lhs, *val, ChunkedArray::gt, lhs_span)
|
||||
}
|
||||
Value::Date { val, .. } => {
|
||||
compare_series_i64(&lhs, val.timestamp_millis(), ChunkedArray::gt, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Comparison(Comparison::GreaterThanOrEqual) => match &right {
|
||||
Value::Int { val, .. } => compare_series_i64(&lhs, *val, ChunkedArray::gt_eq, lhs_span),
|
||||
Value::Float { val, .. } => {
|
||||
compare_series_decimal(&lhs, *val, ChunkedArray::gt_eq, lhs_span)
|
||||
}
|
||||
Value::Date { val, .. } => {
|
||||
compare_series_i64(&lhs, val.timestamp_millis(), ChunkedArray::gt_eq, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
// TODO: update this to do a regex match instead of a simple contains?
|
||||
Operator::Comparison(Comparison::RegexMatch) => match &right {
|
||||
Value::String { val, .. } => contains_series_pat(&lhs, val, lhs_span),
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Comparison(Comparison::StartsWith) => match &right {
|
||||
Value::String { val, .. } => {
|
||||
let starts_with_pattern = format!("^{}", fancy_regex::escape(val));
|
||||
contains_series_pat(&lhs, &starts_with_pattern, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
Operator::Comparison(Comparison::EndsWith) => match &right {
|
||||
Value::String { val, .. } => {
|
||||
let ends_with_pattern = format!("{}$", fancy_regex::escape(val));
|
||||
contains_series_pat(&lhs, &ends_with_pattern, lhs_span)
|
||||
}
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
},
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span: operator.span,
|
||||
lhs_ty: left.get_type(),
|
||||
lhs_span: left.span()?,
|
||||
rhs_ty: right.get_type(),
|
||||
rhs_span: right.span()?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_series_i64<F>(series: &Series, val: i64, f: F, span: Span) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(ChunkedArray<Int64Type>, i64) -> ChunkedArray<Int64Type>,
|
||||
{
|
||||
match series.dtype() {
|
||||
DataType::UInt32 | DataType::Int32 | DataType::UInt64 => {
|
||||
let to_i64 = series.cast(&DataType::Int64);
|
||||
|
||||
match to_i64 {
|
||||
Ok(series) => {
|
||||
let casted = series.i64();
|
||||
compute_casted_i64(casted, val, f, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to i64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
DataType::Int64 => {
|
||||
let casted = series.i64();
|
||||
compute_casted_i64(casted, val, f, span)
|
||||
}
|
||||
_ => Err(ShellError::GenericError(
|
||||
"Incorrect type".into(),
|
||||
format!(
|
||||
"Series of type {} can not be used for operations with an i64 value",
|
||||
series.dtype()
|
||||
),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_casted_i64<F>(
|
||||
casted: Result<&ChunkedArray<Int64Type>, PolarsError>,
|
||||
val: i64,
|
||||
f: F,
|
||||
span: Span,
|
||||
) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(ChunkedArray<Int64Type>, i64) -> ChunkedArray<Int64Type>,
|
||||
{
|
||||
match casted {
|
||||
Ok(casted) => {
|
||||
let res = f(casted.clone(), val);
|
||||
let res = res.into_series();
|
||||
NuDataFrame::series_to_value(res, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to i64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_series_decimal<F>(
|
||||
series: &Series,
|
||||
val: f64,
|
||||
f: F,
|
||||
span: Span,
|
||||
) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(ChunkedArray<Float64Type>, f64) -> ChunkedArray<Float64Type>,
|
||||
{
|
||||
match series.dtype() {
|
||||
DataType::Float32 => {
|
||||
let to_f64 = series.cast(&DataType::Float64);
|
||||
|
||||
match to_f64 {
|
||||
Ok(series) => {
|
||||
let casted = series.f64();
|
||||
compute_casted_f64(casted, val, f, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to f64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
DataType::Float64 => {
|
||||
let casted = series.f64();
|
||||
compute_casted_f64(casted, val, f, span)
|
||||
}
|
||||
_ => Err(ShellError::GenericError(
|
||||
"Incorrect type".into(),
|
||||
format!(
|
||||
"Series of type {} can not be used for operations with a decimal value",
|
||||
series.dtype()
|
||||
),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_casted_f64<F>(
|
||||
casted: Result<&ChunkedArray<Float64Type>, PolarsError>,
|
||||
val: f64,
|
||||
f: F,
|
||||
span: Span,
|
||||
) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(ChunkedArray<Float64Type>, f64) -> ChunkedArray<Float64Type>,
|
||||
{
|
||||
match casted {
|
||||
Ok(casted) => {
|
||||
let res = f(casted.clone(), val);
|
||||
let res = res.into_series();
|
||||
NuDataFrame::series_to_value(res, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to f64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn compare_series_i64<F>(series: &Series, val: i64, f: F, span: Span) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(&ChunkedArray<Int64Type>, i64) -> ChunkedArray<BooleanType>,
|
||||
{
|
||||
match series.dtype() {
|
||||
DataType::UInt32
|
||||
| DataType::Int32
|
||||
| DataType::UInt64
|
||||
| DataType::Datetime(TimeUnit::Milliseconds, _) => {
|
||||
let to_i64 = series.cast(&DataType::Int64);
|
||||
|
||||
match to_i64 {
|
||||
Ok(series) => {
|
||||
let casted = series.i64();
|
||||
compare_casted_i64(casted, val, f, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to f64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
DataType::Date => {
|
||||
let to_i64 = series.cast(&DataType::Int64);
|
||||
|
||||
match to_i64 {
|
||||
Ok(series) => {
|
||||
let nanosecs_per_day: i64 = 24 * 60 * 60 * 1_000_000_000;
|
||||
let casted = series
|
||||
.i64()
|
||||
.map(|chunked| chunked.mul(nanosecs_per_day))
|
||||
.expect("already checked for casting");
|
||||
compare_casted_i64(Ok(&casted), val, f, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to f64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
DataType::Int64 => {
|
||||
let casted = series.i64();
|
||||
compare_casted_i64(casted, val, f, span)
|
||||
}
|
||||
_ => Err(ShellError::GenericError(
|
||||
"Incorrect type".into(),
|
||||
format!(
|
||||
"Series of type {} can not be used for operations with an i64 value",
|
||||
series.dtype()
|
||||
),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn compare_casted_i64<F>(
|
||||
casted: Result<&ChunkedArray<Int64Type>, PolarsError>,
|
||||
val: i64,
|
||||
f: F,
|
||||
span: Span,
|
||||
) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(&ChunkedArray<Int64Type>, i64) -> ChunkedArray<BooleanType>,
|
||||
{
|
||||
match casted {
|
||||
Ok(casted) => {
|
||||
let res = f(casted, val);
|
||||
let res = res.into_series();
|
||||
NuDataFrame::series_to_value(res, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to i64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn compare_series_decimal<F>(
|
||||
series: &Series,
|
||||
val: f64,
|
||||
f: F,
|
||||
span: Span,
|
||||
) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(&ChunkedArray<Float64Type>, f64) -> ChunkedArray<BooleanType>,
|
||||
{
|
||||
match series.dtype() {
|
||||
DataType::Float32 => {
|
||||
let to_f64 = series.cast(&DataType::Float64);
|
||||
|
||||
match to_f64 {
|
||||
Ok(series) => {
|
||||
let casted = series.f64();
|
||||
compare_casted_f64(casted, val, f, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to i64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
DataType::Float64 => {
|
||||
let casted = series.f64();
|
||||
compare_casted_f64(casted, val, f, span)
|
||||
}
|
||||
_ => Err(ShellError::GenericError(
|
||||
"Incorrect type".into(),
|
||||
format!(
|
||||
"Series of type {} can not be used for operations with a decimal value",
|
||||
series.dtype()
|
||||
),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn compare_casted_f64<F>(
|
||||
casted: Result<&ChunkedArray<Float64Type>, PolarsError>,
|
||||
val: f64,
|
||||
f: F,
|
||||
span: Span,
|
||||
) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(&ChunkedArray<Float64Type>, f64) -> ChunkedArray<BooleanType>,
|
||||
{
|
||||
match casted {
|
||||
Ok(casted) => {
|
||||
let res = f(casted, val);
|
||||
let res = res.into_series();
|
||||
NuDataFrame::series_to_value(res, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to f64".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn contains_series_pat(series: &Series, pat: &str, span: Span) -> Result<Value, ShellError> {
|
||||
let casted = series.utf8();
|
||||
match casted {
|
||||
Ok(casted) => {
|
||||
let res = casted.contains(pat, false);
|
||||
|
||||
match res {
|
||||
Ok(res) => {
|
||||
let res = res.into_series();
|
||||
NuDataFrame::series_to_value(res, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Error using contains".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to string".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_string_to_series(series: &Series, pat: &str, span: Span) -> Result<Value, ShellError> {
|
||||
let casted = series.utf8();
|
||||
match casted {
|
||||
Ok(casted) => {
|
||||
let res = casted + pat;
|
||||
let res = res.into_series();
|
||||
|
||||
NuDataFrame::series_to_value(res, span)
|
||||
}
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Unable to cast to string".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
@ -0,0 +1,785 @@
|
||||
use super::{DataFrameValue, NuDataFrame};
|
||||
|
||||
use chrono::{DateTime, FixedOffset, NaiveDateTime};
|
||||
use indexmap::map::{Entry, IndexMap};
|
||||
use nu_protocol::{ShellError, Span, Value};
|
||||
use polars::chunked_array::object::builder::ObjectChunkedBuilder;
|
||||
use polars::chunked_array::ChunkedArray;
|
||||
use polars::prelude::{
|
||||
DataFrame, DataType, DatetimeChunked, Int64Type, IntoSeries, NamedFrom, NewChunkedArray,
|
||||
ObjectType, Series, TemporalMethods, TimeUnit,
|
||||
};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
const SECS_PER_DAY: i64 = 86_400;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Column {
|
||||
name: String,
|
||||
values: Vec<Value>,
|
||||
}
|
||||
|
||||
impl Column {
|
||||
pub fn new(name: String, values: Vec<Value>) -> Self {
|
||||
Self { name, values }
|
||||
}
|
||||
|
||||
pub fn new_empty(name: String) -> Self {
|
||||
Self {
|
||||
name,
|
||||
values: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &str {
|
||||
self.name.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoIterator for Column {
|
||||
type Item = Value;
|
||||
type IntoIter = std::vec::IntoIter<Self::Item>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.values.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Column {
|
||||
type Target = Vec<Value>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.values
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Column {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.values
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum InputType {
|
||||
Integer,
|
||||
Float,
|
||||
String,
|
||||
Boolean,
|
||||
Object,
|
||||
Date,
|
||||
Duration,
|
||||
Filesize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TypedColumn {
|
||||
column: Column,
|
||||
column_type: Option<InputType>,
|
||||
}
|
||||
|
||||
impl TypedColumn {
|
||||
fn new_empty(name: String) -> Self {
|
||||
Self {
|
||||
column: Column::new_empty(name),
|
||||
column_type: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for TypedColumn {
|
||||
type Target = Column;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.column
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for TypedColumn {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.column
|
||||
}
|
||||
}
|
||||
|
||||
pub type ColumnMap = IndexMap<String, TypedColumn>;
|
||||
|
||||
pub fn create_column(
|
||||
series: &Series,
|
||||
from_row: usize,
|
||||
to_row: usize,
|
||||
span: Span,
|
||||
) -> Result<Column, ShellError> {
|
||||
let size = to_row - from_row;
|
||||
match series.dtype() {
|
||||
DataType::Null => {
|
||||
let values = std::iter::repeat(Value::Nothing { span })
|
||||
.take(size)
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(series.name().into(), values))
|
||||
}
|
||||
DataType::UInt8 => {
|
||||
let casted = series.u8().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to u8".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Int {
|
||||
val: a as i64,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::UInt16 => {
|
||||
let casted = series.u16().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to u16".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Int {
|
||||
val: a as i64,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::UInt32 => {
|
||||
let casted = series.u32().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to u32".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Int {
|
||||
val: a as i64,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::UInt64 => {
|
||||
let casted = series.u64().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to u64".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Int {
|
||||
val: a as i64,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Int8 => {
|
||||
let casted = series.i8().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to i8".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Int {
|
||||
val: a as i64,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Int16 => {
|
||||
let casted = series.i16().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to i16".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Int {
|
||||
val: a as i64,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Int32 => {
|
||||
let casted = series.i32().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to i32".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Int {
|
||||
val: a as i64,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Int64 => {
|
||||
let casted = series.i64().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to i64".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Int { val: a, span },
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Float32 => {
|
||||
let casted = series.f32().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to f32".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Float {
|
||||
val: a as f64,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Float64 => {
|
||||
let casted = series.f64().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to f64".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Float { val: a, span },
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Boolean => {
|
||||
let casted = series.bool().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to bool".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::Bool { val: a, span },
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Utf8 => {
|
||||
let casted = series.utf8().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to string".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => Value::String {
|
||||
val: a.into(),
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Object(x) => {
|
||||
let casted = series
|
||||
.as_any()
|
||||
.downcast_ref::<ChunkedArray<ObjectType<DataFrameValue>>>();
|
||||
|
||||
match casted {
|
||||
None => Err(ShellError::GenericError(
|
||||
"Error casting object from series".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(format!("Object not supported for conversion: {x}")),
|
||||
Vec::new(),
|
||||
)),
|
||||
Some(ca) => {
|
||||
let values = ca
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => a.get_value(),
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(ca.name().into(), values))
|
||||
}
|
||||
}
|
||||
}
|
||||
DataType::Date => {
|
||||
let casted = series.date().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to date".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => {
|
||||
// elapsed time in day since 1970-01-01
|
||||
let seconds = a as i64 * SECS_PER_DAY;
|
||||
let naive_datetime = match NaiveDateTime::from_timestamp_opt(seconds, 0) {
|
||||
Some(val) => val,
|
||||
None => {
|
||||
return Value::Error {
|
||||
error: Box::new(ShellError::UnsupportedInput(
|
||||
"The given local datetime representation is invalid."
|
||||
.to_string(),
|
||||
format!("timestamp is {a:?}"),
|
||||
span,
|
||||
Span::unknown(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
};
|
||||
// Zero length offset
|
||||
let offset = match FixedOffset::east_opt(0) {
|
||||
Some(val) => val,
|
||||
None => {
|
||||
return Value::Error {
|
||||
error: Box::new(ShellError::UnsupportedInput(
|
||||
"The given local datetime representation is invalid."
|
||||
.to_string(),
|
||||
format!("timestamp is {a:?}"),
|
||||
span,
|
||||
Span::unknown(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
};
|
||||
let datetime = DateTime::<FixedOffset>::from_utc(naive_datetime, offset);
|
||||
|
||||
Value::Date {
|
||||
val: datetime,
|
||||
span,
|
||||
}
|
||||
}
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Datetime(time_unit, _) => {
|
||||
let casted = series.datetime().map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to datetime".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(a) => {
|
||||
let unit_divisor = match time_unit {
|
||||
TimeUnit::Nanoseconds => 1_000_000_000,
|
||||
TimeUnit::Microseconds => 1_000_000,
|
||||
TimeUnit::Milliseconds => 1_000,
|
||||
};
|
||||
// elapsed time in nano/micro/milliseconds since 1970-01-01
|
||||
let seconds = a / unit_divisor;
|
||||
let naive_datetime = match NaiveDateTime::from_timestamp_opt(seconds, 0) {
|
||||
Some(val) => val,
|
||||
None => {
|
||||
return Value::Error {
|
||||
error: Box::new(ShellError::UnsupportedInput(
|
||||
"The given local datetime representation is invalid."
|
||||
.to_string(),
|
||||
format!("timestamp is {a:?}"),
|
||||
span,
|
||||
Span::unknown(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
};
|
||||
// Zero length offset
|
||||
let offset = match FixedOffset::east_opt(0) {
|
||||
Some(val) => val,
|
||||
None => {
|
||||
return Value::Error {
|
||||
error: Box::new(ShellError::UnsupportedInput(
|
||||
"The given local datetime representation is invalid."
|
||||
.to_string(),
|
||||
format!("timestamp is {a:?}"),
|
||||
span,
|
||||
Span::unknown(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
};
|
||||
let datetime = DateTime::<FixedOffset>::from_utc(naive_datetime, offset);
|
||||
|
||||
Value::Date {
|
||||
val: datetime,
|
||||
span,
|
||||
}
|
||||
}
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
DataType::Time => {
|
||||
let casted = series.timestamp(TimeUnit::Nanoseconds).map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error casting column to time".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let values = casted
|
||||
.into_iter()
|
||||
.skip(from_row)
|
||||
.take(size)
|
||||
.map(|v| match v {
|
||||
Some(nanoseconds) => Value::Duration {
|
||||
val: nanoseconds,
|
||||
span,
|
||||
},
|
||||
None => Value::Nothing { span },
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(Column::new(casted.name().into(), values))
|
||||
}
|
||||
e => Err(ShellError::GenericError(
|
||||
"Error creating Dataframe".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(format!("Value not supported in nushell: {e}")),
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
// Adds a separator to the vector of values using the column names from the
|
||||
// dataframe to create the Values Row
|
||||
pub fn add_separator(values: &mut Vec<Value>, df: &DataFrame, span: Span) {
|
||||
let mut cols = vec![];
|
||||
let mut vals = vec![];
|
||||
|
||||
cols.push("index".to_string());
|
||||
vals.push(Value::String {
|
||||
val: "...".into(),
|
||||
span,
|
||||
});
|
||||
|
||||
for name in df.get_column_names() {
|
||||
cols.push(name.to_string());
|
||||
vals.push(Value::String {
|
||||
val: "...".into(),
|
||||
span,
|
||||
})
|
||||
}
|
||||
|
||||
let extra_record = Value::Record { cols, vals, span };
|
||||
|
||||
values.push(extra_record);
|
||||
}
|
||||
|
||||
// Inserting the values found in a Value::List
|
||||
pub fn insert_record(
|
||||
column_values: &mut ColumnMap,
|
||||
cols: &[String],
|
||||
values: &[Value],
|
||||
) -> Result<(), ShellError> {
|
||||
for (col, value) in cols.iter().zip(values.iter()) {
|
||||
insert_value(value.clone(), col.clone(), column_values)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_value(
|
||||
value: Value,
|
||||
key: String,
|
||||
column_values: &mut ColumnMap,
|
||||
) -> Result<(), ShellError> {
|
||||
let col_val = match column_values.entry(key.clone()) {
|
||||
Entry::Vacant(entry) => entry.insert(TypedColumn::new_empty(key)),
|
||||
Entry::Occupied(entry) => entry.into_mut(),
|
||||
};
|
||||
|
||||
// Checking that the type for the value is the same
|
||||
// for the previous value in the column
|
||||
if col_val.values.is_empty() {
|
||||
match &value {
|
||||
Value::Int { .. } => {
|
||||
col_val.column_type = Some(InputType::Integer);
|
||||
}
|
||||
Value::Float { .. } => {
|
||||
col_val.column_type = Some(InputType::Float);
|
||||
}
|
||||
Value::String { .. } => {
|
||||
col_val.column_type = Some(InputType::String);
|
||||
}
|
||||
Value::Bool { .. } => {
|
||||
col_val.column_type = Some(InputType::Boolean);
|
||||
}
|
||||
Value::Date { .. } => {
|
||||
col_val.column_type = Some(InputType::Date);
|
||||
}
|
||||
Value::Duration { .. } => {
|
||||
col_val.column_type = Some(InputType::Duration);
|
||||
}
|
||||
Value::Filesize { .. } => {
|
||||
col_val.column_type = Some(InputType::Filesize);
|
||||
}
|
||||
_ => col_val.column_type = Some(InputType::Object),
|
||||
}
|
||||
col_val.values.push(value);
|
||||
} else {
|
||||
let prev_value = &col_val.values[col_val.values.len() - 1];
|
||||
|
||||
match (&prev_value, &value) {
|
||||
(Value::Int { .. }, Value::Int { .. })
|
||||
| (Value::Float { .. }, Value::Float { .. })
|
||||
| (Value::String { .. }, Value::String { .. })
|
||||
| (Value::Bool { .. }, Value::Bool { .. })
|
||||
| (Value::Date { .. }, Value::Date { .. })
|
||||
| (Value::Filesize { .. }, Value::Filesize { .. })
|
||||
| (Value::Duration { .. }, Value::Duration { .. }) => col_val.values.push(value),
|
||||
_ => {
|
||||
col_val.column_type = Some(InputType::Object);
|
||||
col_val.values.push(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// The ColumnMap has the parsed data from the StreamInput
|
||||
// This data can be used to create a Series object that can initialize
|
||||
// the dataframe based on the type of data that is found
|
||||
pub fn from_parsed_columns(column_values: ColumnMap) -> Result<NuDataFrame, ShellError> {
|
||||
let mut df_series: Vec<Series> = Vec::new();
|
||||
for (name, column) in column_values {
|
||||
if let Some(column_type) = &column.column_type {
|
||||
match column_type {
|
||||
InputType::Float => {
|
||||
let series_values: Result<Vec<_>, _> =
|
||||
column.values.iter().map(|v| v.as_f64()).collect();
|
||||
let series = Series::new(&name, series_values?);
|
||||
df_series.push(series)
|
||||
}
|
||||
InputType::Integer => {
|
||||
let series_values: Result<Vec<_>, _> =
|
||||
column.values.iter().map(|v| v.as_i64()).collect();
|
||||
let series = Series::new(&name, series_values?);
|
||||
df_series.push(series)
|
||||
}
|
||||
InputType::Filesize => {
|
||||
let series_values: Result<Vec<_>, _> =
|
||||
column.values.iter().map(|v| v.as_i64()).collect();
|
||||
let series = Series::new(&name, series_values?);
|
||||
df_series.push(series)
|
||||
}
|
||||
InputType::String => {
|
||||
let series_values: Result<Vec<_>, _> =
|
||||
column.values.iter().map(|v| v.as_string()).collect();
|
||||
let series = Series::new(&name, series_values?);
|
||||
df_series.push(series)
|
||||
}
|
||||
InputType::Boolean => {
|
||||
let series_values: Result<Vec<_>, _> =
|
||||
column.values.iter().map(|v| v.as_bool()).collect();
|
||||
let series = Series::new(&name, series_values?);
|
||||
df_series.push(series)
|
||||
}
|
||||
InputType::Object => {
|
||||
let mut builder =
|
||||
ObjectChunkedBuilder::<DataFrameValue>::new(&name, column.values.len());
|
||||
|
||||
for v in &column.values {
|
||||
builder.append_value(DataFrameValue::new(v.clone()));
|
||||
}
|
||||
|
||||
let res = builder.finish();
|
||||
df_series.push(res.into_series())
|
||||
}
|
||||
InputType::Date => {
|
||||
let it = column.values.iter().map(|v| {
|
||||
if let Value::Date { val, .. } = &v {
|
||||
Some(val.timestamp_millis())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
let res: DatetimeChunked =
|
||||
ChunkedArray::<Int64Type>::from_iter_options(&name, it)
|
||||
.into_datetime(TimeUnit::Milliseconds, None);
|
||||
|
||||
df_series.push(res.into_series())
|
||||
}
|
||||
InputType::Duration => {
|
||||
let series_values: Result<Vec<_>, _> =
|
||||
column.values.iter().map(|v| v.as_i64()).collect();
|
||||
let series = Series::new(&name, series_values?);
|
||||
df_series.push(series)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DataFrame::new(df_series)
|
||||
.map(|df| NuDataFrame::new(false, df))
|
||||
.map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error creating dataframe".into(),
|
||||
"".to_string(),
|
||||
None,
|
||||
Some(e.to_string()),
|
||||
Vec::new(),
|
||||
)
|
||||
})
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
use super::NuDataFrame;
|
||||
use nu_protocol::{ast::Operator, CustomValue, ShellError, Span, Value};
|
||||
|
||||
// CustomValue implementation for NuDataFrame
|
||||
impl CustomValue for NuDataFrame {
|
||||
fn typetag_name(&self) -> &'static str {
|
||||
"dataframe"
|
||||
}
|
||||
|
||||
fn typetag_deserialize(&self) {
|
||||
unimplemented!("typetag_deserialize")
|
||||
}
|
||||
|
||||
fn clone_value(&self, span: nu_protocol::Span) -> Value {
|
||||
let cloned = NuDataFrame {
|
||||
df: self.df.clone(),
|
||||
from_lazy: false,
|
||||
};
|
||||
|
||||
Value::CustomValue {
|
||||
val: Box::new(cloned),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
fn value_string(&self) -> String {
|
||||
self.typetag_name().to_string()
|
||||
}
|
||||
|
||||
fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
|
||||
let vals = self.print(span)?;
|
||||
|
||||
Ok(Value::List { vals, span })
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn follow_path_int(&self, count: usize, span: Span) -> Result<Value, ShellError> {
|
||||
self.get_value(count, span)
|
||||
}
|
||||
|
||||
fn follow_path_string(&self, column_name: String, span: Span) -> Result<Value, ShellError> {
|
||||
let column = self.column(&column_name, span)?;
|
||||
Ok(column.into_value(span))
|
||||
}
|
||||
|
||||
fn partial_cmp(&self, other: &Value) -> Option<std::cmp::Ordering> {
|
||||
match other {
|
||||
Value::CustomValue { val, .. } => val
|
||||
.as_any()
|
||||
.downcast_ref::<Self>()
|
||||
.and_then(|other| self.is_equal(other)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn operation(
|
||||
&self,
|
||||
lhs_span: Span,
|
||||
operator: Operator,
|
||||
op: Span,
|
||||
right: &Value,
|
||||
) -> Result<Value, ShellError> {
|
||||
self.compute_with_value(lhs_span, operator, op, right)
|
||||
}
|
||||
}
|
514
crates/nu-cmd-dataframe/src/dataframe/values/nu_dataframe/mod.rs
Normal file
514
crates/nu-cmd-dataframe/src/dataframe/values/nu_dataframe/mod.rs
Normal file
@ -0,0 +1,514 @@
|
||||
mod between_values;
|
||||
mod conversion;
|
||||
mod custom_value;
|
||||
mod operations;
|
||||
|
||||
pub use conversion::{Column, ColumnMap};
|
||||
pub use operations::Axis;
|
||||
|
||||
use indexmap::map::IndexMap;
|
||||
use nu_protocol::{did_you_mean, PipelineData, ShellError, Span, Value};
|
||||
use polars::prelude::{DataFrame, DataType, IntoLazy, LazyFrame, PolarsObject, Series};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{cmp::Ordering, fmt::Display, hash::Hasher};
|
||||
|
||||
use super::{utils::DEFAULT_ROWS, NuLazyFrame};
|
||||
|
||||
// DataFrameValue is an encapsulation of Nushell Value that can be used
|
||||
// to define the PolarsObject Trait. The polars object trait allows to
|
||||
// create dataframes with mixed datatypes
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DataFrameValue(Value);
|
||||
|
||||
impl DataFrameValue {
|
||||
fn new(value: Value) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
|
||||
fn get_value(&self) -> Value {
|
||||
self.0.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for DataFrameValue {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0.get_type())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DataFrameValue {
|
||||
fn default() -> Self {
|
||||
Self(Value::Nothing {
|
||||
span: Span::unknown(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for DataFrameValue {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0.partial_cmp(&other.0).map_or(false, Ordering::is_eq)
|
||||
}
|
||||
}
|
||||
impl Eq for DataFrameValue {}
|
||||
|
||||
impl std::hash::Hash for DataFrameValue {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
match &self.0 {
|
||||
Value::Nothing { .. } => 0.hash(state),
|
||||
Value::Int { val, .. } => val.hash(state),
|
||||
Value::String { val, .. } => val.hash(state),
|
||||
// TODO. Define hash for the rest of types
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PolarsObject for DataFrameValue {
|
||||
fn type_name() -> &'static str {
|
||||
"object"
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NuDataFrame {
|
||||
pub df: DataFrame,
|
||||
pub from_lazy: bool,
|
||||
}
|
||||
|
||||
impl AsRef<DataFrame> for NuDataFrame {
|
||||
fn as_ref(&self) -> &polars::prelude::DataFrame {
|
||||
&self.df
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<DataFrame> for NuDataFrame {
|
||||
fn as_mut(&mut self) -> &mut polars::prelude::DataFrame {
|
||||
&mut self.df
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DataFrame> for NuDataFrame {
|
||||
fn from(df: DataFrame) -> Self {
|
||||
Self {
|
||||
df,
|
||||
from_lazy: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NuDataFrame {
|
||||
pub fn new(from_lazy: bool, df: DataFrame) -> Self {
|
||||
Self { df, from_lazy }
|
||||
}
|
||||
|
||||
pub fn lazy(&self) -> LazyFrame {
|
||||
self.df.clone().lazy()
|
||||
}
|
||||
|
||||
fn default_value(span: Span) -> Value {
|
||||
let dataframe = DataFrame::default();
|
||||
NuDataFrame::dataframe_into_value(dataframe, span)
|
||||
}
|
||||
|
||||
pub fn dataframe_into_value(dataframe: DataFrame, span: Span) -> Value {
|
||||
Value::CustomValue {
|
||||
val: Box::new(Self::new(false, dataframe)),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_value(self, span: Span) -> Value {
|
||||
if self.from_lazy {
|
||||
let lazy = NuLazyFrame::from_dataframe(self);
|
||||
Value::CustomValue {
|
||||
val: Box::new(lazy),
|
||||
span,
|
||||
}
|
||||
} else {
|
||||
Value::CustomValue {
|
||||
val: Box::new(self),
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn series_to_value(series: Series, span: Span) -> Result<Value, ShellError> {
|
||||
match DataFrame::new(vec![series]) {
|
||||
Ok(dataframe) => Ok(NuDataFrame::dataframe_into_value(dataframe, span)),
|
||||
Err(e) => Err(ShellError::GenericError(
|
||||
"Error creating dataframe".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_from_iter<T>(iter: T) -> Result<Self, ShellError>
|
||||
where
|
||||
T: Iterator<Item = Value>,
|
||||
{
|
||||
// Dictionary to store the columnar data extracted from
|
||||
// the input. During the iteration we check if the values
|
||||
// have different type
|
||||
let mut column_values: ColumnMap = IndexMap::new();
|
||||
|
||||
for value in iter {
|
||||
match value {
|
||||
Value::CustomValue { .. } => return Self::try_from_value(value),
|
||||
Value::List { vals, .. } => {
|
||||
let cols = (0..vals.len())
|
||||
.map(|i| format!("{i}"))
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
conversion::insert_record(&mut column_values, &cols, &vals)?
|
||||
}
|
||||
Value::Record { cols, vals, .. } => {
|
||||
conversion::insert_record(&mut column_values, &cols, &vals)?
|
||||
}
|
||||
_ => {
|
||||
let key = "0".to_string();
|
||||
conversion::insert_value(value, key, &mut column_values)?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
conversion::from_parsed_columns(column_values)
|
||||
}
|
||||
|
||||
pub fn try_from_series(columns: Vec<Series>, span: Span) -> Result<Self, ShellError> {
|
||||
let dataframe = DataFrame::new(columns).map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error creating dataframe".into(),
|
||||
format!("Unable to create DataFrame: {e}"),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(Self::new(false, dataframe))
|
||||
}
|
||||
|
||||
pub fn try_from_columns(columns: Vec<Column>) -> Result<Self, ShellError> {
|
||||
let mut column_values: ColumnMap = IndexMap::new();
|
||||
|
||||
for column in columns {
|
||||
let name = column.name().to_string();
|
||||
for value in column {
|
||||
conversion::insert_value(value, name.clone(), &mut column_values)?;
|
||||
}
|
||||
}
|
||||
|
||||
conversion::from_parsed_columns(column_values)
|
||||
}
|
||||
|
||||
pub fn fill_list_nan(list: Vec<Value>, list_span: Span, fill: Value) -> Value {
|
||||
let newlist = list
|
||||
.into_iter()
|
||||
.map(|value| match value {
|
||||
Value::Float { val, .. } => {
|
||||
if val.is_nan() {
|
||||
fill.clone()
|
||||
} else {
|
||||
value
|
||||
}
|
||||
}
|
||||
Value::List { vals, span } => Self::fill_list_nan(vals, span, fill.clone()),
|
||||
_ => value,
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
Value::list(newlist, list_span)
|
||||
}
|
||||
|
||||
pub fn columns(&self, span: Span) -> Result<Vec<Column>, ShellError> {
|
||||
let height = self.df.height();
|
||||
self.df
|
||||
.get_columns()
|
||||
.iter()
|
||||
.map(|col| conversion::create_column(col, 0, height, span))
|
||||
.collect::<Result<Vec<Column>, ShellError>>()
|
||||
}
|
||||
|
||||
pub fn try_from_value(value: Value) -> Result<Self, ShellError> {
|
||||
if Self::can_downcast(&value) {
|
||||
Ok(Self::get_df(value)?)
|
||||
} else if NuLazyFrame::can_downcast(&value) {
|
||||
let span = value.span()?;
|
||||
let lazy = NuLazyFrame::try_from_value(value)?;
|
||||
let df = lazy.collect(span)?;
|
||||
Ok(df)
|
||||
} else {
|
||||
Err(ShellError::CantConvert {
|
||||
to_type: "lazy or eager dataframe".into(),
|
||||
from_type: value.get_type().to_string(),
|
||||
span: value.span()?,
|
||||
help: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_df(value: Value) -> Result<Self, ShellError> {
|
||||
match value {
|
||||
Value::CustomValue { val, span } => match val.as_any().downcast_ref::<Self>() {
|
||||
Some(df) => Ok(NuDataFrame {
|
||||
df: df.df.clone(),
|
||||
from_lazy: false,
|
||||
}),
|
||||
None => Err(ShellError::CantConvert {
|
||||
to_type: "dataframe".into(),
|
||||
from_type: "non-dataframe".into(),
|
||||
span,
|
||||
help: None,
|
||||
}),
|
||||
},
|
||||
x => Err(ShellError::CantConvert {
|
||||
to_type: "dataframe".into(),
|
||||
from_type: x.get_type().to_string(),
|
||||
span: x.span()?,
|
||||
help: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_from_pipeline(input: PipelineData, span: Span) -> Result<Self, ShellError> {
|
||||
let value = input.into_value(span);
|
||||
Self::try_from_value(value)
|
||||
}
|
||||
|
||||
pub fn can_downcast(value: &Value) -> bool {
|
||||
if let Value::CustomValue { val, .. } = value {
|
||||
val.as_any().downcast_ref::<Self>().is_some()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn column(&self, column: &str, span: Span) -> Result<Self, ShellError> {
|
||||
let s = self.df.column(column).map_err(|_| {
|
||||
let possibilities = self
|
||||
.df
|
||||
.get_column_names()
|
||||
.iter()
|
||||
.map(|name| name.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
let option = did_you_mean(&possibilities, column).unwrap_or_else(|| column.to_string());
|
||||
ShellError::DidYouMean(option, span)
|
||||
})?;
|
||||
|
||||
let df = DataFrame::new(vec![s.clone()]).map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error creating dataframe".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(Self {
|
||||
df,
|
||||
from_lazy: false,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_series(&self) -> bool {
|
||||
self.df.width() == 1
|
||||
}
|
||||
|
||||
pub fn as_series(&self, span: Span) -> Result<Series, ShellError> {
|
||||
if !self.is_series() {
|
||||
return Err(ShellError::GenericError(
|
||||
"Error using as series".into(),
|
||||
"dataframe has more than one column".into(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
));
|
||||
}
|
||||
|
||||
let series = self
|
||||
.df
|
||||
.get_columns()
|
||||
.get(0)
|
||||
.expect("We have already checked that the width is 1");
|
||||
|
||||
Ok(series.clone())
|
||||
}
|
||||
|
||||
pub fn get_value(&self, row: usize, span: Span) -> Result<Value, ShellError> {
|
||||
let series = self.as_series(span)?;
|
||||
let column = conversion::create_column(&series, row, row + 1, span)?;
|
||||
|
||||
if column.len() == 0 {
|
||||
Err(ShellError::AccessEmptyContent { span })
|
||||
} else {
|
||||
let value = column
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("already checked there is a value");
|
||||
Ok(value)
|
||||
}
|
||||
}
|
||||
|
||||
// Print is made out a head and if the dataframe is too large, then a tail
|
||||
pub fn print(&self, span: Span) -> Result<Vec<Value>, ShellError> {
|
||||
let df = &self.df;
|
||||
let size: usize = 20;
|
||||
|
||||
if df.height() > size {
|
||||
let sample_size = size / 2;
|
||||
let mut values = self.head(Some(sample_size), span)?;
|
||||
conversion::add_separator(&mut values, df, span);
|
||||
let remaining = df.height() - sample_size;
|
||||
let tail_size = remaining.min(sample_size);
|
||||
let mut tail_values = self.tail(Some(tail_size), span)?;
|
||||
values.append(&mut tail_values);
|
||||
|
||||
Ok(values)
|
||||
} else {
|
||||
Ok(self.head(Some(size), span)?)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn height(&self) -> usize {
|
||||
self.df.height()
|
||||
}
|
||||
|
||||
pub fn head(&self, rows: Option<usize>, span: Span) -> Result<Vec<Value>, ShellError> {
|
||||
let to_row = rows.unwrap_or(5);
|
||||
let values = self.to_rows(0, to_row, span)?;
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
pub fn tail(&self, rows: Option<usize>, span: Span) -> Result<Vec<Value>, ShellError> {
|
||||
let df = &self.df;
|
||||
let to_row = df.height();
|
||||
let size = rows.unwrap_or(DEFAULT_ROWS);
|
||||
let from_row = to_row.saturating_sub(size);
|
||||
|
||||
let values = self.to_rows(from_row, to_row, span)?;
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
pub fn to_rows(
|
||||
&self,
|
||||
from_row: usize,
|
||||
to_row: usize,
|
||||
span: Span,
|
||||
) -> Result<Vec<Value>, ShellError> {
|
||||
let df = &self.df;
|
||||
let upper_row = to_row.min(df.height());
|
||||
|
||||
let mut size: usize = 0;
|
||||
let columns = self
|
||||
.df
|
||||
.get_columns()
|
||||
.iter()
|
||||
.map(
|
||||
|col| match conversion::create_column(col, from_row, upper_row, span) {
|
||||
Ok(col) => {
|
||||
size = col.len();
|
||||
Ok(col)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
},
|
||||
)
|
||||
.collect::<Result<Vec<Column>, ShellError>>()?;
|
||||
|
||||
let mut iterators = columns
|
||||
.into_iter()
|
||||
.map(|col| (col.name().to_string(), col.into_iter()))
|
||||
.collect::<Vec<(String, std::vec::IntoIter<Value>)>>();
|
||||
|
||||
let values = (0..size)
|
||||
.map(|i| {
|
||||
let mut cols = vec![];
|
||||
let mut vals = vec![];
|
||||
|
||||
cols.push("index".into());
|
||||
vals.push(Value::Int {
|
||||
val: (i + from_row) as i64,
|
||||
span,
|
||||
});
|
||||
|
||||
for (name, col) in &mut iterators {
|
||||
cols.push(name.clone());
|
||||
|
||||
match col.next() {
|
||||
Some(v) => vals.push(v),
|
||||
None => vals.push(Value::Nothing { span }),
|
||||
};
|
||||
}
|
||||
|
||||
Value::Record { cols, vals, span }
|
||||
})
|
||||
.collect::<Vec<Value>>();
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
// Dataframes are considered equal if they have the same shape, column name and values
|
||||
pub fn is_equal(&self, other: &Self) -> Option<Ordering> {
|
||||
if self.as_ref().width() == 0 {
|
||||
// checking for empty dataframe
|
||||
return None;
|
||||
}
|
||||
|
||||
if self.as_ref().get_column_names() != other.as_ref().get_column_names() {
|
||||
// checking both dataframes share the same names
|
||||
return None;
|
||||
}
|
||||
|
||||
if self.as_ref().height() != other.as_ref().height() {
|
||||
// checking both dataframes have the same row size
|
||||
return None;
|
||||
}
|
||||
|
||||
// sorting dataframe by the first column
|
||||
let column_names = self.as_ref().get_column_names();
|
||||
let first_col = column_names
|
||||
.first()
|
||||
.expect("already checked that dataframe is different than 0");
|
||||
|
||||
// if unable to sort, then unable to compare
|
||||
let lhs = match self.as_ref().sort(vec![*first_col], false) {
|
||||
Ok(df) => df,
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
let rhs = match other.as_ref().sort(vec![*first_col], false) {
|
||||
Ok(df) => df,
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
for name in self.as_ref().get_column_names() {
|
||||
let self_series = lhs.column(name).expect("name from dataframe names");
|
||||
|
||||
let other_series = rhs
|
||||
.column(name)
|
||||
.expect("already checked that name in other");
|
||||
|
||||
let self_series = match self_series.dtype() {
|
||||
// Casting needed to compare other numeric types with nushell numeric type.
|
||||
// In nushell we only have i64 integer numeric types and any array created
|
||||
// with nushell untagged primitives will be of type i64
|
||||
DataType::UInt32 | DataType::Int32 => match self_series.cast(&DataType::Int64) {
|
||||
Ok(series) => series,
|
||||
Err(_) => return None,
|
||||
},
|
||||
_ => self_series.clone(),
|
||||
};
|
||||
|
||||
if !self_series.series_equal(other_series) {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
Some(Ordering::Equal)
|
||||
}
|
||||
}
|
@ -0,0 +1,214 @@
|
||||
use nu_protocol::{ast::Operator, ShellError, Span, Spanned, Value};
|
||||
use polars::prelude::{DataFrame, Series};
|
||||
|
||||
use super::between_values::{
|
||||
between_dataframes, compute_between_series, compute_series_single_value,
|
||||
};
|
||||
|
||||
use super::NuDataFrame;
|
||||
|
||||
pub enum Axis {
|
||||
Row,
|
||||
Column,
|
||||
}
|
||||
|
||||
impl NuDataFrame {
|
||||
pub fn compute_with_value(
|
||||
&self,
|
||||
lhs_span: Span,
|
||||
operator: Operator,
|
||||
op_span: Span,
|
||||
right: &Value,
|
||||
) -> Result<Value, ShellError> {
|
||||
match right {
|
||||
Value::CustomValue {
|
||||
val: rhs,
|
||||
span: rhs_span,
|
||||
} => {
|
||||
let rhs = rhs.as_any().downcast_ref::<NuDataFrame>().ok_or_else(|| {
|
||||
ShellError::DowncastNotPossible(
|
||||
"Unable to create dataframe".to_string(),
|
||||
*rhs_span,
|
||||
)
|
||||
})?;
|
||||
|
||||
match (self.is_series(), rhs.is_series()) {
|
||||
(true, true) => {
|
||||
let lhs = &self
|
||||
.as_series(lhs_span)
|
||||
.expect("Already checked that is a series");
|
||||
let rhs = &rhs
|
||||
.as_series(*rhs_span)
|
||||
.expect("Already checked that is a series");
|
||||
|
||||
if lhs.dtype() != rhs.dtype() {
|
||||
return Err(ShellError::IncompatibleParameters {
|
||||
left_message: format!("datatype {}", lhs.dtype()),
|
||||
left_span: lhs_span,
|
||||
right_message: format!("datatype {}", lhs.dtype()),
|
||||
right_span: *rhs_span,
|
||||
});
|
||||
}
|
||||
|
||||
if lhs.len() != rhs.len() {
|
||||
return Err(ShellError::IncompatibleParameters {
|
||||
left_message: format!("len {}", lhs.len()),
|
||||
left_span: lhs_span,
|
||||
right_message: format!("len {}", rhs.len()),
|
||||
right_span: *rhs_span,
|
||||
});
|
||||
}
|
||||
|
||||
let op = Spanned {
|
||||
item: operator,
|
||||
span: op_span,
|
||||
};
|
||||
|
||||
compute_between_series(
|
||||
op,
|
||||
&NuDataFrame::default_value(lhs_span),
|
||||
lhs,
|
||||
right,
|
||||
rhs,
|
||||
)
|
||||
}
|
||||
_ => {
|
||||
if self.df.height() != rhs.df.height() {
|
||||
return Err(ShellError::IncompatibleParameters {
|
||||
left_message: format!("rows {}", self.df.height()),
|
||||
left_span: lhs_span,
|
||||
right_message: format!("rows {}", rhs.df.height()),
|
||||
right_span: *rhs_span,
|
||||
});
|
||||
}
|
||||
|
||||
let op = Spanned {
|
||||
item: operator,
|
||||
span: op_span,
|
||||
};
|
||||
|
||||
between_dataframes(
|
||||
op,
|
||||
&NuDataFrame::default_value(lhs_span),
|
||||
self,
|
||||
right,
|
||||
rhs,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let op = Spanned {
|
||||
item: operator,
|
||||
span: op_span,
|
||||
};
|
||||
|
||||
compute_series_single_value(op, &NuDataFrame::default_value(lhs_span), self, right)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn append_df(
|
||||
&self,
|
||||
other: &NuDataFrame,
|
||||
axis: Axis,
|
||||
span: Span,
|
||||
) -> Result<Self, ShellError> {
|
||||
match axis {
|
||||
Axis::Row => {
|
||||
let mut columns: Vec<&str> = Vec::new();
|
||||
|
||||
let new_cols = self
|
||||
.df
|
||||
.get_columns()
|
||||
.iter()
|
||||
.chain(other.df.get_columns())
|
||||
.map(|s| {
|
||||
let name = if columns.contains(&s.name()) {
|
||||
format!("{}_{}", s.name(), "x")
|
||||
} else {
|
||||
columns.push(s.name());
|
||||
s.name().to_string()
|
||||
};
|
||||
|
||||
let mut series = s.clone();
|
||||
series.rename(&name);
|
||||
series
|
||||
})
|
||||
.collect::<Vec<Series>>();
|
||||
|
||||
let df_new = DataFrame::new(new_cols).map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error creating dataframe".into(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(NuDataFrame::new(false, df_new))
|
||||
}
|
||||
Axis::Column => {
|
||||
if self.df.width() != other.df.width() {
|
||||
return Err(ShellError::IncompatibleParametersSingle {
|
||||
msg: "Dataframes with different number of columns".into(),
|
||||
span,
|
||||
});
|
||||
}
|
||||
|
||||
if !self
|
||||
.df
|
||||
.get_column_names()
|
||||
.iter()
|
||||
.all(|col| other.df.get_column_names().contains(col))
|
||||
{
|
||||
return Err(ShellError::IncompatibleParametersSingle {
|
||||
msg: "Dataframes with different columns names".into(),
|
||||
span,
|
||||
});
|
||||
}
|
||||
|
||||
let new_cols = self
|
||||
.df
|
||||
.get_columns()
|
||||
.iter()
|
||||
.map(|s| {
|
||||
let other_col = other
|
||||
.df
|
||||
.column(s.name())
|
||||
.expect("Already checked that dataframes have same columns");
|
||||
|
||||
let mut tmp = s.clone();
|
||||
let res = tmp.append(other_col);
|
||||
|
||||
match res {
|
||||
Ok(s) => Ok(s.clone()),
|
||||
Err(e) => Err({
|
||||
ShellError::GenericError(
|
||||
"Error appending dataframe".into(),
|
||||
format!("Unable to append: {e}"),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
}),
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<Series>, ShellError>>()?;
|
||||
|
||||
let df_new = DataFrame::new(new_cols).map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error appending dataframe".into(),
|
||||
format!("Unable to append dataframes: {e}"),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(NuDataFrame::new(false, df_new))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,149 @@
|
||||
use std::ops::{Add, Div, Mul, Rem, Sub};
|
||||
|
||||
use super::NuExpression;
|
||||
use nu_protocol::{
|
||||
ast::{Comparison, Math, Operator},
|
||||
CustomValue, ShellError, Span, Type, Value,
|
||||
};
|
||||
use polars::prelude::Expr;
|
||||
|
||||
// CustomValue implementation for NuDataFrame
|
||||
impl CustomValue for NuExpression {
|
||||
fn typetag_name(&self) -> &'static str {
|
||||
"expression"
|
||||
}
|
||||
|
||||
fn typetag_deserialize(&self) {
|
||||
unimplemented!("typetag_deserialize")
|
||||
}
|
||||
|
||||
fn clone_value(&self, span: nu_protocol::Span) -> Value {
|
||||
let cloned = NuExpression(self.0.clone());
|
||||
|
||||
Value::CustomValue {
|
||||
val: Box::new(cloned),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
fn value_string(&self) -> String {
|
||||
self.typetag_name().to_string()
|
||||
}
|
||||
|
||||
fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
|
||||
Ok(self.to_value(span))
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn operation(
|
||||
&self,
|
||||
lhs_span: Span,
|
||||
operator: Operator,
|
||||
op: Span,
|
||||
right: &Value,
|
||||
) -> Result<Value, ShellError> {
|
||||
compute_with_value(self, lhs_span, operator, op, right)
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_with_value(
|
||||
left: &NuExpression,
|
||||
lhs_span: Span,
|
||||
operator: Operator,
|
||||
op: Span,
|
||||
right: &Value,
|
||||
) -> Result<Value, ShellError> {
|
||||
match right {
|
||||
Value::CustomValue {
|
||||
val: rhs,
|
||||
span: rhs_span,
|
||||
} => {
|
||||
let rhs = rhs.as_any().downcast_ref::<NuExpression>().ok_or_else(|| {
|
||||
ShellError::DowncastNotPossible(
|
||||
"Unable to create expression".to_string(),
|
||||
*rhs_span,
|
||||
)
|
||||
})?;
|
||||
|
||||
match rhs.as_ref() {
|
||||
polars::prelude::Expr::Literal(..) => {
|
||||
with_operator(operator, left, rhs, lhs_span, right.span()?, op)
|
||||
}
|
||||
_ => Err(ShellError::TypeMismatch {
|
||||
err_message: "Only literal expressions or number".into(),
|
||||
span: right.span()?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let rhs = NuExpression::try_from_value(right.clone())?;
|
||||
with_operator(operator, left, &rhs, lhs_span, right.span()?, op)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn with_operator(
|
||||
operator: Operator,
|
||||
left: &NuExpression,
|
||||
right: &NuExpression,
|
||||
lhs_span: Span,
|
||||
rhs_span: Span,
|
||||
op_span: Span,
|
||||
) -> Result<Value, ShellError> {
|
||||
match operator {
|
||||
Operator::Math(Math::Plus) => apply_arithmetic(left, right, lhs_span, Add::add),
|
||||
Operator::Math(Math::Minus) => apply_arithmetic(left, right, lhs_span, Sub::sub),
|
||||
Operator::Math(Math::Multiply) => apply_arithmetic(left, right, lhs_span, Mul::mul),
|
||||
Operator::Math(Math::Divide) => apply_arithmetic(left, right, lhs_span, Div::div),
|
||||
Operator::Math(Math::Modulo) => apply_arithmetic(left, right, lhs_span, Rem::rem),
|
||||
Operator::Math(Math::FloorDivision) => apply_arithmetic(left, right, lhs_span, Div::div),
|
||||
Operator::Comparison(Comparison::Equal) => Ok(left
|
||||
.clone()
|
||||
.apply_with_expr(right.clone(), Expr::eq)
|
||||
.into_value(lhs_span)),
|
||||
Operator::Comparison(Comparison::NotEqual) => Ok(left
|
||||
.clone()
|
||||
.apply_with_expr(right.clone(), Expr::neq)
|
||||
.into_value(lhs_span)),
|
||||
Operator::Comparison(Comparison::GreaterThan) => Ok(left
|
||||
.clone()
|
||||
.apply_with_expr(right.clone(), Expr::gt)
|
||||
.into_value(lhs_span)),
|
||||
Operator::Comparison(Comparison::GreaterThanOrEqual) => Ok(left
|
||||
.clone()
|
||||
.apply_with_expr(right.clone(), Expr::gt_eq)
|
||||
.into_value(lhs_span)),
|
||||
Operator::Comparison(Comparison::LessThan) => Ok(left
|
||||
.clone()
|
||||
.apply_with_expr(right.clone(), Expr::lt)
|
||||
.into_value(lhs_span)),
|
||||
Operator::Comparison(Comparison::LessThanOrEqual) => Ok(left
|
||||
.clone()
|
||||
.apply_with_expr(right.clone(), Expr::lt_eq)
|
||||
.into_value(lhs_span)),
|
||||
_ => Err(ShellError::OperatorMismatch {
|
||||
op_span,
|
||||
lhs_ty: Type::Custom(left.typetag_name().into()),
|
||||
lhs_span,
|
||||
rhs_ty: Type::Custom(right.typetag_name().into()),
|
||||
rhs_span,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_arithmetic<F>(
|
||||
left: &NuExpression,
|
||||
right: &NuExpression,
|
||||
span: Span,
|
||||
f: F,
|
||||
) -> Result<Value, ShellError>
|
||||
where
|
||||
F: Fn(Expr, Expr) -> Expr,
|
||||
{
|
||||
let expr: NuExpression = f(left.as_ref().clone(), right.as_ref().clone()).into();
|
||||
|
||||
Ok(expr.into_value(span))
|
||||
}
|
@ -0,0 +1,637 @@
|
||||
mod custom_value;
|
||||
|
||||
use core::fmt;
|
||||
use nu_protocol::{PipelineData, ShellError, Span, Value};
|
||||
use polars::prelude::{col, AggExpr, Expr, Literal};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
// Polars Expression wrapper for Nushell operations
|
||||
// Object is behind and Option to allow easy implementation of
|
||||
// the Deserialize trait
|
||||
#[derive(Default, Clone)]
|
||||
pub struct NuExpression(Option<Expr>);
|
||||
|
||||
// Mocked serialization of the LazyFrame object
|
||||
impl Serialize for NuExpression {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_none()
|
||||
}
|
||||
}
|
||||
|
||||
// Mocked deserialization of the LazyFrame object
|
||||
impl<'de> Deserialize<'de> for NuExpression {
|
||||
fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
Ok(NuExpression::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for NuExpression {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "NuExpression")
|
||||
}
|
||||
}
|
||||
|
||||
// Referenced access to the real LazyFrame
|
||||
impl AsRef<Expr> for NuExpression {
|
||||
fn as_ref(&self) -> &polars::prelude::Expr {
|
||||
// The only case when there cannot be an expr is if it is created
|
||||
// using the default function or if created by deserializing something
|
||||
self.0.as_ref().expect("there should always be a frame")
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<Expr> for NuExpression {
|
||||
fn as_mut(&mut self) -> &mut polars::prelude::Expr {
|
||||
// The only case when there cannot be an expr is if it is created
|
||||
// using the default function or if created by deserializing something
|
||||
self.0.as_mut().expect("there should always be a frame")
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Expr> for NuExpression {
|
||||
fn from(expr: Expr) -> Self {
|
||||
Self(Some(expr))
|
||||
}
|
||||
}
|
||||
|
||||
impl NuExpression {
|
||||
pub fn into_value(self, span: Span) -> Value {
|
||||
Value::CustomValue {
|
||||
val: Box::new(self),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_from_value(value: Value) -> Result<Self, ShellError> {
|
||||
match value {
|
||||
Value::CustomValue { val, span } => match val.as_any().downcast_ref::<Self>() {
|
||||
Some(expr) => Ok(NuExpression(expr.0.clone())),
|
||||
None => Err(ShellError::CantConvert {
|
||||
to_type: "lazy expression".into(),
|
||||
from_type: "non-dataframe".into(),
|
||||
span,
|
||||
help: None,
|
||||
}),
|
||||
},
|
||||
Value::String { val, .. } => Ok(val.lit().into()),
|
||||
Value::Int { val, .. } => Ok(val.lit().into()),
|
||||
Value::Bool { val, .. } => Ok(val.lit().into()),
|
||||
Value::Float { val, .. } => Ok(val.lit().into()),
|
||||
x => Err(ShellError::CantConvert {
|
||||
to_type: "lazy expression".into(),
|
||||
from_type: x.get_type().to_string(),
|
||||
span: x.span()?,
|
||||
help: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_from_pipeline(input: PipelineData, span: Span) -> Result<Self, ShellError> {
|
||||
let value = input.into_value(span);
|
||||
Self::try_from_value(value)
|
||||
}
|
||||
|
||||
pub fn can_downcast(value: &Value) -> bool {
|
||||
match value {
|
||||
Value::CustomValue { val, .. } => val.as_any().downcast_ref::<Self>().is_some(),
|
||||
Value::List { vals, .. } => vals.iter().all(Self::can_downcast),
|
||||
Value::String { .. } | Value::Int { .. } | Value::Bool { .. } | Value::Float { .. } => {
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_polars(self) -> Expr {
|
||||
self.0.expect("Expression cannot be none to convert")
|
||||
}
|
||||
|
||||
pub fn apply_with_expr<F>(self, other: NuExpression, f: F) -> Self
|
||||
where
|
||||
F: Fn(Expr, Expr) -> Expr,
|
||||
{
|
||||
let expr = self.0.expect("Lazy expression must not be empty to apply");
|
||||
let other = other.0.expect("Lazy expression must not be empty to apply");
|
||||
|
||||
f(expr, other).into()
|
||||
}
|
||||
|
||||
pub fn to_value(&self, span: Span) -> Value {
|
||||
expr_to_value(self.as_ref(), span)
|
||||
}
|
||||
|
||||
// Convenient function to extract multiple Expr that could be inside a nushell Value
|
||||
pub fn extract_exprs(value: Value) -> Result<Vec<Expr>, ShellError> {
|
||||
ExtractedExpr::extract_exprs(value).map(ExtractedExpr::into_exprs)
|
||||
}
|
||||
}
|
||||
|
||||
// Enum to represent the parsing of the expressions from Value
|
||||
enum ExtractedExpr {
|
||||
Single(Expr),
|
||||
List(Vec<ExtractedExpr>),
|
||||
}
|
||||
|
||||
impl ExtractedExpr {
|
||||
fn into_exprs(self) -> Vec<Expr> {
|
||||
match self {
|
||||
Self::Single(expr) => vec![expr],
|
||||
Self::List(expressions) => expressions
|
||||
.into_iter()
|
||||
.flat_map(ExtractedExpr::into_exprs)
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_exprs(value: Value) -> Result<ExtractedExpr, ShellError> {
|
||||
match value {
|
||||
Value::String { val, .. } => Ok(ExtractedExpr::Single(col(val.as_str()))),
|
||||
Value::CustomValue { .. } => NuExpression::try_from_value(value)
|
||||
.map(NuExpression::into_polars)
|
||||
.map(ExtractedExpr::Single),
|
||||
Value::List { vals, .. } => vals
|
||||
.into_iter()
|
||||
.map(Self::extract_exprs)
|
||||
.collect::<Result<Vec<ExtractedExpr>, ShellError>>()
|
||||
.map(ExtractedExpr::List),
|
||||
x => Err(ShellError::CantConvert {
|
||||
to_type: "expression".into(),
|
||||
from_type: x.get_type().to_string(),
|
||||
span: x.span()?,
|
||||
help: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn expr_to_value(expr: &Expr, span: Span) -> Value {
|
||||
let cols = vec!["expr".to_string(), "value".to_string()];
|
||||
|
||||
match expr {
|
||||
Expr::Alias(expr, alias) => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let alias = Value::String {
|
||||
val: alias.as_ref().into(),
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec!["expr".into(), "alias".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr, alias],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Column(name) => {
|
||||
let expr_type = Value::String {
|
||||
val: "column".to_string(),
|
||||
span,
|
||||
};
|
||||
let value = Value::String {
|
||||
val: name.to_string(),
|
||||
span,
|
||||
};
|
||||
|
||||
let vals = vec![expr_type, value];
|
||||
Value::Record { cols, vals, span }
|
||||
}
|
||||
Expr::Columns(columns) => {
|
||||
let expr_type = Value::String {
|
||||
val: "columns".into(),
|
||||
span,
|
||||
};
|
||||
let value = Value::List {
|
||||
vals: columns
|
||||
.iter()
|
||||
.map(|col| Value::String {
|
||||
val: col.clone(),
|
||||
span,
|
||||
})
|
||||
.collect(),
|
||||
span,
|
||||
};
|
||||
|
||||
let vals = vec![expr_type, value];
|
||||
Value::Record { cols, vals, span }
|
||||
}
|
||||
Expr::Literal(literal) => {
|
||||
let expr_type = Value::String {
|
||||
val: "literal".into(),
|
||||
span,
|
||||
};
|
||||
let value = Value::String {
|
||||
val: format!("{literal:?}"),
|
||||
span,
|
||||
};
|
||||
|
||||
let vals = vec![expr_type, value];
|
||||
Value::Record { cols, vals, span }
|
||||
}
|
||||
Expr::BinaryExpr { left, op, right } => {
|
||||
let left_val = expr_to_value(left, span);
|
||||
let right_val = expr_to_value(right, span);
|
||||
|
||||
let operator = Value::String {
|
||||
val: format!("{op:?}"),
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec!["left".into(), "op".into(), "right".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![left_val, operator, right_val],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Ternary {
|
||||
predicate,
|
||||
truthy,
|
||||
falsy,
|
||||
} => {
|
||||
let predicate = expr_to_value(predicate.as_ref(), span);
|
||||
let truthy = expr_to_value(truthy.as_ref(), span);
|
||||
let falsy = expr_to_value(falsy.as_ref(), span);
|
||||
|
||||
let cols = vec!["predicate".into(), "truthy".into(), "falsy".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![predicate, truthy, falsy],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Agg(agg_expr) => {
|
||||
let value = match agg_expr {
|
||||
AggExpr::Min { input: expr, .. }
|
||||
| AggExpr::Max { input: expr, .. }
|
||||
| AggExpr::Median(expr)
|
||||
| AggExpr::NUnique(expr)
|
||||
| AggExpr::First(expr)
|
||||
| AggExpr::Last(expr)
|
||||
| AggExpr::Mean(expr)
|
||||
| AggExpr::Implode(expr)
|
||||
| AggExpr::Count(expr)
|
||||
| AggExpr::Sum(expr)
|
||||
| AggExpr::AggGroups(expr)
|
||||
| AggExpr::Std(expr, _)
|
||||
| AggExpr::Var(expr, _) => expr_to_value(expr.as_ref(), span),
|
||||
AggExpr::Quantile {
|
||||
expr,
|
||||
quantile,
|
||||
interpol,
|
||||
} => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let quantile = expr_to_value(quantile.as_ref(), span);
|
||||
let interpol = Value::String {
|
||||
val: format!("{interpol:?}"),
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec!["expr".into(), "quantile".into(), "interpol".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr, quantile, interpol],
|
||||
span,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let expr_type = Value::String {
|
||||
val: "agg".into(),
|
||||
span,
|
||||
};
|
||||
|
||||
let vals = vec![expr_type, value];
|
||||
Value::Record { cols, vals, span }
|
||||
}
|
||||
Expr::Count => {
|
||||
let expr = Value::String {
|
||||
val: "count".into(),
|
||||
span,
|
||||
};
|
||||
let cols = vec!["expr".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Wildcard => {
|
||||
let expr = Value::String {
|
||||
val: "wildcard".into(),
|
||||
span,
|
||||
};
|
||||
let cols = vec!["expr".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Explode(expr) => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let cols = vec!["expr".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::KeepName(expr) => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let cols = vec!["expr".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Nth(i) => {
|
||||
let expr = Value::int(*i, span);
|
||||
let cols = vec!["expr".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::DtypeColumn(dtypes) => {
|
||||
let vals = dtypes
|
||||
.iter()
|
||||
.map(|d| Value::String {
|
||||
val: format!("{d}"),
|
||||
span,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Value::List { vals, span }
|
||||
}
|
||||
Expr::Sort { expr, options } => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let options = Value::String {
|
||||
val: format!("{options:?}"),
|
||||
span,
|
||||
};
|
||||
let cols = vec!["expr".into(), "options".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr, options],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Cast {
|
||||
expr,
|
||||
data_type,
|
||||
strict,
|
||||
} => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let dtype = Value::String {
|
||||
val: format!("{data_type:?}"),
|
||||
span,
|
||||
};
|
||||
let strict = Value::Bool { val: *strict, span };
|
||||
|
||||
let cols = vec!["expr".into(), "dtype".into(), "strict".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr, dtype, strict],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Take { expr, idx } => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let idx = expr_to_value(idx.as_ref(), span);
|
||||
|
||||
let cols = vec!["expr".into(), "idx".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr, idx],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::SortBy {
|
||||
expr,
|
||||
by,
|
||||
descending,
|
||||
} => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let by: Vec<Value> = by.iter().map(|b| expr_to_value(b, span)).collect();
|
||||
let by = Value::List { vals: by, span };
|
||||
|
||||
let descending: Vec<Value> = descending
|
||||
.iter()
|
||||
.map(|r| Value::Bool { val: *r, span })
|
||||
.collect();
|
||||
let descending = Value::List {
|
||||
vals: descending,
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec!["expr".into(), "by".into(), "descending".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr, by, descending],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Filter { input, by } => {
|
||||
let input = expr_to_value(input.as_ref(), span);
|
||||
let by = expr_to_value(by.as_ref(), span);
|
||||
|
||||
let cols = vec!["input".into(), "by".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![input, by],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Slice {
|
||||
input,
|
||||
offset,
|
||||
length,
|
||||
} => {
|
||||
let input = expr_to_value(input.as_ref(), span);
|
||||
let offset = expr_to_value(offset.as_ref(), span);
|
||||
let length = expr_to_value(length.as_ref(), span);
|
||||
|
||||
let cols = vec!["input".into(), "offset".into(), "length".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![input, offset, length],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Exclude(expr, excluded) => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let excluded = excluded
|
||||
.iter()
|
||||
.map(|e| Value::String {
|
||||
val: format!("{e:?}"),
|
||||
span,
|
||||
})
|
||||
.collect();
|
||||
let excluded = Value::List {
|
||||
vals: excluded,
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec!["expr".into(), "excluded".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr, excluded],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::RenameAlias { expr, function } => {
|
||||
let expr = expr_to_value(expr.as_ref(), span);
|
||||
let function = Value::String {
|
||||
val: format!("{function:?}"),
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec!["expr".into(), "function".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![expr, function],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::AnonymousFunction {
|
||||
input,
|
||||
function,
|
||||
output_type,
|
||||
options,
|
||||
} => {
|
||||
let input: Vec<Value> = input.iter().map(|e| expr_to_value(e, span)).collect();
|
||||
let input = Value::List { vals: input, span };
|
||||
|
||||
let function = Value::String {
|
||||
val: format!("{function:?}"),
|
||||
span,
|
||||
};
|
||||
let output_type = Value::String {
|
||||
val: format!("{output_type:?}"),
|
||||
span,
|
||||
};
|
||||
let options = Value::String {
|
||||
val: format!("{options:?}"),
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec![
|
||||
"input".into(),
|
||||
"function".into(),
|
||||
"output_type".into(),
|
||||
"options".into(),
|
||||
];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![input, function, output_type, options],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Function {
|
||||
input,
|
||||
function,
|
||||
options,
|
||||
} => {
|
||||
let input: Vec<Value> = input.iter().map(|e| expr_to_value(e, span)).collect();
|
||||
let input = Value::List { vals: input, span };
|
||||
|
||||
let function = Value::String {
|
||||
val: format!("{function:?}"),
|
||||
span,
|
||||
};
|
||||
let options = Value::String {
|
||||
val: format!("{options:?}"),
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec!["input".into(), "function".into(), "options".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![input, function, options],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Cache { input, id } => {
|
||||
let input = expr_to_value(input.as_ref(), span);
|
||||
let id = Value::String {
|
||||
val: format!("{id:?}"),
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec!["input".into(), "id".into()];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![input, id],
|
||||
span,
|
||||
}
|
||||
}
|
||||
Expr::Window {
|
||||
function,
|
||||
partition_by,
|
||||
order_by,
|
||||
options,
|
||||
} => {
|
||||
let function = expr_to_value(function, span);
|
||||
|
||||
let partition_by: Vec<Value> = partition_by
|
||||
.iter()
|
||||
.map(|e| expr_to_value(e, span))
|
||||
.collect();
|
||||
let partition_by = Value::List {
|
||||
vals: partition_by,
|
||||
span,
|
||||
};
|
||||
|
||||
let order_by = order_by
|
||||
.as_ref()
|
||||
.map(|e| expr_to_value(e.as_ref(), span))
|
||||
.unwrap_or_else(|| Value::nothing(span));
|
||||
|
||||
let options = Value::String {
|
||||
val: format!("{options:?}"),
|
||||
span,
|
||||
};
|
||||
|
||||
let cols = vec![
|
||||
"function".into(),
|
||||
"partition_by".into(),
|
||||
"order_by".into(),
|
||||
"options".into(),
|
||||
];
|
||||
|
||||
Value::Record {
|
||||
cols,
|
||||
vals: vec![function, partition_by, order_by, options],
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
use super::NuLazyFrame;
|
||||
use nu_protocol::{CustomValue, ShellError, Span, Value};
|
||||
|
||||
// CustomValue implementation for NuDataFrame
|
||||
impl CustomValue for NuLazyFrame {
|
||||
fn typetag_name(&self) -> &'static str {
|
||||
"lazyframe"
|
||||
}
|
||||
|
||||
fn typetag_deserialize(&self) {
|
||||
unimplemented!("typetag_deserialize")
|
||||
}
|
||||
|
||||
fn clone_value(&self, span: nu_protocol::Span) -> Value {
|
||||
let cloned = NuLazyFrame {
|
||||
lazy: self.lazy.clone(),
|
||||
from_eager: self.from_eager,
|
||||
schema: self.schema.clone(),
|
||||
};
|
||||
|
||||
Value::CustomValue {
|
||||
val: Box::new(cloned),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
fn value_string(&self) -> String {
|
||||
self.typetag_name().to_string()
|
||||
}
|
||||
|
||||
fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
|
||||
let cols = vec!["plan".into(), "optimized_plan".into()];
|
||||
let vals = vec![
|
||||
Value::String {
|
||||
val: self.as_ref().describe_plan(),
|
||||
span,
|
||||
},
|
||||
Value::String {
|
||||
val: self
|
||||
.as_ref()
|
||||
.describe_optimized_plan()
|
||||
.unwrap_or_else(|_| "<NOT AVAILABLE>".to_string()),
|
||||
span,
|
||||
},
|
||||
];
|
||||
|
||||
Ok(Value::Record { cols, vals, span })
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
195
crates/nu-cmd-dataframe/src/dataframe/values/nu_lazyframe/mod.rs
Normal file
195
crates/nu-cmd-dataframe/src/dataframe/values/nu_lazyframe/mod.rs
Normal file
@ -0,0 +1,195 @@
|
||||
mod custom_value;
|
||||
|
||||
use super::{NuDataFrame, NuExpression};
|
||||
use core::fmt;
|
||||
use nu_protocol::{PipelineData, ShellError, Span, Value};
|
||||
use polars::prelude::{Expr, IntoLazy, LazyFrame, Schema};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
// Lazyframe wrapper for Nushell operations
|
||||
// Polars LazyFrame is behind and Option to allow easy implementation of
|
||||
// the Deserialize trait
|
||||
#[derive(Default)]
|
||||
pub struct NuLazyFrame {
|
||||
pub lazy: Option<LazyFrame>,
|
||||
pub schema: Option<Schema>,
|
||||
pub from_eager: bool,
|
||||
}
|
||||
|
||||
// Mocked serialization of the LazyFrame object
|
||||
impl Serialize for NuLazyFrame {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_none()
|
||||
}
|
||||
}
|
||||
|
||||
// Mocked deserialization of the LazyFrame object
|
||||
impl<'de> Deserialize<'de> for NuLazyFrame {
|
||||
fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
Ok(NuLazyFrame::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for NuLazyFrame {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "NuLazyframe")
|
||||
}
|
||||
}
|
||||
|
||||
// Referenced access to the real LazyFrame
|
||||
impl AsRef<LazyFrame> for NuLazyFrame {
|
||||
fn as_ref(&self) -> &polars::prelude::LazyFrame {
|
||||
// The only case when there cannot be a lazy frame is if it is created
|
||||
// using the default function or if created by deserializing something
|
||||
self.lazy.as_ref().expect("there should always be a frame")
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<LazyFrame> for NuLazyFrame {
|
||||
fn as_mut(&mut self) -> &mut polars::prelude::LazyFrame {
|
||||
// The only case when there cannot be a lazy frame is if it is created
|
||||
// using the default function or if created by deserializing something
|
||||
self.lazy.as_mut().expect("there should always be a frame")
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LazyFrame> for NuLazyFrame {
|
||||
fn from(lazy_frame: LazyFrame) -> Self {
|
||||
Self {
|
||||
lazy: Some(lazy_frame),
|
||||
from_eager: false,
|
||||
schema: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NuLazyFrame {
|
||||
pub fn new(from_eager: bool, lazy: LazyFrame) -> Self {
|
||||
Self {
|
||||
lazy: Some(lazy),
|
||||
from_eager,
|
||||
schema: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_dataframe(df: NuDataFrame) -> Self {
|
||||
let lazy = df.as_ref().clone().lazy();
|
||||
Self {
|
||||
lazy: Some(lazy),
|
||||
from_eager: true,
|
||||
schema: Some(df.as_ref().schema()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_value(self, span: Span) -> Result<Value, ShellError> {
|
||||
if self.from_eager {
|
||||
let df = self.collect(span)?;
|
||||
Ok(Value::CustomValue {
|
||||
val: Box::new(df),
|
||||
span,
|
||||
})
|
||||
} else {
|
||||
Ok(Value::CustomValue {
|
||||
val: Box::new(self),
|
||||
span,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_polars(self) -> LazyFrame {
|
||||
self.lazy.expect("lazyframe cannot be none to convert")
|
||||
}
|
||||
|
||||
pub fn collect(self, span: Span) -> Result<NuDataFrame, ShellError> {
|
||||
self.lazy
|
||||
.expect("No empty lazy for collect")
|
||||
.collect()
|
||||
.map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Error collecting lazy frame".to_string(),
|
||||
e.to_string(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})
|
||||
.map(|df| NuDataFrame {
|
||||
df,
|
||||
from_lazy: !self.from_eager,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn try_from_value(value: Value) -> Result<Self, ShellError> {
|
||||
if Self::can_downcast(&value) {
|
||||
Ok(Self::get_lazy_df(value)?)
|
||||
} else if NuDataFrame::can_downcast(&value) {
|
||||
let df = NuDataFrame::try_from_value(value)?;
|
||||
Ok(NuLazyFrame::from_dataframe(df))
|
||||
} else {
|
||||
Err(ShellError::CantConvert {
|
||||
to_type: "lazy or eager dataframe".into(),
|
||||
from_type: value.get_type().to_string(),
|
||||
span: value.span()?,
|
||||
help: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_from_pipeline(input: PipelineData, span: Span) -> Result<Self, ShellError> {
|
||||
let value = input.into_value(span);
|
||||
Self::try_from_value(value)
|
||||
}
|
||||
|
||||
pub fn get_lazy_df(value: Value) -> Result<Self, ShellError> {
|
||||
match value {
|
||||
Value::CustomValue { val, span } => match val.as_any().downcast_ref::<Self>() {
|
||||
Some(expr) => Ok(Self {
|
||||
lazy: expr.lazy.clone(),
|
||||
from_eager: false,
|
||||
schema: None,
|
||||
}),
|
||||
None => Err(ShellError::CantConvert {
|
||||
to_type: "lazy frame".into(),
|
||||
from_type: "non-dataframe".into(),
|
||||
span,
|
||||
help: None,
|
||||
}),
|
||||
},
|
||||
x => Err(ShellError::CantConvert {
|
||||
to_type: "lazy frame".into(),
|
||||
from_type: x.get_type().to_string(),
|
||||
span: x.span()?,
|
||||
help: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn can_downcast(value: &Value) -> bool {
|
||||
if let Value::CustomValue { val, .. } = value {
|
||||
val.as_any().downcast_ref::<Self>().is_some()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn apply_with_expr<F>(self, expr: NuExpression, f: F) -> Self
|
||||
where
|
||||
F: Fn(LazyFrame, Expr) -> LazyFrame,
|
||||
{
|
||||
let df = self.lazy.expect("Lazy frame must not be empty to apply");
|
||||
let expr = expr.into_polars();
|
||||
let new_frame = f(df, expr);
|
||||
|
||||
Self {
|
||||
from_eager: self.from_eager,
|
||||
lazy: Some(new_frame),
|
||||
schema: None,
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
use super::NuLazyGroupBy;
|
||||
use nu_protocol::{CustomValue, ShellError, Span, Value};
|
||||
|
||||
// CustomValue implementation for NuDataFrame
|
||||
impl CustomValue for NuLazyGroupBy {
|
||||
fn typetag_name(&self) -> &'static str {
|
||||
"lazygroupby"
|
||||
}
|
||||
|
||||
fn typetag_deserialize(&self) {
|
||||
unimplemented!("typetag_deserialize")
|
||||
}
|
||||
|
||||
fn clone_value(&self, span: nu_protocol::Span) -> Value {
|
||||
let cloned = NuLazyGroupBy {
|
||||
group_by: self.group_by.clone(),
|
||||
schema: self.schema.clone(),
|
||||
from_eager: self.from_eager,
|
||||
};
|
||||
|
||||
Value::CustomValue {
|
||||
val: Box::new(cloned),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
fn value_string(&self) -> String {
|
||||
self.typetag_name().to_string()
|
||||
}
|
||||
|
||||
fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
|
||||
let cols = vec!["LazyGroupBy".into()];
|
||||
let vals = vec![Value::String {
|
||||
val: "apply aggregation to complete execution plan".into(),
|
||||
span,
|
||||
}];
|
||||
|
||||
Ok(Value::Record { cols, vals, span })
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
@ -0,0 +1,117 @@
|
||||
mod custom_value;
|
||||
|
||||
use core::fmt;
|
||||
use nu_protocol::{PipelineData, ShellError, Span, Value};
|
||||
use polars::prelude::{LazyGroupBy, Schema};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
// Lazyframe wrapper for Nushell operations
|
||||
// Polars LazyFrame is behind and Option to allow easy implementation of
|
||||
// the Deserialize trait
|
||||
#[derive(Default)]
|
||||
pub struct NuLazyGroupBy {
|
||||
pub group_by: Option<LazyGroupBy>,
|
||||
pub schema: Option<Schema>,
|
||||
pub from_eager: bool,
|
||||
}
|
||||
|
||||
// Mocked serialization of the LazyFrame object
|
||||
impl Serialize for NuLazyGroupBy {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_none()
|
||||
}
|
||||
}
|
||||
|
||||
// Mocked deserialization of the LazyFrame object
|
||||
impl<'de> Deserialize<'de> for NuLazyGroupBy {
|
||||
fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
Ok(NuLazyGroupBy::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for NuLazyGroupBy {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "NuLazyGroupBy")
|
||||
}
|
||||
}
|
||||
|
||||
// Referenced access to the real LazyFrame
|
||||
impl AsRef<LazyGroupBy> for NuLazyGroupBy {
|
||||
fn as_ref(&self) -> &polars::prelude::LazyGroupBy {
|
||||
// The only case when there cannot be a lazy frame is if it is created
|
||||
// using the default function or if created by deserializing something
|
||||
self.group_by
|
||||
.as_ref()
|
||||
.expect("there should always be a frame")
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<LazyGroupBy> for NuLazyGroupBy {
|
||||
fn as_mut(&mut self) -> &mut polars::prelude::LazyGroupBy {
|
||||
// The only case when there cannot be a lazy frame is if it is created
|
||||
// using the default function or if created by deserializing something
|
||||
self.group_by
|
||||
.as_mut()
|
||||
.expect("there should always be a frame")
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LazyGroupBy> for NuLazyGroupBy {
|
||||
fn from(group_by: LazyGroupBy) -> Self {
|
||||
Self {
|
||||
group_by: Some(group_by),
|
||||
from_eager: false,
|
||||
schema: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NuLazyGroupBy {
|
||||
pub fn into_value(self, span: Span) -> Value {
|
||||
Value::CustomValue {
|
||||
val: Box::new(self),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_polars(self) -> LazyGroupBy {
|
||||
self.group_by.expect("GroupBy cannot be none to convert")
|
||||
}
|
||||
|
||||
pub fn try_from_value(value: Value) -> Result<Self, ShellError> {
|
||||
match value {
|
||||
Value::CustomValue { val, span } => {
|
||||
match val.as_any().downcast_ref::<NuLazyGroupBy>() {
|
||||
Some(group) => Ok(Self {
|
||||
group_by: group.group_by.clone(),
|
||||
schema: group.schema.clone(),
|
||||
from_eager: group.from_eager,
|
||||
}),
|
||||
None => Err(ShellError::CantConvert {
|
||||
to_type: "lazy groupby".into(),
|
||||
from_type: "custom value".into(),
|
||||
span,
|
||||
help: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
x => Err(ShellError::CantConvert {
|
||||
to_type: "lazy groupby".into(),
|
||||
from_type: x.get_type().to_string(),
|
||||
span: x.span()?,
|
||||
help: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_from_pipeline(input: PipelineData, span: Span) -> Result<Self, ShellError> {
|
||||
let value = input.into_value(span);
|
||||
Self::try_from_value(value)
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
use super::NuWhen;
|
||||
use nu_protocol::{CustomValue, ShellError, Span, Value};
|
||||
|
||||
// CustomValue implementation for NuDataFrame
|
||||
impl CustomValue for NuWhen {
|
||||
fn typetag_name(&self) -> &'static str {
|
||||
"when"
|
||||
}
|
||||
|
||||
fn typetag_deserialize(&self) {
|
||||
unimplemented!("typetag_deserialize")
|
||||
}
|
||||
|
||||
fn clone_value(&self, span: nu_protocol::Span) -> Value {
|
||||
let cloned = self.clone();
|
||||
|
||||
Value::CustomValue {
|
||||
val: Box::new(cloned),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
fn value_string(&self) -> String {
|
||||
self.typetag_name().to_string()
|
||||
}
|
||||
|
||||
fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
|
||||
let val = match self {
|
||||
NuWhen::WhenThen(_) => "whenthen".into(),
|
||||
NuWhen::WhenThenThen(_) => "whenthenthen".into(),
|
||||
};
|
||||
|
||||
let value = Value::String { val, span };
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
79
crates/nu-cmd-dataframe/src/dataframe/values/nu_when/mod.rs
Normal file
79
crates/nu-cmd-dataframe/src/dataframe/values/nu_when/mod.rs
Normal file
@ -0,0 +1,79 @@
|
||||
mod custom_value;
|
||||
|
||||
use core::fmt;
|
||||
use nu_protocol::{ShellError, Span, Value};
|
||||
use polars::prelude::{col, when, WhenThen, WhenThenThen};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum NuWhen {
|
||||
WhenThen(Box<WhenThen>),
|
||||
WhenThenThen(WhenThenThen),
|
||||
}
|
||||
|
||||
// Mocked serialization of the LazyFrame object
|
||||
impl Serialize for NuWhen {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_none()
|
||||
}
|
||||
}
|
||||
|
||||
// Mocked deserialization of the LazyFrame object
|
||||
impl<'de> Deserialize<'de> for NuWhen {
|
||||
fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
Ok(NuWhen::WhenThen(Box::new(when(col("a")).then(col("b")))))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for NuWhen {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "NuWhen")
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WhenThen> for NuWhen {
|
||||
fn from(when_then: WhenThen) -> Self {
|
||||
NuWhen::WhenThen(Box::new(when_then))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WhenThenThen> for NuWhen {
|
||||
fn from(when_then_then: WhenThenThen) -> Self {
|
||||
NuWhen::WhenThenThen(when_then_then)
|
||||
}
|
||||
}
|
||||
|
||||
impl NuWhen {
|
||||
pub fn into_value(self, span: Span) -> Value {
|
||||
Value::CustomValue {
|
||||
val: Box::new(self),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_from_value(value: Value) -> Result<Self, ShellError> {
|
||||
match value {
|
||||
Value::CustomValue { val, span } => match val.as_any().downcast_ref::<Self>() {
|
||||
Some(expr) => Ok(expr.clone()),
|
||||
None => Err(ShellError::CantConvert {
|
||||
to_type: "when expression".into(),
|
||||
from_type: "non when expression".into(),
|
||||
span,
|
||||
help: None,
|
||||
}),
|
||||
},
|
||||
x => Err(ShellError::CantConvert {
|
||||
to_type: "when expression".into(),
|
||||
from_type: x.get_type().to_string(),
|
||||
span: x.span()?,
|
||||
help: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
84
crates/nu-cmd-dataframe/src/dataframe/values/utils.rs
Normal file
84
crates/nu-cmd-dataframe/src/dataframe/values/utils.rs
Normal file
@ -0,0 +1,84 @@
|
||||
use nu_protocol::{span as span_join, ShellError, Span, Spanned, Value};
|
||||
|
||||
// Default value used when selecting rows from dataframe
|
||||
pub const DEFAULT_ROWS: usize = 5;
|
||||
|
||||
// Converts a Vec<Value> to a Vec<Spanned<String>> with a Span marking the whole
|
||||
// location of the columns for error referencing
|
||||
pub(crate) fn convert_columns(
|
||||
columns: Vec<Value>,
|
||||
span: Span,
|
||||
) -> Result<(Vec<Spanned<String>>, Span), ShellError> {
|
||||
// First column span
|
||||
let mut col_span = columns
|
||||
.get(0)
|
||||
.ok_or_else(|| {
|
||||
ShellError::GenericError(
|
||||
"Empty column list".into(),
|
||||
"Empty list found for command".into(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})
|
||||
.and_then(|v| v.span())?;
|
||||
|
||||
let res = columns
|
||||
.into_iter()
|
||||
.map(|value| match value {
|
||||
Value::String { val, span } => {
|
||||
col_span = span_join(&[col_span, span]);
|
||||
Ok(Spanned { item: val, span })
|
||||
}
|
||||
_ => Err(ShellError::GenericError(
|
||||
"Incorrect column format".into(),
|
||||
"Only string as column name".into(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
})
|
||||
.collect::<Result<Vec<Spanned<String>>, _>>()?;
|
||||
|
||||
Ok((res, col_span))
|
||||
}
|
||||
|
||||
// Converts a Vec<Value> to a Vec<String> with a Span marking the whole
|
||||
// location of the columns for error referencing
|
||||
pub(crate) fn convert_columns_string(
|
||||
columns: Vec<Value>,
|
||||
span: Span,
|
||||
) -> Result<(Vec<String>, Span), ShellError> {
|
||||
// First column span
|
||||
let mut col_span = columns
|
||||
.get(0)
|
||||
.ok_or_else(|| {
|
||||
ShellError::GenericError(
|
||||
"Empty column list".into(),
|
||||
"Empty list found for command".into(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})
|
||||
.and_then(|v| v.span())?;
|
||||
|
||||
let res = columns
|
||||
.into_iter()
|
||||
.map(|value| match value {
|
||||
Value::String { val, span } => {
|
||||
col_span = span_join(&[col_span, span]);
|
||||
Ok(val)
|
||||
}
|
||||
_ => Err(ShellError::GenericError(
|
||||
"Incorrect column format".into(),
|
||||
"Only string as column name".into(),
|
||||
Some(span),
|
||||
None,
|
||||
Vec::new(),
|
||||
)),
|
||||
})
|
||||
.collect::<Result<Vec<String>, _>>()?;
|
||||
|
||||
Ok((res, col_span))
|
||||
}
|
Reference in New Issue
Block a user