From 8b160f98503587afd7c41b8f95e5e7fc12175a5d Mon Sep 17 00:00:00 2001
From: Jack Wright <56345+ayax79@users.noreply.github.com>
Date: Tue, 15 Aug 2023 04:54:37 -0700
Subject: [PATCH] Nushell table list columns -> dataframe list columns. Explode / Flatten dataframe support. (#9951)

# Description

- Adds support for conversion between nushell lists and polars lists instead of treating them as polars objects.
- Fixed explode and flatten to work as either expressions or lazy dataframe commands. The previous item was required to make this work.

---------

Co-authored-by: Jack Wright
Co-authored-by: Darren Schroeder <343840+fdncred@users.noreply.github.com>
---
 .../expressions/expressions_macro.rs          |   30 -
 .../src/dataframe/expressions/mod.rs          |    2 -
 .../src/dataframe/lazy/explode.rs             |  158 ++
 .../src/dataframe/lazy/flatten.rs             |  132 ++
 .../src/dataframe/lazy/mod.rs                 |    8 +-
 .../values/nu_dataframe/conversion.rs         | 1368 ++++++++++-------
 6 files changed, 1129 insertions(+), 569 deletions(-)
 create mode 100644 crates/nu-cmd-dataframe/src/dataframe/lazy/explode.rs
 create mode 100644 crates/nu-cmd-dataframe/src/dataframe/lazy/flatten.rs

diff --git a/crates/nu-cmd-dataframe/src/dataframe/expressions/expressions_macro.rs b/crates/nu-cmd-dataframe/src/dataframe/expressions/expressions_macro.rs
index 11a73fe209..95aec00b29 100644
--- a/crates/nu-cmd-dataframe/src/dataframe/expressions/expressions_macro.rs
+++ b/crates/nu-cmd-dataframe/src/dataframe/expressions/expressions_macro.rs
@@ -344,36 +344,6 @@ expr_command!(
     test_groups
 );
 
-// ExprFlatten command
-// Expands to a command definition for a flatten expression
-expr_command!(
-    ExprFlatten,
-    "dfr flatten",
-    "creates a flatten expression",
-    vec![Example {
-        description: "",
-        example: "",
-        result: None,
-    }],
-    flatten,
-    test_flatten
-);
-
-// ExprExplode command
-// Expands to a command definition for a explode expression
-expr_command!(
-    ExprExplode,
-    "dfr explode",
-    "creates an explode expression",
-    vec![Example {
-        description: "",
-        example: "",
-        result: None,
-    }],
-    explode,
-    test_explode
-);
-
 // ExprCount command
 // Expands to a command definition for a count expression
 expr_command!(
diff --git a/crates/nu-cmd-dataframe/src/dataframe/expressions/mod.rs b/crates/nu-cmd-dataframe/src/dataframe/expressions/mod.rs
index ab098803da..4ba70d900d 100644
--- a/crates/nu-cmd-dataframe/src/dataframe/expressions/mod.rs
+++ b/crates/nu-cmd-dataframe/src/dataframe/expressions/mod.rs
@@ -47,8 +47,6 @@ pub fn add_expressions(working_set: &mut StateWorkingSet) {
         ExprQuantile,
         ExprList,
         ExprAggGroups,
-        ExprFlatten,
-        ExprExplode,
         ExprCount,
         ExprIsIn,
         ExprNot,
diff --git a/crates/nu-cmd-dataframe/src/dataframe/lazy/explode.rs b/crates/nu-cmd-dataframe/src/dataframe/lazy/explode.rs
new file mode 100644
index 0000000000..a445a5d09c
--- /dev/null
+++ b/crates/nu-cmd-dataframe/src/dataframe/lazy/explode.rs
@@ -0,0 +1,158 @@
+use crate::dataframe::values::{Column, NuDataFrame, NuExpression, NuLazyFrame};
+
+use nu_protocol::{
+    ast::Call,
+    engine::{Command, EngineState, Stack},
+    Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value,
+};
+
+#[derive(Clone)]
+pub struct LazyExplode;
+
+impl Command for LazyExplode {
+    fn name(&self) -> &str {
+        "dfr explode"
+    }
+
+    fn usage(&self) -> &str {
+        "Explodes a dataframe or creates an explode expression."
+ } + + fn signature(&self) -> Signature { + Signature::build(self.name()) + .rest( + "columns", + SyntaxShape::String, + "columns to explode, only applicable for dataframes", + ) + .input_output_types(vec![ + ( + Type::Custom("expression".into()), + Type::Custom("expression".into()), + ), + ( + Type::Custom("dataframe".into()), + Type::Custom("dataframe".into()), + ), + ]) + .category(Category::Custom("lazyframe".into())) + } + + fn examples(&self) -> Vec { + vec![ + Example { + description: "Explode the specified dataframe", + example: "[[id name hobbies]; [1 Mercy [Cycling Knitting]] [2 Bob [Skiing Football]]] | dfr into-df | dfr explode hobbies | dfr collect", + result: Some( + NuDataFrame::try_from_columns(vec![ + Column::new( + "id".to_string(), + vec![ + Value::test_int(1), + Value::test_int(1), + Value::test_int(2), + Value::test_int(2), + ]), + Column::new( + "name".to_string(), + vec![ + Value::test_string("Mercy"), + Value::test_string("Mercy"), + Value::test_string("Bob"), + Value::test_string("Bob"), + ]), + Column::new( + "hobbies".to_string(), + vec![ + Value::test_string("Cycling"), + Value::test_string("Knitting"), + Value::test_string("Skiing"), + Value::test_string("Football"), + ]), + ]).expect("simple df for test should not fail") + .into_value(Span::test_data()), + ) + }, + Example { + description: "Select a column and explode the values", + example: "[[id name hobbies]; [1 Mercy [Cycling Knitting]] [2 Bob [Skiing Football]]] | dfr into-df | dfr select (dfr col hobbies | dfr explode)", + result: Some( + NuDataFrame::try_from_columns(vec![ + Column::new( + "hobbies".to_string(), + vec![ + Value::test_string("Cycling"), + Value::test_string("Knitting"), + Value::test_string("Skiing"), + Value::test_string("Football"), + ]), + ]).expect("simple df for test should not fail") + .into_value(Span::test_data()), + ), + }, + ] + } + + fn run( + &self, + _engine_state: &EngineState, + _stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + explode(call, input) + } +} + +pub(crate) fn explode(call: &Call, input: PipelineData) -> Result { + let value = input.into_value(call.head); + if NuDataFrame::can_downcast(&value) { + let df = NuLazyFrame::try_from_value(value)?; + let columns: Vec = call + .positional_iter() + .filter_map(|e| e.as_string()) + .collect(); + + let exploded = df + .into_polars() + .explode(columns.iter().map(AsRef::as_ref).collect::>()); + + Ok(PipelineData::Value( + NuLazyFrame::from(exploded).into_value(call.head)?, + None, + )) + } else { + let expr = NuExpression::try_from_value(value)?; + let expr: NuExpression = expr.into_polars().explode().into(); + + Ok(PipelineData::Value( + NuExpression::into_value(expr, call.head), + None, + )) + } +} + +#[cfg(test)] +mod test { + use super::super::super::test_dataframe::{build_test_engine_state, test_dataframe_example}; + use super::*; + use crate::dataframe::lazy::aggregate::LazyAggregate; + use crate::dataframe::lazy::groupby::ToLazyGroupBy; + + #[test] + fn test_examples_dataframe() { + let mut engine_state = build_test_engine_state(vec![Box::new(LazyExplode {})]); + test_dataframe_example(&mut engine_state, &LazyExplode.examples()[0]); + } + + #[ignore] + #[test] + fn test_examples_expression() { + let mut engine_state = build_test_engine_state(vec![ + Box::new(LazyExplode {}), + Box::new(LazyAggregate {}), + Box::new(ToLazyGroupBy {}), + ]); + test_dataframe_example(&mut engine_state, &LazyExplode.examples()[1]); + } +} diff --git a/crates/nu-cmd-dataframe/src/dataframe/lazy/flatten.rs 
b/crates/nu-cmd-dataframe/src/dataframe/lazy/flatten.rs new file mode 100644 index 0000000000..272ae4b781 --- /dev/null +++ b/crates/nu-cmd-dataframe/src/dataframe/lazy/flatten.rs @@ -0,0 +1,132 @@ +use nu_protocol::{ + ast::Call, + engine::{Command, EngineState, Stack}, + Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value, +}; + +use crate::dataframe::values::{Column, NuDataFrame}; + +use super::explode::explode; + +#[derive(Clone)] +pub struct LazyFlatten; + +impl Command for LazyFlatten { + fn name(&self) -> &str { + "dfr flatten" + } + + fn usage(&self) -> &str { + "An alias for dfr explode" + } + + fn signature(&self) -> Signature { + Signature::build(self.name()) + .rest( + "columns", + SyntaxShape::String, + "columns to flatten, only applicable for dataframes", + ) + .input_output_types(vec![ + ( + Type::Custom("expression".into()), + Type::Custom("expression".into()), + ), + ( + Type::Custom("dataframe".into()), + Type::Custom("dataframe".into()), + ), + ]) + .category(Category::Custom("lazyframe".into())) + } + + fn examples(&self) -> Vec { + vec![ +Example { + description: "Flatten the specified dataframe", + example: "[[id name hobbies]; [1 Mercy [Cycling Knitting]] [2 Bob [Skiing Football]]] | dfr into-df | dfr flatten hobbies | dfr collect", + result: Some( + NuDataFrame::try_from_columns(vec![ + Column::new( + "id".to_string(), + vec![ + Value::test_int(1), + Value::test_int(1), + Value::test_int(2), + Value::test_int(2), + ]), + Column::new( + "name".to_string(), + vec![ + Value::test_string("Mercy"), + Value::test_string("Mercy"), + Value::test_string("Bob"), + Value::test_string("Bob"), + ]), + Column::new( + "hobbies".to_string(), + vec![ + Value::test_string("Cycling"), + Value::test_string("Knitting"), + Value::test_string("Skiing"), + Value::test_string("Football"), + ]), + ]).expect("simple df for test should not fail") + .into_value(Span::test_data()), + ) + }, + Example { + description: "Select a column and flatten the values", + example: "[[id name hobbies]; [1 Mercy [Cycling Knitting]] [2 Bob [Skiing Football]]] | dfr into-df | dfr select (dfr col hobbies | dfr flatten)", + result: Some( + NuDataFrame::try_from_columns(vec![ + Column::new( + "hobbies".to_string(), + vec![ + Value::test_string("Cycling"), + Value::test_string("Knitting"), + Value::test_string("Skiing"), + Value::test_string("Football"), + ]), + ]).expect("simple df for test should not fail") + .into_value(Span::test_data()), + ), + }, + ] + } + + fn run( + &self, + _engine_state: &EngineState, + _stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + explode(call, input) + } +} + +#[cfg(test)] +mod test { + use super::super::super::test_dataframe::{build_test_engine_state, test_dataframe_example}; + use super::*; + use crate::dataframe::lazy::aggregate::LazyAggregate; + use crate::dataframe::lazy::groupby::ToLazyGroupBy; + + #[test] + fn test_examples_dataframe() { + let mut engine_state = build_test_engine_state(vec![Box::new(LazyFlatten {})]); + test_dataframe_example(&mut engine_state, &LazyFlatten.examples()[0]); + } + + #[ignore] + #[test] + fn test_examples_expression() { + let mut engine_state = build_test_engine_state(vec![ + Box::new(LazyFlatten {}), + Box::new(LazyAggregate {}), + Box::new(ToLazyGroupBy {}), + ]); + test_dataframe_example(&mut engine_state, &LazyFlatten.examples()[1]); + } +} diff --git a/crates/nu-cmd-dataframe/src/dataframe/lazy/mod.rs b/crates/nu-cmd-dataframe/src/dataframe/lazy/mod.rs index 3788e6511b..331992d371 
100644 --- a/crates/nu-cmd-dataframe/src/dataframe/lazy/mod.rs +++ b/crates/nu-cmd-dataframe/src/dataframe/lazy/mod.rs @@ -1,9 +1,11 @@ pub mod aggregate; mod collect; +mod explode; mod fetch; mod fill_nan; mod fill_null; mod filter; +mod flatten; pub mod groupby; mod join; mod macro_commands; @@ -27,6 +29,8 @@ use crate::dataframe::lazy::quantile::LazyQuantile; pub(crate) use crate::dataframe::lazy::select::LazySelect; use crate::dataframe::lazy::sort_by_expr::LazySortBy; pub use crate::dataframe::lazy::to_lazy::ToLazyFrame; +pub use explode::LazyExplode; +pub use flatten::LazyFlatten; pub fn add_lazy_decls(working_set: &mut StateWorkingSet) { macro_rules! bind_command { @@ -54,6 +58,8 @@ pub fn add_lazy_decls(working_set: &mut StateWorkingSet) { LazySelect, LazySortBy, ToLazyFrame, - ToLazyGroupBy + ToLazyGroupBy, + LazyExplode, + LazyFlatten ); } diff --git a/crates/nu-cmd-dataframe/src/dataframe/values/nu_dataframe/conversion.rs b/crates/nu-cmd-dataframe/src/dataframe/values/nu_dataframe/conversion.rs index 06c7e81f4c..2b3e3b18b7 100644 --- a/crates/nu-cmd-dataframe/src/dataframe/values/nu_dataframe/conversion.rs +++ b/crates/nu-cmd-dataframe/src/dataframe/values/nu_dataframe/conversion.rs @@ -3,16 +3,27 @@ use super::{DataFrameValue, NuDataFrame}; use chrono::{DateTime, FixedOffset, NaiveDateTime}; use indexmap::map::{Entry, IndexMap}; use nu_protocol::{ShellError, Span, Value}; +use polars::chunked_array::builder::AnonymousOwnedListBuilder; use polars::chunked_array::object::builder::ObjectChunkedBuilder; use polars::chunked_array::ChunkedArray; +use polars::export::arrow::Either; use polars::prelude::{ - DataFrame, DataType, DatetimeChunked, Int64Type, IntoSeries, NamedFrom, NewChunkedArray, - ObjectType, Series, TemporalMethods, TimeUnit, + DataFrame, DataType, DatetimeChunked, Float64Type, Int64Type, IntoSeries, + ListBooleanChunkedBuilder, ListBuilderTrait, ListPrimitiveChunkedBuilder, ListType, + ListUtf8ChunkedBuilder, NamedFrom, NewChunkedArray, ObjectType, Series, TemporalMethods, + TimeUnit, }; use std::ops::{Deref, DerefMut}; const SECS_PER_DAY: i64 = 86_400; +// The values capacity is for the size of an internal vec. +// Since this is impossible to determine without traversing every value +// I just picked one. Since this is for converting back and forth +// between nushell tables the values shouldn't be too extremely large for +// practical reasons (~ a few thousand rows). 
+const VALUES_CAPACITY: usize = 10; + #[derive(Debug)] pub struct Column { name: String, @@ -69,6 +80,7 @@ pub enum InputType { Date, Duration, Filesize, + List(Box), } #[derive(Debug)] @@ -109,495 +121,8 @@ pub fn create_column( span: Span, ) -> Result { let size = to_row - from_row; - match series.dtype() { - DataType::Null => { - let values = std::iter::repeat(Value::Nothing { span }) - .take(size) - .collect::>(); - - Ok(Column::new(series.name().into(), values)) - } - DataType::UInt8 => { - let casted = series.u8().map_err(|e| { - ShellError::GenericError( - "Error casting column to u8".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Int { - val: a as i64, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::UInt16 => { - let casted = series.u16().map_err(|e| { - ShellError::GenericError( - "Error casting column to u16".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Int { - val: a as i64, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::UInt32 => { - let casted = series.u32().map_err(|e| { - ShellError::GenericError( - "Error casting column to u32".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Int { - val: a as i64, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::UInt64 => { - let casted = series.u64().map_err(|e| { - ShellError::GenericError( - "Error casting column to u64".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Int { - val: a as i64, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Int8 => { - let casted = series.i8().map_err(|e| { - ShellError::GenericError( - "Error casting column to i8".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Int { - val: a as i64, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Int16 => { - let casted = series.i16().map_err(|e| { - ShellError::GenericError( - "Error casting column to i16".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Int { - val: a as i64, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Int32 => { - let casted = series.i32().map_err(|e| { - ShellError::GenericError( - "Error casting column to i32".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { 
- Some(a) => Value::Int { - val: a as i64, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Int64 => { - let casted = series.i64().map_err(|e| { - ShellError::GenericError( - "Error casting column to i64".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Int { val: a, span }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Float32 => { - let casted = series.f32().map_err(|e| { - ShellError::GenericError( - "Error casting column to f32".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Float { - val: a as f64, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Float64 => { - let casted = series.f64().map_err(|e| { - ShellError::GenericError( - "Error casting column to f64".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Float { val: a, span }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Boolean => { - let casted = series.bool().map_err(|e| { - ShellError::GenericError( - "Error casting column to bool".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::Bool { val: a, span }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Utf8 => { - let casted = series.utf8().map_err(|e| { - ShellError::GenericError( - "Error casting column to string".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => Value::String { - val: a.into(), - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Object(x) => { - let casted = series - .as_any() - .downcast_ref::>>(); - - match casted { - None => Err(ShellError::GenericError( - "Error casting object from series".into(), - "".to_string(), - None, - Some(format!("Object not supported for conversion: {x}")), - Vec::new(), - )), - Some(ca) => { - let values = ca - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => a.get_value(), - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(ca.name().into(), values)) - } - } - } - DataType::Date => { - let casted = series.date().map_err(|e| { - ShellError::GenericError( - "Error casting column to date".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => { - // elapsed time in day since 1970-01-01 - let seconds = a as i64 * SECS_PER_DAY; - let naive_datetime = match NaiveDateTime::from_timestamp_opt(seconds, 0) { - Some(val) => val, - None => { - return Value::Error { - error: 
Box::new(ShellError::UnsupportedInput( - "The given local datetime representation is invalid." - .to_string(), - format!("timestamp is {a:?}"), - span, - Span::unknown(), - )), - } - } - }; - // Zero length offset - let offset = match FixedOffset::east_opt(0) { - Some(val) => val, - None => { - return Value::Error { - error: Box::new(ShellError::UnsupportedInput( - "The given local datetime representation is invalid." - .to_string(), - format!("timestamp is {a:?}"), - span, - Span::unknown(), - )), - } - } - }; - let datetime = DateTime::::from_utc(naive_datetime, offset); - - Value::Date { - val: datetime, - span, - } - } - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Datetime(time_unit, _) => { - let casted = series.datetime().map_err(|e| { - ShellError::GenericError( - "Error casting column to datetime".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(a) => { - let unit_divisor = match time_unit { - TimeUnit::Nanoseconds => 1_000_000_000, - TimeUnit::Microseconds => 1_000_000, - TimeUnit::Milliseconds => 1_000, - }; - // elapsed time in nano/micro/milliseconds since 1970-01-01 - let seconds = a / unit_divisor; - let naive_datetime = match NaiveDateTime::from_timestamp_opt(seconds, 0) { - Some(val) => val, - None => { - return Value::Error { - error: Box::new(ShellError::UnsupportedInput( - "The given local datetime representation is invalid." - .to_string(), - format!("timestamp is {a:?}"), - span, - Span::unknown(), - )), - } - } - }; - // Zero length offset - let offset = match FixedOffset::east_opt(0) { - Some(val) => val, - None => { - return Value::Error { - error: Box::new(ShellError::UnsupportedInput( - "The given local datetime representation is invalid." - .to_string(), - format!("timestamp is {a:?}"), - span, - Span::unknown(), - )), - } - } - }; - let datetime = DateTime::::from_utc(naive_datetime, offset); - - Value::Date { - val: datetime, - span, - } - } - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - DataType::Time => { - let casted = series.timestamp(TimeUnit::Nanoseconds).map_err(|e| { - ShellError::GenericError( - "Error casting column to time".into(), - "".to_string(), - None, - Some(e.to_string()), - Vec::new(), - ) - })?; - - let values = casted - .into_iter() - .skip(from_row) - .take(size) - .map(|v| match v { - Some(nanoseconds) => Value::Duration { - val: nanoseconds, - span, - }, - None => Value::Nothing { span }, - }) - .collect::>(); - - Ok(Column::new(casted.name().into(), values)) - } - e => Err(ShellError::GenericError( - "Error creating Dataframe".into(), - "".to_string(), - None, - Some(format!("Value not supported in nushell: {e}")), - Vec::new(), - )), - } + let values = series_to_values(series, Some(from_row), Some(size), span)?; + Ok(Column::new(series.name().into(), values)) } // Adds a separator to the vector of values using the column names from the @@ -651,30 +176,7 @@ pub fn insert_value( // Checking that the type for the value is the same // for the previous value in the column if col_val.values.is_empty() { - match &value { - Value::Int { .. } => { - col_val.column_type = Some(InputType::Integer); - } - Value::Float { .. } => { - col_val.column_type = Some(InputType::Float); - } - Value::String { .. } => { - col_val.column_type = Some(InputType::String); - } - Value::Bool { .. 
} => { - col_val.column_type = Some(InputType::Boolean); - } - Value::Date { .. } => { - col_val.column_type = Some(InputType::Date); - } - Value::Duration { .. } => { - col_val.column_type = Some(InputType::Duration); - } - Value::Filesize { .. } => { - col_val.column_type = Some(InputType::Filesize); - } - _ => col_val.column_type = Some(InputType::Object), - } + col_val.column_type = Some(value_to_input_type(&value)); col_val.values.push(value); } else { let prev_value = &col_val.values[col_val.values.len() - 1]; @@ -687,6 +189,10 @@ pub fn insert_value( | (Value::Date { .. }, Value::Date { .. }) | (Value::Filesize { .. }, Value::Filesize { .. }) | (Value::Duration { .. }, Value::Duration { .. }) => col_val.values.push(value), + (Value::List { .. }, _) => { + col_val.column_type = Some(value_to_input_type(&value)); + col_val.values.push(value); + } _ => { col_val.column_type = Some(InputType::Object); col_val.values.push(value); @@ -697,6 +203,35 @@ pub fn insert_value( Ok(()) } +fn value_to_input_type(value: &Value) -> InputType { + match &value { + Value::Int { .. } => InputType::Integer, + Value::Float { .. } => InputType::Float, + Value::String { .. } => InputType::String, + Value::Bool { .. } => InputType::Boolean, + Value::Date { .. } => InputType::Date, + Value::Duration { .. } => InputType::Duration, + Value::Filesize { .. } => InputType::Filesize, + Value::List { vals, .. } => { + // We need to determined the type inside of the list. + // Since Value::List does not have any kind of internal + // type information, we need to look inside the list. + // This will cause errors if lists have inconsistent types. + // Basically, if a list column needs to be converted to dataframe, + // needs to have consistent types. + let list_type = vals + .iter() + .filter(|v| !matches!(v, Value::Nothing { .. })) + .map(value_to_input_type) + .nth(1) + .unwrap_or(InputType::Object); + + InputType::List(Box::new(list_type)) + } + _ => InputType::Object, + } +} + // The ColumnMap has the parsed data from the StreamInput // This data can be used to create a Series object that can initialize // the dataframe based on the type of data that is found @@ -711,13 +246,7 @@ pub fn from_parsed_columns(column_values: ColumnMap) -> Result { - let series_values: Result, _> = - column.values.iter().map(|v| v.as_i64()).collect(); - let series = Series::new(&name, series_values?); - df_series.push(series) - } - InputType::Filesize => { + InputType::Integer | InputType::Filesize | InputType::Duration => { let series_values: Result, _> = column.values.iter().map(|v| v.as_i64()).collect(); let series = Series::new(&name, series_values?); @@ -736,15 +265,21 @@ pub fn from_parsed_columns(column_values: ColumnMap) -> Result { - let mut builder = - ObjectChunkedBuilder::::new(&name, column.values.len()); - - for v in &column.values { - builder.append_value(DataFrameValue::new(v.clone())); + df_series.push(input_type_object_to_series(&name, &column.values)?) + } + InputType::List(list_type) => { + match input_type_list_to_series(&name, list_type.as_ref(), &column.values) { + Ok(series) => df_series.push(series), + Err(_) => { + // An error case will occur when there are lists of mixed types. + // If this happens, fallback to object list + df_series.push(input_type_list_to_series( + &name, + &InputType::Object, + &column.values, + )?) 
+ } } - - let res = builder.finish(); - df_series.push(res.into_series()) } InputType::Date => { let it = column.values.iter().map(|v| { @@ -761,12 +296,6 @@ pub fn from_parsed_columns(column_values: ColumnMap) -> Result { - let series_values: Result, _> = - column.values.iter().map(|v| v.as_i64()).collect(); - let series = Series::new(&name, series_values?); - df_series.push(series) - } } } } @@ -783,3 +312,770 @@ pub fn from_parsed_columns(column_values: ColumnMap) -> Result Result { + let mut builder = ObjectChunkedBuilder::::new(name, values.len()); + + for v in values { + builder.append_value(DataFrameValue::new(v.clone())); + } + + let res = builder.finish(); + Ok(res.into_series()) +} + +fn input_type_list_to_series( + name: &str, + list_type: &InputType, + values: &[Value], +) -> Result { + let inconsistent_error = |_| { + ShellError::GenericError( + format!( + "column {name} contains a list with inconsistent types: Expecting: {list_type:?}" + ), + "".to_string(), + None, + None, + Vec::new(), + ) + }; + match *list_type { + // list of boolean values + InputType::Boolean => { + let mut builder = ListBooleanChunkedBuilder::new(name, values.len(), VALUES_CAPACITY); + for v in values { + let value_list = v + .as_list()? + .iter() + .map(|v| v.as_bool()) + .collect::, _>>() + .map_err(inconsistent_error)?; + builder.append_iter(value_list.iter().map(|v| Some(*v))); + } + let res = builder.finish(); + Ok(res.into_series()) + } + // list of values that reduce down to i64 + InputType::Integer | InputType::Filesize | InputType::Duration => { + let logical_type = match list_type { + InputType::Duration => DataType::Duration(TimeUnit::Milliseconds), + _ => DataType::Int64, + }; + + let mut builder = ListPrimitiveChunkedBuilder::::new( + name, + values.len(), + VALUES_CAPACITY, + logical_type, + ); + + for v in values { + let value_list = v + .as_list()? + .iter() + .map(|v| v.as_i64()) + .collect::, _>>() + .map_err(inconsistent_error)?; + builder.append_iter_values(value_list.iter().copied()); + } + let res = builder.finish(); + Ok(res.into_series()) + } + InputType::Float => { + let mut builder = ListPrimitiveChunkedBuilder::::new( + name, + values.len(), + VALUES_CAPACITY, + DataType::Float64, + ); + for v in values { + let value_list = v + .as_list()? + .iter() + .map(|v| v.as_f64()) + .collect::, _>>() + .map_err(inconsistent_error)?; + builder.append_iter_values(value_list.iter().copied()); + } + let res = builder.finish(); + Ok(res.into_series()) + } + InputType::String => { + let mut builder = ListUtf8ChunkedBuilder::new(name, values.len(), VALUES_CAPACITY); + for v in values { + let value_list = v + .as_list()? + .iter() + .map(|v| v.as_string()) + .collect::, _>>() + .map_err(inconsistent_error)?; + builder.append_values_iter(value_list.iter().map(AsRef::as_ref)); + } + let res = builder.finish(); + Ok(res.into_series()) + } + // Treat lists as objects at this depth as it is expensive to calculate the list type + // We can revisit this later if necessary + InputType::Date => { + let mut builder = AnonymousOwnedListBuilder::new( + name, + values.len(), + Some(DataType::Datetime(TimeUnit::Nanoseconds, None)), + ); + for (i, v) in values.iter().enumerate() { + let list_name = i.to_string(); + + let it = v.as_list()?.iter().map(|v| { + if let Value::Date { val, .. 
} = &v { + Some(val.timestamp_nanos()) + } else { + None + } + }); + let dt_chunked = ChunkedArray::::from_iter_options(&list_name, it) + .into_datetime(TimeUnit::Nanoseconds, None); + + builder.append_series(&dt_chunked.into_series()); + } + let res = builder.finish(); + Ok(res.into_series()) + } + InputType::List(ref sub_list_type) => { + Ok(input_type_list_to_series(name, sub_list_type, values)?) + } + // treat everything else as an object + _ => Ok(input_type_object_to_series(name, values)?), + } +} + +fn series_to_values( + series: &Series, + maybe_from_row: Option, + maybe_size: Option, + span: Span, +) -> Result, ShellError> { + match series.dtype() { + DataType::Null => { + let it = std::iter::repeat(Value::Nothing { span }); + let values = if let Some(size) = maybe_size { + Either::Left(it.take(size)) + } else { + Either::Right(it) + } + .collect::>(); + + Ok(values) + } + DataType::UInt8 => { + let casted = series.u8().map_err(|e| { + ShellError::GenericError( + "Error casting column to u8".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Int { + val: a as i64, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::UInt16 => { + let casted = series.u16().map_err(|e| { + ShellError::GenericError( + "Error casting column to u16".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Int { + val: a as i64, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::UInt32 => { + let casted = series.u32().map_err(|e| { + ShellError::GenericError( + "Error casting column to u32".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Int { + val: a as i64, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::UInt64 => { + let casted = series.u64().map_err(|e| { + ShellError::GenericError( + "Error casting column to u64".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Int { + val: a as i64, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Int8 => { + let casted = series.i8().map_err(|e| { + ShellError::GenericError( + "Error casting column to i8".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) 
=> Value::Int { + val: a as i64, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Int16 => { + let casted = series.i16().map_err(|e| { + ShellError::GenericError( + "Error casting column to i16".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Int { + val: a as i64, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Int32 => { + let casted = series.i32().map_err(|e| { + ShellError::GenericError( + "Error casting column to i32".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Int { + val: a as i64, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Int64 => { + let casted = series.i64().map_err(|e| { + ShellError::GenericError( + "Error casting column to i64".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Int { val: a, span }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Float32 => { + let casted = series.f32().map_err(|e| { + ShellError::GenericError( + "Error casting column to f32".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Float { + val: a as f64, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Float64 => { + let casted = series.f64().map_err(|e| { + ShellError::GenericError( + "Error casting column to f64".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Float { val: a, span }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Boolean => { + let casted = series.bool().map_err(|e| { + ShellError::GenericError( + "Error casting column to bool".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::Bool { val: a, span }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Utf8 => { + let casted = series.utf8().map_err(|e| { + ShellError::GenericError( + "Error casting column to 
string".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => Value::String { + val: a.into(), + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Object(x) => { + let casted = series + .as_any() + .downcast_ref::>>(); + + match casted { + None => Err(ShellError::GenericError( + "Error casting object from series".into(), + "".to_string(), + None, + Some(format!("Object not supported for conversion: {x}")), + Vec::new(), + )), + Some(ca) => { + let it = ca.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) + { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => a.get_value(), + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + } + } + DataType::List(x) => { + let casted = series.as_any().downcast_ref::>(); + match casted { + None => Err(ShellError::GenericError( + "Error casting list from series".into(), + "".to_string(), + None, + Some(format!("List not supported for conversion: {x}")), + Vec::new(), + )), + Some(ca) => { + let it = ca.into_iter(); + let values: Vec = + if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|ca| { + let sublist = ca + .map(|ref s| { + match series_to_values(s, None, None, Span::unknown()) { + Ok(v) => v, + Err(e) => { + eprintln!("Error list values: {e}"); + vec![] + } + } + }) + .unwrap_or(vec![]); + Value::List { + vals: sublist, + span, + } + }) + .collect::>(); + Ok(values) + } + } + } + DataType::Date => { + let casted = series.date().map_err(|e| { + ShellError::GenericError( + "Error casting column to date".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => { + // elapsed time in day since 1970-01-01 + let seconds = a as i64 * SECS_PER_DAY; + let naive_datetime = match NaiveDateTime::from_timestamp_opt(seconds, 0) { + Some(val) => val, + None => { + return Value::Error { + error: Box::new(ShellError::UnsupportedInput( + "The given local datetime representation is invalid." + .to_string(), + format!("timestamp is {a:?}"), + span, + Span::unknown(), + )), + } + } + }; + // Zero length offset + let offset = match FixedOffset::east_opt(0) { + Some(val) => val, + None => { + return Value::Error { + error: Box::new(ShellError::UnsupportedInput( + "The given local datetime representation is invalid." 
+ .to_string(), + format!("timestamp is {a:?}"), + span, + Span::unknown(), + )), + } + } + }; + let datetime = DateTime::::from_utc(naive_datetime, offset); + + Value::Date { + val: datetime, + span, + } + } + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Datetime(time_unit, _) => { + let casted = series.datetime().map_err(|e| { + ShellError::GenericError( + "Error casting column to datetime".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(a) => { + let unit_divisor = match time_unit { + TimeUnit::Nanoseconds => 1_000_000_000, + TimeUnit::Microseconds => 1_000_000, + TimeUnit::Milliseconds => 1_000, + }; + // elapsed time in nano/micro/milliseconds since 1970-01-01 + let seconds = a / unit_divisor; + let naive_datetime = match NaiveDateTime::from_timestamp_opt(seconds, 0) { + Some(val) => val, + None => { + return Value::Error { + error: Box::new(ShellError::UnsupportedInput( + "The given local datetime representation is invalid." + .to_string(), + format!("timestamp is {a:?}"), + span, + Span::unknown(), + )), + } + } + }; + // Zero length offset + let offset = match FixedOffset::east_opt(0) { + Some(val) => val, + None => { + return Value::Error { + error: Box::new(ShellError::UnsupportedInput( + "The given local datetime representation is invalid." + .to_string(), + format!("timestamp is {a:?}"), + span, + Span::unknown(), + )), + } + } + }; + let datetime = DateTime::::from_utc(naive_datetime, offset); + + Value::Date { + val: datetime, + span, + } + } + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + DataType::Time => { + let casted = series.timestamp(TimeUnit::Nanoseconds).map_err(|e| { + ShellError::GenericError( + "Error casting column to time".into(), + "".to_string(), + None, + Some(e.to_string()), + Vec::new(), + ) + })?; + + let it = casted.into_iter(); + let values = if let (Some(size), Some(from_row)) = (maybe_size, maybe_from_row) { + Either::Left(it.skip(from_row).take(size)) + } else { + Either::Right(it) + } + .map(|v| match v { + Some(nanoseconds) => Value::Duration { + val: nanoseconds, + span, + }, + None => Value::Nothing { span }, + }) + .collect::>(); + + Ok(values) + } + e => Err(ShellError::GenericError( + "Error creating Dataframe".into(), + "".to_string(), + None, + Some(format!("Value not supported in nushell: {e}")), + Vec::new(), + )), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use indexmap::indexmap; + + #[test] + fn test_parsed_column_string_list() -> Result<(), Box> { + let values = vec![ + Value::List { + vals: vec![Value::String { + val: "bar".to_string(), + span: Span::test_data(), + }], + span: Span::test_data(), + }, + Value::List { + vals: vec![Value::String { + val: "baz".to_string(), + span: Span::test_data(), + }], + span: Span::test_data(), + }, + ]; + let column = Column { + name: "foo".to_string(), + values: values.clone(), + }; + let typed_column = TypedColumn { + column, + column_type: Some(InputType::List(Box::new(InputType::String))), + }; + + let column_map = indexmap!("foo".to_string() => typed_column); + let parsed_df = from_parsed_columns(column_map)?; + let parsed_columns = parsed_df.columns(Span::test_data())?; + assert_eq!(parsed_columns.len(), 1); + let column = parsed_columns + .first() + 
.expect("There should be a first value in columns"); + assert_eq!(column.name(), "foo"); + assert_eq!(column.values, values); + + Ok(()) + } +}