From 653cbe651f3c12e66a922852b64a8459508dce31 Mon Sep 17 00:00:00 2001 From: Fernando Herrera Date: Thu, 29 Jul 2021 22:16:30 +0100 Subject: [PATCH] Going deeper (#3864) * nuframe in its own type in UntaggedValue * Removed eager dataframe from enum * Dataframe created from list of values * Corrected order in dataframe columns * Returned tag from stream collection * Removed series from dataframe commands * Arithmetic operators * forced push * forced push * Replace all command * String commands * appending operations with dfs * Testing suite for dataframes * Unit test for dataframe commands * improved equality for dataframes * moving all dataframe operations to protocol * objects in dataframes * Removed cloning when converting to row --- Cargo.lock | 1 + crates/nu-protocol/Cargo.toml | 2 +- .../nu-protocol/src/dataframe/conversion.rs | 626 ++++++++++++++++++ crates/nu-protocol/src/dataframe/mod.rs | 4 +- .../nu-protocol/src/dataframe/nu_dataframe.rs | 376 ++--------- 5 files changed, 696 insertions(+), 313 deletions(-) create mode 100644 crates/nu-protocol/src/dataframe/conversion.rs diff --git a/Cargo.lock b/Cargo.lock index 7449ec32a..ec425c0fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4341,6 +4341,7 @@ dependencies = [ "rayon", "regex", "serde 1.0.126", + "serde_json", "thiserror", "unsafe_unwrap", ] diff --git a/crates/nu-protocol/Cargo.toml b/crates/nu-protocol/Cargo.toml index fa5b51120..a81524480 100644 --- a/crates/nu-protocol/Cargo.toml +++ b/crates/nu-protocol/Cargo.toml @@ -34,7 +34,7 @@ toml = "0.5.8" [dependencies.polars] version = "0.14.8" optional = true -features = ["serde", "rows", "strings", "checked_arithmetic"] +features = ["default", "serde", "rows", "strings", "checked_arithmetic", "object"] [features] dataframe = ["polars"] diff --git a/crates/nu-protocol/src/dataframe/conversion.rs b/crates/nu-protocol/src/dataframe/conversion.rs new file mode 100644 index 000000000..9f0d655e7 --- /dev/null +++ b/crates/nu-protocol/src/dataframe/conversion.rs @@ -0,0 +1,626 @@ +use indexmap::map::{Entry, IndexMap}; +use polars::chunked_array::object::builder::ObjectChunkedBuilder; +use polars::chunked_array::ChunkedArray; + +use bigdecimal::FromPrimitive; +use chrono::{DateTime, FixedOffset, NaiveDateTime}; +use nu_errors::ShellError; +use nu_source::{Span, Tag}; +use num_bigint::BigInt; +use polars::prelude::{ + DataFrame, DataType, IntoSeries, NamedFrom, ObjectType, PolarsNumericType, Series, TimeUnit, +}; +use std::ops::{Deref, DerefMut}; + +use super::NuDataFrame; +use crate::{Dictionary, Primitive, UntaggedValue, Value}; + +const SECS_PER_DAY: i64 = 86_400; + +#[derive(Debug)] +pub struct Column { + name: String, + values: Vec, +} + +impl Column { + pub fn new(name: String, values: Vec) -> Self { + Self { name, values } + } + + pub fn new_empty(name: String) -> Self { + Self { + name, + values: Vec::new(), + } + } + + pub fn name(&self) -> &str { + self.name.as_str() + } + + pub fn iter(&self) -> impl Iterator { + self.values.iter() + } +} + +impl IntoIterator for Column { + type Item = Value; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.values.into_iter() + } +} + +impl Deref for Column { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.values + } +} + +impl DerefMut for Column { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.values + } +} + +#[derive(Debug)] +pub enum InputType { + Integer, + Decimal, + String, + Boolean, + Object, +} + +#[derive(Debug)] +pub struct TypedColumn { + column: Column, + column_type: Option, +} + +impl TypedColumn { + fn new_empty(name: String) -> Self { + Self { + column: Column::new_empty(name), + column_type: None, + } + } +} + +impl Deref for TypedColumn { + type Target = Column; + + fn deref(&self) -> &Self::Target { + &self.column + } +} + +impl DerefMut for TypedColumn { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.column + } +} + +pub type ColumnMap = IndexMap; + +pub fn create_column( + series: &Series, + from_row: usize, + to_row: usize, +) -> Result { + let size = to_row - from_row; + match series.dtype() { + DataType::Null => { + let values = std::iter::repeat(Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + }) + .take(size) + .collect::>(); + + Ok(Column::new(series.name().into(), values)) + } + DataType::UInt8 => { + let casted = series.u8().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::UInt16 => { + let casted = series.u16().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::UInt32 => { + let casted = series.u32().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::UInt64 => { + let casted = series.u64().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::Int8 => { + let casted = series.i8().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::Int16 => { + let casted = series.i16().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::Int32 => { + let casted = series.i32().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::Int64 => { + let casted = series.i64().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::Float32 => { + let casted = series.f32().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::Float64 => { + let casted = series.f64().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + Ok(column_from_casted(casted, from_row, size)) + } + DataType::Boolean => { + let casted = series.bool().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + + let values = casted + .into_iter() + .skip(from_row) + .take(size) + .map(|v| match v { + Some(a) => Value { + value: UntaggedValue::Primitive((a).into()), + tag: Tag::default(), + }, + None => Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + }, + }) + .collect::>(); + + Ok(Column::new(casted.name().into(), values)) + } + DataType::Utf8 => { + let casted = series.utf8().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + + let values = casted + .into_iter() + .skip(from_row) + .take(size) + .map(|v| match v { + Some(a) => Value { + value: UntaggedValue::Primitive((a).into()), + tag: Tag::default(), + }, + None => Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + }, + }) + .collect::>(); + + Ok(Column::new(casted.name().into(), values)) + } + DataType::Object(_) => { + let casted = series + .as_any() + .downcast_ref::>>(); + + match casted { + None => Err(ShellError::labeled_error( + "Format not supported", + "Value not supported for conversion", + Tag::unknown(), + )), + Some(ca) => { + let values = ca + .into_iter() + .skip(from_row) + .take(size) + .map(|v| match v { + Some(a) => a.clone(), + None => Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + }, + }) + .collect::>(); + + Ok(Column::new(ca.name().into(), values)) + } + } + } + DataType::Date32 => { + let casted = series.date32().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + + let values = casted + .into_iter() + .skip(from_row) + .take(size) + .map(|v| match v { + Some(a) => { + // elapsed time in day since 1970-01-01 + let seconds = a as i64 * SECS_PER_DAY; + let naive_datetime = NaiveDateTime::from_timestamp(seconds, 0); + + // Zero length offset + let offset = FixedOffset::east(0); + let datetime = DateTime::::from_utc(naive_datetime, offset); + + Value { + value: UntaggedValue::Primitive(Primitive::Date(datetime)), + tag: Tag::default(), + } + } + None => Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + }, + }) + .collect::>(); + + Ok(Column::new(casted.name().into(), values)) + } + DataType::Date64 => { + let casted = series.date64().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + + let values = casted + .into_iter() + .skip(from_row) + .take(size) + .map(|v| match v { + Some(a) => { + // elapsed time in milliseconds since 1970-01-01 + let seconds = a / 1000; + let naive_datetime = NaiveDateTime::from_timestamp(seconds, 0); + + // Zero length offset + let offset = FixedOffset::east(0); + let datetime = DateTime::::from_utc(naive_datetime, offset); + + Value { + value: UntaggedValue::Primitive(Primitive::Date(datetime)), + tag: Tag::default(), + } + } + None => Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + }, + }) + .collect::>(); + + Ok(Column::new(casted.name().into(), values)) + } + DataType::Time64(timeunit) | DataType::Duration(timeunit) => { + let casted = series.time64_nanosecond().map_err(|e| { + ShellError::labeled_error( + "Casting error", + format!("casting error: {}", e), + Span::default(), + ) + })?; + + let values = casted + .into_iter() + .skip(from_row) + .take(size) + .map(|v| match v { + Some(a) => { + let nanoseconds = match timeunit { + TimeUnit::Second => a / 1_000_000_000, + TimeUnit::Millisecond => a / 1_000_000, + TimeUnit::Microsecond => a / 1_000, + TimeUnit::Nanosecond => a, + }; + + let untagged = if let Some(bigint) = BigInt::from_i64(nanoseconds) { + UntaggedValue::Primitive(Primitive::Duration(bigint)) + } else { + unreachable!("Internal error: protocol did not use compatible decimal") + }; + + Value { + value: untagged, + tag: Tag::default(), + } + } + None => Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + }, + }) + .collect::>(); + + Ok(Column::new(casted.name().into(), values)) + } + e => Err(ShellError::labeled_error( + "Format not supported", + format!("Value not supported for conversion: {}", e), + Tag::unknown(), + )), + } +} + +fn column_from_casted(casted: &ChunkedArray, from_row: usize, size: usize) -> Column +where + T: PolarsNumericType, + T::Native: Into, +{ + let values = casted + .into_iter() + .skip(from_row) + .take(size) + .map(|v| match v { + Some(a) => Value { + value: UntaggedValue::Primitive((a).into()), + tag: Tag::default(), + }, + None => Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + }, + }) + .collect::>(); + + Column::new(casted.name().into(), values) +} + +// Adds a separator to the vector of values using the column names from the +// dataframe to create the Values Row +pub fn add_separator(values: &mut Vec, df: &DataFrame) { + let column_names = df.get_column_names(); + + let mut dictionary = Dictionary::default(); + for name in column_names { + let indicator = Value { + value: UntaggedValue::Primitive(Primitive::String("...".to_string())), + tag: Tag::unknown(), + }; + + dictionary.insert(name.to_string(), indicator); + } + + let extra_column = Value { + value: UntaggedValue::Row(dictionary), + tag: Tag::unknown(), + }; + + values.push(extra_column); +} + +// Inserting the values found in a UntaggedValue::Row +// All the entries for the dictionary are checked in order to check if +// the column values have the same type value. +pub fn insert_row(column_values: &mut ColumnMap, dictionary: Dictionary) -> Result<(), ShellError> { + for (key, value) in dictionary.entries { + insert_value(value, key, column_values)?; + } + + Ok(()) +} + +// Inserting the values found in a UntaggedValue::Table +// All the entries for the table are checked in order to check if +// the column values have the same type value. +// The names for the columns are the enumerated numbers from the values +pub fn insert_table(column_values: &mut ColumnMap, table: Vec) -> Result<(), ShellError> { + for (index, value) in table.into_iter().enumerate() { + let key = format!("{}", index); + insert_value(value, key, column_values)?; + } + + Ok(()) +} + +pub fn insert_value( + value: Value, + key: String, + column_values: &mut ColumnMap, +) -> Result<(), ShellError> { + let col_val = match column_values.entry(key.clone()) { + Entry::Vacant(entry) => entry.insert(TypedColumn::new_empty(key)), + Entry::Occupied(entry) => entry.into_mut(), + }; + + // Checking that the type for the value is the same + // for the previous value in the column + if col_val.values.is_empty() { + match &value.value { + UntaggedValue::Primitive(Primitive::Int(_)) => { + col_val.column_type = Some(InputType::Integer); + } + UntaggedValue::Primitive(Primitive::Decimal(_)) => { + col_val.column_type = Some(InputType::Decimal); + } + UntaggedValue::Primitive(Primitive::String(_)) => { + col_val.column_type = Some(InputType::String); + } + UntaggedValue::Primitive(Primitive::Boolean(_)) => { + col_val.column_type = Some(InputType::Boolean); + } + _ => col_val.column_type = Some(InputType::Object), + } + col_val.values.push(value); + } else { + let prev_value = &col_val.values[col_val.values.len() - 1]; + + match (&prev_value.value, &value.value) { + ( + UntaggedValue::Primitive(Primitive::Int(_)), + UntaggedValue::Primitive(Primitive::Int(_)), + ) + | ( + UntaggedValue::Primitive(Primitive::Decimal(_)), + UntaggedValue::Primitive(Primitive::Decimal(_)), + ) + | ( + UntaggedValue::Primitive(Primitive::String(_)), + UntaggedValue::Primitive(Primitive::String(_)), + ) + | ( + UntaggedValue::Primitive(Primitive::Boolean(_)), + UntaggedValue::Primitive(Primitive::Boolean(_)), + ) => col_val.values.push(value), + _ => { + col_val.column_type = Some(InputType::Object); + col_val.values.push(value); + } + } + } + + Ok(()) +} + +// The ColumnMap has the parsed data from the StreamInput +// This data can be used to create a Series object that can initialize +// the dataframe based on the type of data that is found +pub fn from_parsed_columns( + column_values: ColumnMap, + span: &Span, +) -> Result { + let mut df_series: Vec = Vec::new(); + for (name, column) in column_values { + if let Some(column_type) = &column.column_type { + match column_type { + InputType::Decimal => { + let series_values: Result, _> = + column.values.iter().map(|v| v.as_f64()).collect(); + let series = Series::new(&name, series_values?); + df_series.push(series) + } + InputType::Integer => { + let series_values: Result, _> = + column.values.iter().map(|v| v.as_i64()).collect(); + let series = Series::new(&name, series_values?); + df_series.push(series) + } + InputType::String => { + let series_values: Result, _> = + column.values.iter().map(|v| v.as_string()).collect(); + let series = Series::new(&name, series_values?); + df_series.push(series) + } + InputType::Boolean => { + let series_values: Result, _> = + column.values.iter().map(|v| v.as_bool()).collect(); + let series = Series::new(&name, series_values?); + df_series.push(series) + } + InputType::Object => { + let mut builder = + ObjectChunkedBuilder::::new(&name, column.values.len()); + + for v in column.values.iter() { + builder.append_value(v.clone()); + } + + let res = builder.finish(); + df_series.push(res.into_series()) + } + } + } + } + + let df = DataFrame::new(df_series); + + match df { + Ok(df) => Ok(NuDataFrame::new(df)), + Err(e) => { + return Err(ShellError::labeled_error( + "Error while creating dataframe", + format!("{}", e), + span, + )) + } + } +} diff --git a/crates/nu-protocol/src/dataframe/mod.rs b/crates/nu-protocol/src/dataframe/mod.rs index 94aa62705..35efa4c0d 100644 --- a/crates/nu-protocol/src/dataframe/mod.rs +++ b/crates/nu-protocol/src/dataframe/mod.rs @@ -1,10 +1,12 @@ pub mod compute_between; +pub mod conversion; pub mod nu_dataframe; pub mod nu_groupby; pub mod operations; pub use compute_between::{compute_between_dataframes, compute_series_single_value}; -pub use nu_dataframe::{Column, NuDataFrame}; +pub use conversion::Column; +pub use nu_dataframe::NuDataFrame; pub use nu_groupby::NuGroupBy; pub use operations::Axis; use serde::{Deserialize, Serialize}; diff --git a/crates/nu-protocol/src/dataframe/nu_dataframe.rs b/crates/nu-protocol/src/dataframe/nu_dataframe.rs index 68e39eb9d..f6150b95b 100644 --- a/crates/nu-protocol/src/dataframe/nu_dataframe.rs +++ b/crates/nu-protocol/src/dataframe/nu_dataframe.rs @@ -1,82 +1,40 @@ -use indexmap::{map::Entry, IndexMap}; +use indexmap::IndexMap; use std::cmp::Ordering; +use std::fmt::Display; use std::hash::{Hash, Hasher}; -use std::ops::{Deref, DerefMut}; -use bigdecimal::FromPrimitive; -use chrono::{DateTime, FixedOffset, NaiveDateTime}; use nu_errors::ShellError; use nu_source::{Span, Tag}; -use num_bigint::BigInt; -use polars::prelude::{AnyValue, DataFrame, DataType, NamedFrom, Series, TimeUnit}; +use polars::prelude::{DataFrame, DataType, PolarsObject, Series}; use serde::{Deserialize, Serialize}; -use crate::{Dictionary, Primitive, UntaggedValue, Value}; +use super::conversion::{ + add_separator, create_column, from_parsed_columns, insert_row, insert_table, insert_value, + Column, ColumnMap, +}; +use crate::{Dictionary, Primitive, ShellTypeName, UntaggedValue, Value}; -const SECS_PER_DAY: i64 = 86_400; - -#[derive(Debug)] -pub struct Column { - name: String, - values: Vec, +impl Display for Value { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.type_name()) + } } -impl Column { - pub fn new(name: String, values: Vec) -> Self { - Self { name, values } - } - - pub fn new_empty(name: String) -> Self { +impl Default for Value { + fn default() -> Self { Self { - name, - values: Vec::new(), - } - } - - pub fn push(&mut self, value: Value) { - self.values.push(value) - } -} - -#[derive(Debug)] -enum InputType { - Integer, - Decimal, - String, - Boolean, -} - -#[derive(Debug)] -struct TypedColumn { - pub column: Column, - pub column_type: Option, -} - -impl TypedColumn { - fn new_empty(name: String) -> Self { - Self { - column: Column::new_empty(name), - column_type: None, + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), } } } -impl Deref for TypedColumn { - type Target = Column; - - fn deref(&self) -> &Self::Target { - &self.column +impl PolarsObject for Value { + fn type_name() -> &'static str { + "object" } } -impl DerefMut for TypedColumn { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.column - } -} - -type ColumnMap = IndexMap; - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NuDataFrame { dataframe: DataFrame, @@ -250,8 +208,9 @@ impl NuDataFrame { let mut column_values: ColumnMap = IndexMap::new(); for column in columns { - for value in column.values { - insert_value(value, column.name.clone(), &mut column_values)?; + let name = column.name().to_string(); + for value in column.into_iter() { + insert_value(value, name.clone(), &mut column_values)?; } } @@ -369,256 +328,51 @@ impl NuDataFrame { let df = self.as_ref(); let upper_row = to_row.min(df.height()); - let mut values: Vec = Vec::new(); - for i in from_row..upper_row { - let mut dictionary_row = Dictionary::default(); - for col in df.get_columns() { - let dict_val = Value { - value: anyvalue_to_untagged(&col.get(i))?, + let mut size: usize = 0; + let columns = self + .as_ref() + .get_columns() + .iter() + .map(|col| match create_column(col, from_row, upper_row) { + Ok(col) => { + size = col.len(); + Ok(col) + } + Err(e) => Err(e), + }) + .collect::, ShellError>>()?; + + let mut iterators = columns + .into_iter() + .map(|col| (col.name().to_string(), col.into_iter())) + .collect::)>>(); + + let values = (0..size) + .into_iter() + .map(|i| { + let mut dictionary_row = Dictionary::default(); + + for (name, col) in iterators.iter_mut() { + let dict_val = match col.next() { + Some(v) => v, + None => { + println!("index: {}", i); + Value { + value: UntaggedValue::Primitive(Primitive::Nothing), + tag: Tag::default(), + } + } + }; + dictionary_row.insert(name.clone(), dict_val); + } + + Value { + value: UntaggedValue::Row(dictionary_row), tag: Tag::unknown(), - }; - dictionary_row.insert(col.name().into(), dict_val); - } - - let value = Value { - value: UntaggedValue::Row(dictionary_row), - tag: Tag::unknown(), - }; - - values.push(value) - } + } + }) + .collect::>(); Ok(values) } } - -// Adds a separator to the vector of values using the column names from the -// dataframe to create the Values Row -fn add_separator(values: &mut Vec, df: &DataFrame) { - let column_names = df.get_column_names(); - - let mut dictionary = Dictionary::default(); - for name in column_names { - let indicator = Value { - value: UntaggedValue::Primitive(Primitive::String("...".to_string())), - tag: Tag::unknown(), - }; - - dictionary.insert(name.to_string(), indicator); - } - - let extra_column = Value { - value: UntaggedValue::Row(dictionary), - tag: Tag::unknown(), - }; - - values.push(extra_column); -} - -// Converts a polars AnyValue to an UntaggedValue -// This is used when printing values coming for polars dataframes -fn anyvalue_to_untagged(anyvalue: &AnyValue) -> Result { - Ok(match anyvalue { - AnyValue::Null => UntaggedValue::Primitive(Primitive::Nothing), - AnyValue::Utf8(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::Boolean(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::Float32(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::Float64(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::Int32(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::Int64(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::UInt8(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::UInt16(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::Int8(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::Int16(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::UInt32(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::UInt64(a) => UntaggedValue::Primitive((*a).into()), - AnyValue::Date32(a) => { - // elapsed time in day since 1970-01-01 - let seconds = *a as i64 * SECS_PER_DAY; - let naive_datetime = NaiveDateTime::from_timestamp(seconds, 0); - - // Zero length offset - let offset = FixedOffset::east(0); - let datetime = DateTime::::from_utc(naive_datetime, offset); - - UntaggedValue::Primitive(Primitive::Date(datetime)) - } - AnyValue::Date64(a) => { - // elapsed time in milliseconds since 1970-01-01 - let seconds = *a / 1000; - let naive_datetime = NaiveDateTime::from_timestamp(seconds, 0); - - // Zero length offset - let offset = FixedOffset::east(0); - let datetime = DateTime::::from_utc(naive_datetime, offset); - - UntaggedValue::Primitive(Primitive::Date(datetime)) - } - AnyValue::Time64(a, _) => UntaggedValue::Primitive((*a).into()), - AnyValue::Duration(a, unit) => { - let nanoseconds = match unit { - TimeUnit::Second => *a / 1_000_000_000, - TimeUnit::Millisecond => *a / 1_000_000, - TimeUnit::Microsecond => *a / 1_000, - TimeUnit::Nanosecond => *a, - }; - - if let Some(bigint) = BigInt::from_i64(nanoseconds) { - UntaggedValue::Primitive(Primitive::Duration(bigint)) - } else { - unreachable!("Internal error: protocol did not use compatible decimal") - } - } - AnyValue::List(_) => { - return Err(ShellError::labeled_error( - "Format not supported", - "Value not supported for conversion", - Tag::unknown(), - )); - } - }) -} - -// Inserting the values found in a UntaggedValue::Row -// All the entries for the dictionary are checked in order to check if -// the column values have the same type value. -fn insert_row(column_values: &mut ColumnMap, dictionary: Dictionary) -> Result<(), ShellError> { - for (key, value) in dictionary.entries { - insert_value(value, key, column_values)?; - } - - Ok(()) -} - -// Inserting the values found in a UntaggedValue::Table -// All the entries for the table are checked in order to check if -// the column values have the same type value. -// The names for the columns are the enumerated numbers from the values -fn insert_table(column_values: &mut ColumnMap, table: Vec) -> Result<(), ShellError> { - for (index, value) in table.into_iter().enumerate() { - let key = format!("{}", index); - insert_value(value, key, column_values)?; - } - - Ok(()) -} - -fn insert_value( - value: Value, - key: String, - column_values: &mut ColumnMap, -) -> Result<(), ShellError> { - let col_val = match column_values.entry(key.clone()) { - Entry::Vacant(entry) => entry.insert(TypedColumn::new_empty(key)), - Entry::Occupied(entry) => entry.into_mut(), - }; - - // Checking that the type for the value is the same - // for the previous value in the column - if col_val.values.is_empty() { - match &value.value { - UntaggedValue::Primitive(Primitive::Int(_)) => { - col_val.column_type = Some(InputType::Integer); - } - UntaggedValue::Primitive(Primitive::Decimal(_)) => { - col_val.column_type = Some(InputType::Decimal); - } - UntaggedValue::Primitive(Primitive::String(_)) => { - col_val.column_type = Some(InputType::String); - } - UntaggedValue::Primitive(Primitive::Boolean(_)) => { - col_val.column_type = Some(InputType::Boolean); - } - _ => { - return Err(ShellError::labeled_error( - "Only primitive values accepted", - "Not a primitive value", - &value.tag, - )); - } - } - col_val.values.push(value); - } else { - let prev_value = &col_val.values[col_val.values.len() - 1]; - - match (&prev_value.value, &value.value) { - ( - UntaggedValue::Primitive(Primitive::Int(_)), - UntaggedValue::Primitive(Primitive::Int(_)), - ) - | ( - UntaggedValue::Primitive(Primitive::Decimal(_)), - UntaggedValue::Primitive(Primitive::Decimal(_)), - ) - | ( - UntaggedValue::Primitive(Primitive::String(_)), - UntaggedValue::Primitive(Primitive::String(_)), - ) - | ( - UntaggedValue::Primitive(Primitive::Boolean(_)), - UntaggedValue::Primitive(Primitive::Boolean(_)), - ) => col_val.values.push(value), - _ => { - return Err(ShellError::labeled_error_with_secondary( - "Different values in column", - "Value with different type", - &value.tag, - "Perhaps you want to change it to this value type", - &prev_value.tag, - )); - } - } - } - - Ok(()) -} - -// The ColumnMap has the parsed data from the StreamInput -// This data can be used to create a Series object that can initialize -// the dataframe based on the type of data that is found -fn from_parsed_columns(column_values: ColumnMap, span: &Span) -> Result { - let mut df_series: Vec = Vec::new(); - for (name, column) in column_values { - if let Some(column_type) = &column.column_type { - match column_type { - InputType::Decimal => { - let series_values: Result, _> = - column.values.iter().map(|v| v.as_f64()).collect(); - let series = Series::new(&name, series_values?); - df_series.push(series) - } - InputType::Integer => { - let series_values: Result, _> = - column.values.iter().map(|v| v.as_i64()).collect(); - let series = Series::new(&name, series_values?); - df_series.push(series) - } - InputType::String => { - let series_values: Result, _> = - column.values.iter().map(|v| v.as_string()).collect(); - let series = Series::new(&name, series_values?); - df_series.push(series) - } - InputType::Boolean => { - let series_values: Result, _> = - column.values.iter().map(|v| v.as_bool()).collect(); - let series = Series::new(&name, series_values?); - df_series.push(series) - } - } - } - } - - let df = DataFrame::new(df_series); - - match df { - Ok(df) => Ok(NuDataFrame::new(df)), - Err(e) => { - return Err(ShellError::labeled_error( - "Error while creating dataframe", - format!("{}", e), - span, - )) - } - } -}