Dataframe commands (#3502)

* Sample command

* Join command with checks

* More dataframes commands

* Groupby and aggregate commands

* Missing feature dataframe flag

* Renamed file

* New commands for dataframes

* error parser and df reference

* filter command for dataframes

* removed name from nu_dataframe

* commands to save to parquet and csv
This commit is contained in:
Fernando Herrera
2021-06-03 07:23:14 +01:00
committed by GitHub
parent fd5da62c66
commit 5537dce3cc
30 changed files with 1591 additions and 470 deletions

580
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -102,7 +102,7 @@ zip = { version = "0.5.9", optional = true }
[dependencies.polars] [dependencies.polars]
version = "0.13.4" version = "0.13.4"
optional = true optional = true
features = ["parquet", "json", "random"] features = ["parquet", "json", "random", "pivot"]
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
umask = "1.0.0" umask = "1.0.0"

View File

@ -192,8 +192,9 @@ pub(crate) use any::Command as Any;
#[cfg(feature = "dataframe")] #[cfg(feature = "dataframe")]
pub(crate) use dataframe::{ pub(crate) use dataframe::{
DataFrame, DataFrameAggregate, DataFrameConvert, DataFrameDTypes, DataFrameDrop, DataFrame, DataFrameAggregate, DataFrameConvert, DataFrameDTypes, DataFrameDrop,
DataFrameGroupBy, DataFrameJoin, DataFrameList, DataFrameLoad, DataFrameSample, DataFrameDummies, DataFrameGroupBy, DataFrameHead, DataFrameJoin, DataFrameList, DataFrameLoad,
DataFrameSelect, DataFrameShow, DataFrameMelt, DataFramePivot, DataFrameSample, DataFrameSelect, DataFrameShow, DataFrameSlice,
DataFrameTail, DataFrameToCsv, DataFrameToParquet, DataFrameWhere,
}; };
pub(crate) use enter::Enter; pub(crate) use enter::Enter;
pub(crate) use every::Every; pub(crate) use every::Every;

View File

@ -1,4 +1,4 @@
use crate::prelude::*; use crate::{commands::dataframe::utils::parse_polars_error, prelude::*};
use nu_engine::WholeStreamCommand; use nu_engine::WholeStreamCommand;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{
@ -6,7 +6,7 @@ use nu_protocol::{
Signature, SyntaxShape, UntaggedValue, Value, Signature, SyntaxShape, UntaggedValue, Value,
}; };
use nu_source::Tagged; use nu_source::Tagged;
use polars::frame::groupby::GroupBy; use polars::{frame::groupby::GroupBy, prelude::PolarsError};
use super::utils::convert_columns; use super::utils::convert_columns;
@ -101,20 +101,19 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
aggregate(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![Example { vec![Example {
description: "Aggregate sum by grouping by column a and summing on col b", description: "Aggregate sum by grouping by column a and summing on col b",
example: example: "[[a b]; [one 1] [one 2]] | pls convert | pls groupby [a] | pls aggregate sum",
"echo [[a b]; [one 1] [one 2]] | pls convert | pls groupby [a] | pls aggregate sum",
result: None, result: None,
}] }]
} }
} }
fn aggregate(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let mut args = args.evaluate_once()?; let mut args = args.evaluate_once()?;
@ -132,12 +131,12 @@ fn aggregate(args: CommandArgs) -> Result<OutputStream, ShellError> {
None => (None, Span::unknown()), None => (None, Span::unknown()),
}; };
// The operation is only done in one dataframe. Only one input is // The operation is only done in one groupby. Only one input is
// expected from the InputStream // expected from the InputStream
match args.input.next() { match args.input.next() {
None => Err(ShellError::labeled_error( None => Err(ShellError::labeled_error(
"No input received", "No input received",
"missing dataframe input from stream", "missing groupby input from stream",
&tag, &tag,
)), )),
Some(value) => { Some(value) => {
@ -191,12 +190,11 @@ fn perform_aggregation(
Operation::Count => groupby.count(), Operation::Count => groupby.count(),
} }
.map_err(|e| { .map_err(|e| {
let span = if e.to_string().contains("Not found") { let span = match &e {
agg_span PolarsError::NotFound(_) => agg_span,
} else { _ => &operation_tag.span,
&operation_tag.span
}; };
ShellError::labeled_error("Aggregation error", format!("{}", e), span) parse_polars_error::<&str>(&e, span, None)
}) })
} }

View File

@ -36,7 +36,7 @@ impl WholeStreamCommand for DataFrame {
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![Example { vec![Example {
description: "Takes an input stream and converts it to a polars dataframe", description: "Takes an input stream and converts it to a polars dataframe",
example: "echo [[a b];[1 2] [3 4]] | pls convert", example: "[[a b];[1 2] [3 4]] | pls convert",
result: None, result: None,
}] }]
} }

View File

@ -6,7 +6,7 @@ use nu_protocol::{
Signature, SyntaxShape, UntaggedValue, Value, Signature, SyntaxShape, UntaggedValue, Value,
}; };
use super::utils::convert_columns; use super::utils::{convert_columns, parse_polars_error};
pub struct DataFrame; pub struct DataFrame;
@ -28,19 +28,19 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
drop(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![Example { vec![Example {
description: "drop column a", description: "drop column a",
example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls drop [a]", example: "[[a b]; [1 2] [3 4]] | pls convert | pls drop [a]",
result: None, result: None,
}] }]
} }
} }
fn drop(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let mut args = args.evaluate_once()?; let mut args = args.evaluate_once()?;
@ -55,15 +55,13 @@ fn drop(args: CommandArgs) -> Result<OutputStream, ShellError> {
&tag, &tag,
)), )),
Some(value) => { Some(value) => {
if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value {
dataframe: Some(ref df), // Dataframe with the first selected column
..
})) = value.value
{
let new_df = match col_string.iter().next() { let new_df = match col_string.iter().next() {
Some(col) => df.drop(col).map_err(|e| { Some(col) => df
ShellError::labeled_error("Join error", format!("{}", e), &col_span) .as_ref()
}), .drop(col)
.map_err(|e| parse_polars_error::<&str>(&e, &col_span, None)),
None => Err(ShellError::labeled_error( None => Err(ShellError::labeled_error(
"Empty names list", "Empty names list",
"No column names where found", "No column names where found",
@ -71,10 +69,12 @@ fn drop(args: CommandArgs) -> Result<OutputStream, ShellError> {
)), )),
}?; }?;
// If there are more columns in the drop selection list, these
// are added from the resulting dataframe
let res = col_string.iter().skip(1).try_fold(new_df, |new_df, col| { let res = col_string.iter().skip(1).try_fold(new_df, |new_df, col| {
new_df.drop(col).map_err(|e| { new_df
ShellError::labeled_error("Drop error", format!("{}", e), &col_span) .drop(col)
}) .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))
})?; })?;
let value = Value { let value = Value {

View File

@ -1,10 +1,7 @@
use crate::prelude::*; use crate::prelude::*;
use nu_engine::WholeStreamCommand; use nu_engine::WholeStreamCommand;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{dataframe::PolarsData, Signature, TaggedDictBuilder, UntaggedValue};
dataframe::{NuDataFrame, PolarsData},
Signature, TaggedDictBuilder, UntaggedValue,
};
pub struct DataFrame; pub struct DataFrame;
@ -22,19 +19,19 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
dtypes(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![Example { vec![Example {
description: "drop column a", description: "drop column a",
example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls dtypes", example: "[[a b]; [1 2] [3 4]] | pls convert | pls dtypes",
result: None, result: None,
}] }]
} }
} }
fn dtypes(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let mut args = args.evaluate_once()?; let mut args = args.evaluate_once()?;
@ -45,28 +42,26 @@ fn dtypes(args: CommandArgs) -> Result<OutputStream, ShellError> {
&tag, &tag,
)), )),
Some(value) => { Some(value) => {
if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value {
dataframe: Some(df),
..
})) = value.value
{
let col_names = df let col_names = df
.as_ref()
.get_column_names() .get_column_names()
.iter() .iter()
.map(|v| v.to_string()) .map(|v| v.to_string())
.collect::<Vec<String>>(); .collect::<Vec<String>>();
let values = let values = df
df.dtypes() .as_ref()
.into_iter() .dtypes()
.zip(col_names.into_iter()) .into_iter()
.map(move |(dtype, name)| { .zip(col_names.into_iter())
let mut data = TaggedDictBuilder::new(tag.clone()); .map(move |(dtype, name)| {
data.insert_value("column", name.as_ref()); let mut data = TaggedDictBuilder::new(tag.clone());
data.insert_value("dtype", format!("{}", dtype)); data.insert_value("column", name.as_ref());
data.insert_value("dtype", format!("{}", dtype));
data.into_value() data.into_value()
}); });
Ok(OutputStream::from_stream(values)) Ok(OutputStream::from_stream(values))
} else { } else {

View File

@ -0,0 +1,76 @@
use crate::prelude::*;
use nu_engine::WholeStreamCommand;
use nu_errors::ShellError;
use nu_protocol::{
dataframe::{NuDataFrame, PolarsData},
Signature, UntaggedValue, Value,
};
use super::utils::parse_polars_error;
/// Command type backing `pls to_dummies`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    /// Name under which the command is invoked.
    fn name(&self) -> &str {
        "pls to_dummies"
    }

    /// One-line description of the command.
    fn usage(&self) -> &str {
        "Creates a new dataframe with dummy variables"
    }

    /// Signature of the command; it takes no positional arguments or flags.
    fn signature(&self) -> Signature {
        // Keep the signature name in step with `name()`. The previous value,
        // "pls select", did not match this command's name.
        Signature::build("pls to_dummies")
    }

    /// Delegates to the free function `command`.
    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }

    /// Usage examples for the command.
    fn examples(&self) -> Vec<Example> {
        vec![Example {
            description: "Create new dataframe with dummy variables",
            example: "[[a b]; [1 2] [3 4]] | pls convert | pls to_dummies",
            result: None,
        }]
    }
}
/// Reads one value from the input stream and, when it is an eager dataframe,
/// returns a new dataframe produced by `to_dummies`.
///
/// # Errors
///
/// Returns a `ShellError` when the input stream is empty, when the first
/// value is not an eager dataframe, or when `to_dummies` itself fails.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
    let tag = args.call_info.name_tag.clone();
    let mut args = args.evaluate_once()?;

    // Only the first value of the stream is consumed.
    let input = match args.input.next() {
        Some(input) => input,
        None => {
            return Err(ShellError::labeled_error(
                "No input received",
                "missing dataframe input from stream",
                &tag,
            ))
        }
    };

    let df = match input.value {
        UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => df,
        _ => {
            return Err(ShellError::labeled_error(
                "No dataframe in stream",
                "no dataframe found in input stream",
                &tag,
            ))
        }
    };

    let dummies = df.as_ref().to_dummies().map_err(|e| {
        parse_polars_error(
            &e,
            &tag.span,
            Some("The only allowed column types for dummies are String or Int"),
        )
    })?;

    // The result carries the tag of the command invocation.
    Ok(OutputStream::one(Value {
        value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new(dummies))),
        tag,
    }))
}

View File

@ -1,4 +1,4 @@
use crate::prelude::*; use crate::{commands::dataframe::utils::parse_polars_error, prelude::*};
use nu_engine::WholeStreamCommand; use nu_engine::WholeStreamCommand;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{
@ -28,19 +28,19 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
groupby(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![Example { vec![Example {
description: "Grouping by column a", description: "Grouping by column a",
example: "echo [[a b]; [one 1] [one 2]] | pls convert | pls groupby [a]", example: "[[a b]; [one 1] [one 2]] | pls convert | pls groupby [a]",
result: None, result: None,
}] }]
} }
} }
fn groupby(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let mut args = args.evaluate_once()?; let mut args = args.evaluate_once()?;
@ -58,24 +58,20 @@ fn groupby(args: CommandArgs) -> Result<OutputStream, ShellError> {
)), )),
Some(value) => { Some(value) => {
if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(nu_df)) = value.value { if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(nu_df)) = value.value {
let df = match nu_df.dataframe {
Some(df) => df,
None => unreachable!("No dataframe in nu_dataframe"),
};
// This is the expensive part of the groupby; to create the // This is the expensive part of the groupby; to create the
// groups that will be used for grouping the data in the // groups that will be used for grouping the data in the
// dataframe. Once it has been done these values can be stored // dataframe. Once it has been done these values can be stored
// in the NuGroupBy // in a NuGroupBy
let groupby = df.groupby(&columns_string).map_err(|e| { let groupby = nu_df
ShellError::labeled_error("Groupby error", format!("{}", e), col_span) .as_ref()
})?; .groupby(&columns_string)
.map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?;
let groups = groupby.get_groups().to_vec(); let groups = groupby.get_groups().to_vec();
let groupby = Value { let groupby = Value {
tag: value.tag, tag: value.tag,
value: UntaggedValue::DataFrame(PolarsData::GroupBy(NuGroupBy::new( value: UntaggedValue::DataFrame(PolarsData::GroupBy(NuGroupBy::new(
NuDataFrame::new_with_name(df, nu_df.name), NuDataFrame::new(nu_df.as_ref().clone()),
columns_string, columns_string,
groups, groups,
))), ))),

View File

@ -0,0 +1,80 @@
use crate::prelude::*;
use nu_engine::WholeStreamCommand;
use nu_errors::ShellError;
use nu_protocol::{
dataframe::{NuDataFrame, PolarsData},
Signature, SyntaxShape, UntaggedValue, Value,
};
use nu_source::Tagged;
/// Command type backing `pls head`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    /// Name under which the command is invoked.
    fn name(&self) -> &str {
        "pls head"
    }

    /// One-line description of the command.
    fn usage(&self) -> &str {
        "Creates new dataframe with head rows"
    }

    /// Signature of the command: one optional positional `n_rows` number.
    fn signature(&self) -> Signature {
        // Keep the signature name in step with `name()`. The previous value,
        // "pls select", did not match this command's name.
        Signature::build("pls head").optional(
            "n_rows",
            SyntaxShape::Number,
            "Number of rows for head",
        )
    }

    /// Delegates to the free function `command`.
    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }

    /// Usage examples for the command.
    fn examples(&self) -> Vec<Example> {
        vec![Example {
            description: "Create new dataframe with head rows",
            example: "[[a b]; [1 2] [3 4]] | pls convert | pls head",
            result: None,
        }]
    }
}
/// Reads one value from the input stream and, when it is an eager dataframe,
/// returns a new dataframe holding its first rows.
///
/// The row count comes from the optional first positional argument and
/// falls back to 5 when that argument is absent.
///
/// # Errors
///
/// Returns a `ShellError` when argument evaluation fails, when the input
/// stream is empty, or when the first value is not an eager dataframe.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
    let tag = args.call_info.name_tag.clone();
    let mut args = args.evaluate_once()?;

    let requested: Option<Tagged<usize>> = args.opt(0)?;
    let rows = requested.map(|n| n.item).unwrap_or(5);

    // Only the first value of the stream is consumed.
    let input = match args.input.next() {
        Some(input) => input,
        None => {
            return Err(ShellError::labeled_error(
                "No input received",
                "missing dataframe input from stream",
                &tag,
            ))
        }
    };

    match input.value {
        UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => {
            let head = df.as_ref().head(Some(rows));

            Ok(OutputStream::one(Value {
                value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new(
                    head,
                ))),
                tag,
            }))
        }
        _ => Err(ShellError::labeled_error(
            "No dataframe in stream",
            "no dataframe found in input stream",
            &tag,
        )),
    }
}

View File

@ -6,7 +6,7 @@ use nu_protocol::{
Signature, SyntaxShape, UntaggedValue, Value, Signature, SyntaxShape, UntaggedValue, Value,
}; };
use super::utils::convert_columns; use super::utils::{convert_columns, parse_polars_error};
use polars::prelude::JoinType; use polars::prelude::JoinType;
@ -45,7 +45,7 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
join(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -58,14 +58,14 @@ impl WholeStreamCommand for DataFrame {
Example { Example {
description: "right join dataframe", description: "right join dataframe",
example: example:
"echo [[a b]; [1 2] [3 4] [5 6]] | pls convert | pls join $right [b] [b] -t right", "[[a b]; [1 2] [3 4] [5 6]] | pls convert | pls join $right [b] [b] -t right",
result: None, result: None,
}, },
] ]
} }
} }
fn join(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let mut args = args.evaluate_once()?; let mut args = args.evaluate_once()?;
@ -102,33 +102,21 @@ fn join(args: CommandArgs) -> Result<OutputStream, ShellError> {
&tag, &tag,
)), )),
Some(value) => { Some(value) => {
if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value {
dataframe: Some(ref df),
..
})) = value.value
{
let res = match r_df.value { let res = match r_df.value {
UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { UntaggedValue::DataFrame(PolarsData::EagerDataFrame(r_df)) => {
dataframe: Some(r_df),
..
})) => {
// Checking the column types before performing the join // Checking the column types before performing the join
check_column_datatypes( check_column_datatypes(
df, df.as_ref(),
&l_col_string, &l_col_string,
&l_col_span, &l_col_span,
&r_col_string, &r_col_string,
&r_col_span, &r_col_span,
)?; )?;
df.join(&r_df, &l_col_string, &r_col_string, join_type) df.as_ref()
.map_err(|e| { .join(r_df.as_ref(), &l_col_string, &r_col_string, join_type)
ShellError::labeled_error( .map_err(|e| parse_polars_error::<&str>(&e, &l_col_span, None))
"Join error",
format!("{}", e),
&l_col_span,
)
})
} }
_ => Err(ShellError::labeled_error( _ => Err(ShellError::labeled_error(
"Not a dataframe", "Not a dataframe",
@ -180,11 +168,11 @@ fn check_column_datatypes<T: AsRef<str>>(
for (l, r) in l_cols.iter().zip(r_cols.iter()) { for (l, r) in l_cols.iter().zip(r_cols.iter()) {
let l_series = df let l_series = df
.column(l.as_ref()) .column(l.as_ref())
.map_err(|e| ShellError::labeled_error("Join error", format!("{}", e), l_col_span))?; .map_err(|e| parse_polars_error::<&str>(&e, &l_col_span, None))?;
let r_series = df let r_series = df
.column(r.as_ref()) .column(r.as_ref())
.map_err(|e| ShellError::labeled_error("Join error", format!("{}", e), r_col_span))?; .map_err(|e| parse_polars_error::<&str>(&e, &r_col_span, None))?;
if l_series.dtype() != r_series.dtype() { if l_series.dtype() != r_series.dtype() {
return Err(ShellError::labeled_error_with_secondary( return Err(ShellError::labeled_error_with_secondary(

View File

@ -1,10 +1,7 @@
use crate::prelude::*; use crate::prelude::*;
use nu_engine::WholeStreamCommand; use nu_engine::WholeStreamCommand;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{dataframe::PolarsData, Signature, TaggedDictBuilder, UntaggedValue};
dataframe::{NuDataFrame, PolarsData},
Signature, TaggedDictBuilder, UntaggedValue,
};
pub struct DataFrame; pub struct DataFrame;
@ -30,21 +27,26 @@ impl WholeStreamCommand for DataFrame {
.get_vars() .get_vars()
.into_iter() .into_iter()
.filter_map(|(name, value)| { .filter_map(|(name, value)| {
if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = &value.value {
dataframe: Some(df),
name: file_name,
})) = &value.value
{
let mut data = TaggedDictBuilder::new(value.tag.clone()); let mut data = TaggedDictBuilder::new(value.tag.clone());
let rows = df.height(); let rows = df.as_ref().height();
let cols = df.width(); let cols = df.as_ref().width();
data.insert_value("name", name.as_ref()); data.insert_value("name", name.as_ref());
data.insert_value("file", file_name.as_ref());
data.insert_value("rows", format!("{}", rows)); data.insert_value("rows", format!("{}", rows));
data.insert_value("columns", format!("{}", cols)); data.insert_value("columns", format!("{}", cols));
match value.tag.anchor {
Some(AnchorLocation::File(name)) => data.insert_value("location", name),
Some(AnchorLocation::Url(name)) => data.insert_value("location", name),
Some(AnchorLocation::Source(text)) => {
let loc_name = text.slice(0..text.end);
data.insert_value("location", loc_name.text)
}
None => data.insert_value("location", "stream"),
}
Some(data.into_value()) Some(data.into_value())
} else { } else {
None None

View File

@ -1,6 +1,6 @@
use std::path::PathBuf; use std::path::PathBuf;
use crate::prelude::*; use crate::{commands::dataframe::utils::parse_polars_error, prelude::*};
use nu_engine::{EvaluatedCommandArgs, WholeStreamCommand}; use nu_engine::{EvaluatedCommandArgs, WholeStreamCommand};
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{
@ -28,7 +28,7 @@ impl WholeStreamCommand for DataFrame {
.required( .required(
"file", "file",
SyntaxShape::FilePath, SyntaxShape::FilePath,
"the file path to load values from", "file path to load values from",
) )
.named( .named(
"delimiter", "delimiter",
@ -62,7 +62,7 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
create_from_file(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -74,7 +74,7 @@ impl WholeStreamCommand for DataFrame {
} }
} }
fn create_from_file(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let args = args.evaluate_once()?; let args = args.evaluate_once()?;
let file: Tagged<PathBuf> = args.req(0)?; let file: Tagged<PathBuf> = args.req(0)?;
@ -101,21 +101,24 @@ fn create_from_file(args: CommandArgs) -> Result<OutputStream, ShellError> {
Ok(name) => name, Ok(name) => name,
Err(e) => { Err(e) => {
return Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Error with file name", "File Name Error",
format!("{:?}", e), format!("{:?}", e),
&file.tag, &file.tag,
)) ))
} }
}; };
let init = InputStream::one( let df_tag = Tag {
UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new_with_name( anchor: Some(AnchorLocation::File(file_name.to_string())),
df, file_name, span: tag.span,
))) };
.into_value(&tag),
);
Ok(init.to_output_stream()) let tagged_value = Value {
value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new(df))),
tag: df_tag,
};
Ok(InputStream::one(tagged_value).to_output_stream())
} }
fn from_parquet(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, ShellError> { fn from_parquet(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, ShellError> {
@ -128,7 +131,7 @@ fn from_parquet(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame
reader reader
.finish() .finish()
.map_err(|e| ShellError::labeled_error("Error with file", format!("{:?}", e), &file.tag)) .map_err(|e| parse_polars_error::<&str>(&e, &file.tag.span, None))
} }
fn from_json(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, ShellError> { fn from_json(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, ShellError> {
@ -141,7 +144,7 @@ fn from_json(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, S
reader reader
.finish() .finish()
.map_err(|e| ShellError::labeled_error("Error with file", format!("{:?}", e), &file.tag)) .map_err(|e| parse_polars_error::<&str>(&e, &file.tag.span, None))
} }
fn from_csv(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, ShellError> { fn from_csv(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, ShellError> {
@ -152,9 +155,8 @@ fn from_csv(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, Sh
let skip_rows: Option<Tagged<usize>> = args.get_flag("skip_rows")?; let skip_rows: Option<Tagged<usize>> = args.get_flag("skip_rows")?;
let columns: Option<Vec<Value>> = args.get_flag("columns")?; let columns: Option<Vec<Value>> = args.get_flag("columns")?;
let csv_reader = CsvReader::from_path(&file.item).map_err(|e| { let csv_reader = CsvReader::from_path(&file.item)
ShellError::labeled_error("Unable to parse file", format!("{}", e), &file.tag) .map_err(|e| parse_polars_error::<&str>(&e, &file.tag.span, None))?;
})?;
let csv_reader = match delimiter { let csv_reader = match delimiter {
None => csv_reader, None => csv_reader,
@ -212,10 +214,6 @@ fn from_csv(args: EvaluatedCommandArgs) -> Result<polars::prelude::DataFrame, Sh
match csv_reader.finish() { match csv_reader.finish() {
Ok(csv_reader) => Ok(csv_reader), Ok(csv_reader) => Ok(csv_reader),
Err(e) => Err(ShellError::labeled_error( Err(e) => Err(parse_polars_error::<&str>(&e, &file.tag.span, None)),
"Error while parsing dataframe",
format!("{}", e),
&file.tag,
)),
} }
} }

View File

@ -0,0 +1,131 @@
use crate::{commands::dataframe::utils::parse_polars_error, prelude::*};
use nu_engine::WholeStreamCommand;
use nu_errors::ShellError;
use nu_protocol::{
dataframe::{NuDataFrame, PolarsData},
Signature, SyntaxShape, UntaggedValue, Value,
};
use super::utils::convert_columns;
/// Command type backing `pls melt`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    /// Name under which the command is invoked.
    fn name(&self) -> &str {
        "pls melt"
    }

    /// One-line description of the command.
    fn usage(&self) -> &str {
        "Unpivot a DataFrame from wide to long format"
    }

    /// Signature of the command: two required table arguments, the id
    /// columns followed by the value columns.
    fn signature(&self) -> Signature {
        // Keep the signature name in step with `name()`. The previous value,
        // "pls join", did not match this command's name.
        Signature::build("pls melt")
            .required("id_columns", SyntaxShape::Table, "Id columns for melting")
            .required(
                "value_columns",
                SyntaxShape::Table,
                "columns used as value columns",
            )
    }

    /// Delegates to the free function `command`.
    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }

    /// Usage examples for the command.
    fn examples(&self) -> Vec<Example> {
        vec![Example {
            description: "melt dataframe",
            example: "[[a b]; [a 2] [b 4] [a 6]] | pls convert | pls melt [a] [b]",
            result: None,
        }]
    }
}
/// Reads one value from the input stream and, when it is an eager dataframe,
/// melts it using the id columns (first argument) and value columns
/// (second argument).
///
/// Both column lists are validated with `check_column_datatypes` before the
/// melt is attempted.
///
/// # Errors
///
/// Returns a `ShellError` when an argument is missing or cannot be converted
/// to column names, when the input stream is empty, when the first value is
/// not an eager dataframe, when a column check fails, or when the melt
/// itself fails.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
    let tag = args.call_info.name_tag.clone();
    let mut args = args.evaluate_once()?;

    let id_values: Vec<Value> = args.req(0)?;
    let val_values: Vec<Value> = args.req(1)?;

    let (id_names, id_span) = convert_columns(&id_values, &tag)?;
    let (val_names, val_span) = convert_columns(&val_values, &tag)?;

    // Only the first value of the stream is consumed.
    let input = match args.input.next() {
        Some(input) => input,
        None => {
            return Err(ShellError::labeled_error(
                "No input received",
                "missing dataframe input from stream",
                &tag,
            ))
        }
    };

    let df = match input.value {
        UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => df,
        _ => {
            return Err(ShellError::labeled_error(
                "No dataframe in stream",
                "no dataframe found in input stream",
                &tag,
            ))
        }
    };

    check_column_datatypes(df.as_ref(), &id_names, &id_span)?;
    check_column_datatypes(df.as_ref(), &val_names, &val_span)?;

    let melted = df
        .as_ref()
        .melt(&id_names, &val_names)
        .map_err(|e| parse_polars_error::<&str>(&e, &tag.span, None))?;

    Ok(OutputStream::one(Value {
        value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new(melted))),
        tag,
    }))
}
/// Verifies that `cols` is non-empty and that every adjacent pair of the
/// named columns in `df` shares the same dtype.
///
/// # Errors
///
/// Returns a `ShellError` labeled at `col_span` when `cols` is empty, when a
/// named column cannot be fetched from `df`, or when two neighbouring
/// columns have different dtypes.
fn check_column_datatypes<T: AsRef<str>>(
    df: &polars::prelude::DataFrame,
    cols: &[T],
    col_span: &Span,
) -> Result<(), ShellError> {
    // NOTE(review): the error labels say "Merge" although this helper sits
    // next to the melt command; the strings are left untouched here.
    if cols.is_empty() {
        return Err(ShellError::labeled_error(
            "Merge error",
            "empty column list",
            col_span,
        ));
    }

    // `windows(2)` yields nothing for a single-element slice, so no extra
    // length guard is needed. Each window always holds exactly two names.
    for pair in cols.windows(2) {
        let l_series = df
            .column(pair[0].as_ref())
            .map_err(|e| parse_polars_error::<&str>(&e, col_span, None))?;

        let r_series = df
            .column(pair[1].as_ref())
            .map_err(|e| parse_polars_error::<&str>(&e, col_span, None))?;

        if l_series.dtype() != r_series.dtype() {
            return Err(ShellError::labeled_error_with_secondary(
                "Merge error",
                "found different column types in list",
                col_span,
                format!(
                    "datatypes {} and {} are incompatible",
                    l_series.dtype(),
                    r_series.dtype()
                ),
                col_span,
            ));
        }
    }

    Ok(())
}

View File

@ -3,24 +3,42 @@ pub mod command;
pub mod convert; pub mod convert;
pub mod drop; pub mod drop;
pub mod dtypes; pub mod dtypes;
pub mod dummies;
pub mod groupby; pub mod groupby;
pub mod head;
pub mod join; pub mod join;
pub mod list; pub mod list;
pub mod load; pub mod load;
pub mod melt;
pub mod pivot;
pub mod sample; pub mod sample;
pub mod select; pub mod select;
pub mod show; pub mod show;
pub mod slice;
pub mod tail;
pub mod to_csv;
pub mod to_parquet;
pub(crate) mod utils; pub(crate) mod utils;
pub mod where_;
pub use aggregate::DataFrame as DataFrameAggregate; pub use aggregate::DataFrame as DataFrameAggregate;
pub use command::Command as DataFrame; pub use command::Command as DataFrame;
pub use convert::DataFrame as DataFrameConvert; pub use convert::DataFrame as DataFrameConvert;
pub use drop::DataFrame as DataFrameDrop; pub use drop::DataFrame as DataFrameDrop;
pub use dtypes::DataFrame as DataFrameDTypes; pub use dtypes::DataFrame as DataFrameDTypes;
pub use dummies::DataFrame as DataFrameDummies;
pub use groupby::DataFrame as DataFrameGroupBy; pub use groupby::DataFrame as DataFrameGroupBy;
pub use head::DataFrame as DataFrameHead;
pub use join::DataFrame as DataFrameJoin; pub use join::DataFrame as DataFrameJoin;
pub use list::DataFrame as DataFrameList; pub use list::DataFrame as DataFrameList;
pub use load::DataFrame as DataFrameLoad; pub use load::DataFrame as DataFrameLoad;
pub use melt::DataFrame as DataFrameMelt;
pub use pivot::DataFrame as DataFramePivot;
pub use sample::DataFrame as DataFrameSample; pub use sample::DataFrame as DataFrameSample;
pub use select::DataFrame as DataFrameSelect; pub use select::DataFrame as DataFrameSelect;
pub use show::DataFrame as DataFrameShow; pub use show::DataFrame as DataFrameShow;
pub use slice::DataFrame as DataFrameSlice;
pub use tail::DataFrame as DataFrameTail;
pub use to_csv::DataFrame as DataFrameToCsv;
pub use to_parquet::DataFrame as DataFrameToParquet;
pub use where_::DataFrame as DataFrameWhere;

View File

@ -0,0 +1,193 @@
use crate::{commands::dataframe::utils::parse_polars_error, prelude::*};
use nu_engine::WholeStreamCommand;
use nu_errors::ShellError;
use nu_protocol::{
dataframe::{NuDataFrame, PolarsData},
Signature, SyntaxShape, UntaggedValue, Value,
};
use nu_source::Tagged;
use polars::prelude::DataType;
/// Aggregations that can be applied to a pivot; each variant is parsed from
/// its lowercase name by `Operation::from_tagged`.
enum Operation {
// Parsed from "first"
First,
// Parsed from "sum"
Sum,
// Parsed from "min"
Min,
// Parsed from "max"
Max,
// Parsed from "mean"
Mean,
// Parsed from "median"
Median,
}
impl Operation {
    /// Parses the operation name carried by `name` into an [`Operation`].
    ///
    /// Accepted names are `first`, `sum`, `min`, `max`, `mean` and `median`;
    /// matching is exact and case-sensitive.
    ///
    /// # Errors
    ///
    /// Returns a `ShellError` pointing at `name.tag` for any other string.
    fn from_tagged(name: &Tagged<String>) -> Result<Operation, ShellError> {
        match name.item.as_ref() {
            "first" => Ok(Operation::First),
            "sum" => Ok(Operation::Sum),
            "min" => Ok(Operation::Min),
            "max" => Ok(Operation::Max),
            "mean" => Ok(Operation::Mean),
            "median" => Ok(Operation::Median),
            _ => Err(ShellError::labeled_error_with_secondary(
                // Label previously read "Operation not fount" (typo).
                "Operation not found",
                "Operation does not exist for pivot",
                &name.tag,
                "Perhaps you want: first, sum, min, max, mean, median",
                &name.tag,
            )),
        }
    }
}
/// Command type backing `pls pivot`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    /// Name under which the command is invoked.
    fn name(&self) -> &str {
        "pls pivot"
    }

    /// One-line description of the command.
    fn usage(&self) -> &str {
        "Performs a pivot operation on a groupby object"
    }

    /// Signature of the command: three required string arguments, in order
    /// the pivot column, the value column and the aggregate operation.
    fn signature(&self) -> Signature {
        let with_pivot = Signature::build("pls pivot").required(
            "pivot column",
            SyntaxShape::String,
            "pivot column to perform pivot",
        );
        let with_value = with_pivot.required(
            "value column",
            SyntaxShape::String,
            "value column to perform pivot",
        );

        with_value.required("operation", SyntaxShape::String, "aggregate operation")
    }

    /// Delegates to the free function `command`.
    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }

    /// Usage examples for the command.
    fn examples(&self) -> Vec<Example> {
        let pivot_example = Example {
            description: "Pivot a dataframe on b and aggregation on col c",
            example:
                "[[a b c]; [one x 1] [two y 2]] | pls convert | pls groupby [a] | pls pivot b c sum",
            result: None,
        };

        vec![pivot_example]
    }
}
/// Runs `pls pivot`: reads one groupby value from the input stream, pivots it
/// on the requested columns and emits the aggregated result as a dataframe.
///
/// # Errors
///
/// Fails when the arguments cannot be read, the operation name is unknown,
/// the stream is empty or does not start with a groupby, either column fails
/// its datatype check, or polars reports an error while aggregating.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
    let tag = args.call_info.name_tag.clone();
    let mut args = args.evaluate_once()?;

    // Positional arguments: pivot column, value column and operation name
    let pivot_col: Tagged<String> = args.req(0)?;
    let value_col: Tagged<String> = args.req(1)?;
    let operation: Tagged<String> = args.req(2)?;
    let op = Operation::from_tagged(&operation)?;

    // The pivot is applied to a single groupby, so only the first value of
    // the input stream is consumed
    let value = match args.input.next() {
        Some(value) => value,
        None => {
            return Err(ShellError::labeled_error(
                "No input received",
                "missing groupby input from stream",
                &tag,
            ))
        }
    };

    let nu_groupby = match value.value {
        UntaggedValue::DataFrame(PolarsData::GroupBy(nu_groupby)) => nu_groupby,
        _ => {
            return Err(ShellError::labeled_error(
                "No groupby in stream",
                "no groupby found in input stream",
                &tag,
            ))
        }
    };

    // Both columns are validated against the underlying dataframe before
    // the groupby is rebuilt
    let df_ref = nu_groupby.as_ref();
    check_pivot_column(df_ref, &pivot_col)?;
    check_value_column(df_ref, &value_col)?;

    let mut groupby = nu_groupby.to_groupby()?;
    let pivot = groupby.pivot(pivot_col.item.as_ref(), value_col.item.as_ref());

    let aggregated = match op {
        Operation::First => pivot.first(),
        Operation::Sum => pivot.sum(),
        Operation::Min => pivot.min(),
        Operation::Max => pivot.max(),
        Operation::Mean => pivot.mean(),
        Operation::Median => pivot.median(),
    };
    let res = aggregated.map_err(|e| parse_polars_error::<&str>(&e, &tag.span, None))?;

    let dataframe = PolarsData::EagerDataFrame(NuDataFrame::new(res));
    Ok(OutputStream::one(Value {
        tag,
        value: UntaggedValue::DataFrame(dataframe),
    }))
}
/// Verifies that `col` exists in `df` and can be used as the pivot column.
///
/// Only integer columns (signed or unsigned, 8 to 64 bits) and `Utf8` columns
/// are accepted.
///
/// # Errors
///
/// Returns the converted polars error when the column cannot be looked up, or
/// a "Pivot error" when its datatype is not one of the accepted ones.
fn check_pivot_column(
    df: &polars::prelude::DataFrame,
    col: &Tagged<String>,
) -> Result<(), ShellError> {
    let series = df
        .column(col.item.as_str())
        .map_err(|e| parse_polars_error::<&str>(&e, &col.tag.span, None))?;

    let dtype = series.dtype();
    let supported = matches!(
        dtype,
        DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Utf8
    );

    if supported {
        Ok(())
    } else {
        Err(ShellError::labeled_error(
            "Pivot error",
            format!("Unsupported datatype {}", dtype),
            col.tag.span,
        ))
    }
}
/// Verifies that `col` exists in `df` and can be used as the value column.
///
/// Only numeric columns are accepted: integers (signed or unsigned, 8 to 64
/// bits) and 32 or 64 bit floats.
///
/// # Errors
///
/// Returns the converted polars error when the column cannot be looked up, or
/// a "Pivot error" when its datatype is not numeric.
fn check_value_column(
    df: &polars::prelude::DataFrame,
    col: &Tagged<String>,
) -> Result<(), ShellError> {
    let series = df
        .column(col.item.as_str())
        .map_err(|e| parse_polars_error::<&str>(&e, &col.tag.span, None))?;

    let dtype = series.dtype();
    let is_numeric = matches!(
        dtype,
        DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Float32
            | DataType::Float64
    );

    if is_numeric {
        Ok(())
    } else {
        Err(ShellError::labeled_error(
            "Pivot error",
            format!("Unsupported datatype {}", dtype),
            col.tag.span,
        ))
    }
}

View File

@ -1,4 +1,4 @@
use crate::prelude::*; use crate::{commands::dataframe::utils::parse_polars_error, prelude::*};
use nu_engine::WholeStreamCommand; use nu_engine::WholeStreamCommand;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{
@ -37,26 +37,26 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
sample(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![ vec![
Example { Example {
description: "Sample rows from dataframe", description: "Sample rows from dataframe",
example: "echo [[a b]; [1 2] [3 4]] | pls load | pls sample -r 1", example: "[[a b]; [1 2] [3 4]] | pls load | pls sample -r 1",
result: None, result: None,
}, },
Example { Example {
description: "Shows sample row using fraction and replace", description: "Shows sample row using fraction and replace",
example: "echo [[a b]; [1 2] [3 4] [5 6]] | pls load | pls sample -f 0.5 -e", example: "[[a b]; [1 2] [3 4] [5 6]] | pls load | pls sample -f 0.5 -e",
result: None, result: None,
}, },
] ]
} }
} }
fn sample(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let mut args = args.evaluate_once()?; let mut args = args.evaluate_once()?;
@ -71,18 +71,16 @@ fn sample(args: CommandArgs) -> Result<OutputStream, ShellError> {
&tag, &tag,
)), )),
Some(value) => { Some(value) => {
if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value {
dataframe: Some(ref df),
..
})) = value.value
{
let res = match (rows, fraction) { let res = match (rows, fraction) {
(Some(rows), None) => df.sample_n(rows.item, replace).map_err(|e| { (Some(rows), None) => df
ShellError::labeled_error("Polars error", format!("{}", e), &rows.tag) .as_ref()
}), .sample_n(rows.item, replace)
(None, Some(frac)) => df.sample_frac(frac.item, replace).map_err(|e| { .map_err(|e| parse_polars_error::<&str>(&e, &rows.tag.span, None)),
ShellError::labeled_error("Polars error", format!("{}", e), &frac.tag) (None, Some(frac)) => df
}), .as_ref()
.sample_frac(frac.item, replace)
.map_err(|e| parse_polars_error::<&str>(&e, &frac.tag.span, None)),
(Some(_), Some(_)) => Err(ShellError::labeled_error( (Some(_), Some(_)) => Err(ShellError::labeled_error(
"Incompatible flags", "Incompatible flags",
"Only one selection criterion allowed", "Only one selection criterion allowed",

View File

@ -6,7 +6,7 @@ use nu_protocol::{
Signature, SyntaxShape, UntaggedValue, Value, Signature, SyntaxShape, UntaggedValue, Value,
}; };
use super::utils::convert_columns; use super::utils::{convert_columns, parse_polars_error};
pub struct DataFrame; pub struct DataFrame;
@ -28,19 +28,19 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
select(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![Example { vec![Example {
description: "Create new dataframe with column a", description: "Create new dataframe with column a",
example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls select [a]", example: "[[a b]; [1 2] [3 4]] | pls convert | pls select [a]",
result: None, result: None,
}] }]
} }
} }
fn select(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let mut args = args.evaluate_once()?; let mut args = args.evaluate_once()?;
@ -55,14 +55,11 @@ fn select(args: CommandArgs) -> Result<OutputStream, ShellError> {
&tag, &tag,
)), )),
Some(value) => { Some(value) => {
if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame { if let UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) = value.value {
dataframe: Some(ref df), let res = df
.. .as_ref()
})) = value.value .select(&col_string)
{ .map_err(|e| parse_polars_error::<&str>(&e, &col_span, None))?;
let res = df.select(&col_string).map_err(|e| {
ShellError::labeled_error("Drop error", format!("{}", e), &col_span)
})?;
let value = Value { let value = Value {
value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new( value: UntaggedValue::DataFrame(PolarsData::EagerDataFrame(NuDataFrame::new(

View File

@ -13,7 +13,7 @@ impl WholeStreamCommand for DataFrame {
} }
fn usage(&self) -> &str { fn usage(&self) -> &str {
"Show dataframe" "Converts a section of the dataframe to a Table or List value"
} }
fn signature(&self) -> Signature { fn signature(&self) -> Signature {
@ -28,30 +28,30 @@ impl WholeStreamCommand for DataFrame {
} }
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> { fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
show(args) command(args)
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
vec![ vec![
Example { Example {
description: "Shows head rows from dataframe", description: "Shows head rows from dataframe",
example: "echo [[a b]; [1 2] [3 4]] | pls convert | pls show", example: "[[a b]; [1 2] [3 4]] | pls convert | pls show",
result: None, result: None,
}, },
Example { Example {
description: "Shows tail rows from dataframe", description: "Shows tail rows from dataframe",
example: "echo [[a b]; [1 2] [3 4] [5 6]] | pls convert | pls show -t -n 1", example: "[[a b]; [1 2] [3 4] [5 6]] | pls convert | pls show -t -n 1",
result: None, result: None,
}, },
] ]
} }
} }
fn show(args: CommandArgs) -> Result<OutputStream, ShellError> { fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let mut args = args.evaluate_once()?; let mut args = args.evaluate_once()?;
let rows: Option<Tagged<usize>> = args.get_flag("rows")?; let rows: Option<Tagged<usize>> = args.get_flag("n_rows")?;
let tail: bool = args.has_flag("tail"); let tail: bool = args.has_flag("tail");
match args.input.next() { match args.input.next() {

View File

@ -0,0 +1,74 @@
use crate::prelude::*;
use nu_engine::WholeStreamCommand;
use nu_errors::ShellError;
use nu_protocol::{
dataframe::{NuDataFrame, PolarsData},
Signature, SyntaxShape, UntaggedValue, Value,
};
use nu_source::Tagged;
/// Command type backing `pls slice`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    fn name(&self) -> &str {
        "pls slice"
    }

    fn usage(&self) -> &str {
        "Creates new dataframe from a slice of rows"
    }

    /// Declares the two required positional arguments: the slice offset and
    /// the slice size.
    fn signature(&self) -> Signature {
        // The signature name has to match `name()`; it was previously built
        // as "pls select".
        Signature::build("pls slice")
            .required("offset", SyntaxShape::Number, "start of slice")
            .required("size", SyntaxShape::Number, "size of slice")
    }

    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }

    fn examples(&self) -> Vec<Example> {
        vec![Example {
            description: "Create new dataframe from a slice of the rows",
            example: "[[a b]; [1 2] [3 4]] | pls convert | pls slice 0 1",
            result: None,
        }]
    }
}
/// Runs `pls slice`: takes the first value of the input stream, which must be
/// a dataframe, and emits a new dataframe holding `size` rows starting at
/// `offset`.
///
/// # Errors
///
/// Fails when the arguments cannot be read, the stream is empty, or the first
/// value of the stream is not a dataframe.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
    let tag = args.call_info.name_tag.clone();
    let mut args = args.evaluate_once()?;

    let offset: Tagged<usize> = args.req(0)?;
    let size: Tagged<usize> = args.req(1)?;

    let input = match args.input.next() {
        Some(value) => value.value,
        None => {
            return Err(ShellError::labeled_error(
                "No input received",
                "missing dataframe input from stream",
                &tag,
            ))
        }
    };

    match input {
        UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => {
            let sliced = df.as_ref().slice(offset.item as i64, size.item);
            let dataframe = PolarsData::EagerDataFrame(NuDataFrame::new(sliced));

            Ok(OutputStream::one(Value {
                value: UntaggedValue::DataFrame(dataframe),
                tag,
            }))
        }
        _ => Err(ShellError::labeled_error(
            "No dataframe in stream",
            "no dataframe found in input stream",
            &tag,
        )),
    }
}

View File

@ -0,0 +1,79 @@
use crate::prelude::*;
use nu_engine::WholeStreamCommand;
use nu_errors::ShellError;
use nu_protocol::{
dataframe::{NuDataFrame, PolarsData},
Signature, SyntaxShape, UntaggedValue, Value,
};
use nu_source::Tagged;
/// Command type backing `pls tail`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    fn name(&self) -> &str {
        "pls tail"
    }

    fn usage(&self) -> &str {
        "Creates new dataframe with tail rows"
    }

    /// Declares the single optional positional argument: the number of rows.
    fn signature(&self) -> Signature {
        // The signature name has to match `name()`; it was previously built
        // as "pls select".
        Signature::build("pls tail").optional(
            "n_rows",
            SyntaxShape::Number,
            "Number of rows for tail",
        )
    }

    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }

    fn examples(&self) -> Vec<Example> {
        vec![Example {
            description: "Create new dataframe with tail rows",
            example: "[[a b]; [1 2] [3 4]] | pls convert | pls tail",
            result: None,
        }]
    }
}
/// Runs `pls tail`: takes the first value of the input stream, which must be
/// a dataframe, and emits a new dataframe with its last rows.
///
/// The number of rows comes from the optional first argument and falls back
/// to 5 when it is not given.
///
/// # Errors
///
/// Fails when the argument cannot be read, the stream is empty, or the first
/// value of the stream is not a dataframe.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
    let tag = args.call_info.name_tag.clone();
    let mut args = args.evaluate_once()?;

    let rows: Option<Tagged<usize>> = args.opt(0)?;
    let rows = rows.map_or(5, |val| val.item);

    let input = match args.input.next() {
        Some(value) => value.value,
        None => {
            return Err(ShellError::labeled_error(
                "No input received",
                "missing dataframe input from stream",
                &tag,
            ))
        }
    };

    match input {
        UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => {
            let tail_rows = df.as_ref().tail(Some(rows));
            let dataframe = PolarsData::EagerDataFrame(NuDataFrame::new(tail_rows));

            Ok(OutputStream::one(Value {
                value: UntaggedValue::DataFrame(dataframe),
                tag,
            }))
        }
        _ => Err(ShellError::labeled_error(
            "No dataframe in stream",
            "no dataframe found in input stream",
            &tag,
        )),
    }
}

View File

@ -0,0 +1,128 @@
use std::fs::File;
use std::path::PathBuf;
use crate::prelude::*;
use nu_engine::WholeStreamCommand;
use nu_errors::ShellError;
use nu_protocol::Primitive;
use nu_protocol::Value;
use nu_protocol::{dataframe::PolarsData, Signature, SyntaxShape, UntaggedValue};
use polars::prelude::{CsvWriter, SerWriter};
use nu_source::Tagged;
use super::utils::parse_polars_error;
/// Command type backing `pls to_csv`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    fn name(&self) -> &str {
        "pls to_csv"
    }

    /// Declares the required output path, the `--delimiter`/`-d` named
    /// argument and the `--no_header` switch.
    fn signature(&self) -> Signature {
        let signature = Signature::build("pls to_csv");
        let signature =
            signature.required("file", SyntaxShape::FilePath, "file path to save dataframe");
        let signature = signature.named(
            "delimiter",
            SyntaxShape::String,
            "file delimiter character",
            Some('d'),
        );
        signature.switch("no_header", "Indicates if file doesn't have header", None)
    }

    fn usage(&self) -> &str {
        "Saves dataframe to csv file"
    }

    fn examples(&self) -> Vec<Example> {
        let default_delimiter = Example {
            description: "Saves dataframe to csv file",
            example: "[[a b]; [1 2] [3 4]] | pls convert | pls to_csv test.csv",
            result: None,
        };
        let custom_delimiter = Example {
            description: "Saves dataframe to csv file using other delimiter",
            example: "[[a b]; [1 2] [3 4]] | pls convert | pls to_csv test.csv -d '|'",
            result: None,
        };

        vec![default_delimiter, custom_delimiter]
    }

    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }
}
/// Runs `pls to_csv`: writes the dataframe found at the head of the input
/// stream to the given file and emits a `saved <path>` string.
///
/// The `--delimiter` flag must be exactly one byte long; `--no_header`
/// disables writing the header row.
///
/// # Errors
///
/// Fails when the arguments cannot be read, the stream does not start with a
/// dataframe, the delimiter is not a single byte, the file cannot be created,
/// or polars reports an error while writing.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
    let tag = args.call_info.name_tag.clone();
    let mut args = args.evaluate_once()?;

    let file_name: Tagged<PathBuf> = args.req(0)?;
    let delimiter: Option<Tagged<String>> = args.get_flag("delimiter")?;
    let no_header: bool = args.has_flag("no_header");

    let mut df = args
        .input
        .next()
        .and_then(|value| match value.value {
            UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => Some(df),
            _ => None,
        })
        .ok_or_else(|| {
            ShellError::labeled_error(
                "No input received",
                "missing dataframe input from stream",
                &tag.span,
            )
        })?;

    // Validate the delimiter before touching the file system, so an invalid
    // flag never creates or truncates the target file.
    let delimiter = match delimiter {
        None => None,
        Some(d) => match d.item.as_bytes() {
            // A one byte string is always a single ASCII character
            [byte] => Some(*byte),
            _ => {
                return Err(ShellError::labeled_error(
                    "Incorrect delimiter",
                    "Delimiter has to be one char",
                    &d.tag,
                ))
            }
        },
    };

    let mut file = File::create(&file_name.item).map_err(|e| {
        ShellError::labeled_error(
            "Error with file name",
            e.to_string(),
            &file_name.tag.span,
        )
    })?;

    let writer = CsvWriter::new(&mut file).has_headers(!no_header);
    let writer = match delimiter {
        Some(byte) => writer.with_delimiter(byte),
        None => writer,
    };

    writer
        .finish(df.as_mut())
        .map_err(|e| parse_polars_error::<&str>(&e, &file_name.tag.span, None))?;

    // `display` is used instead of `to_str().expect(..)` so a path that is
    // not valid UTF-8 cannot panic after the file has been written.
    let tagged_value = Value {
        value: UntaggedValue::Primitive(Primitive::String(format!(
            "saved {}",
            file_name.item.display()
        ))),
        tag: Tag::unknown(),
    };

    Ok(InputStream::one(tagged_value).to_output_stream())
}

View File

@ -0,0 +1,85 @@
use std::fs::File;
use std::path::PathBuf;
use crate::prelude::*;
use nu_engine::WholeStreamCommand;
use nu_errors::ShellError;
use nu_protocol::{dataframe::PolarsData, Primitive, Signature, SyntaxShape, UntaggedValue, Value};
use polars::prelude::ParquetWriter;
use nu_source::Tagged;
use super::utils::parse_polars_error;
/// Command type backing `pls to_parquet`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    fn name(&self) -> &str {
        "pls to_parquet"
    }

    /// Declares the single required positional argument: the output path.
    fn signature(&self) -> Signature {
        let signature = Signature::build("pls to_parquet");
        signature.required("file", SyntaxShape::FilePath, "file path to save dataframe")
    }

    fn usage(&self) -> &str {
        "Saves dataframe to parquet file"
    }

    fn examples(&self) -> Vec<Example> {
        let save_example = Example {
            description: "Saves dataframe to parquet file",
            example: "[[a b]; [1 2] [3 4]] | pls convert | pls to_parquet test.parquet",
            result: None,
        };

        vec![save_example]
    }

    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }
}
/// Runs `pls to_parquet`: writes the dataframe found at the head of the input
/// stream to the given file and emits a `saved <path>` string.
///
/// # Errors
///
/// Fails when the argument cannot be read, the stream does not start with a
/// dataframe, the file cannot be created, or polars reports an error while
/// writing.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
    let tag = args.call_info.name_tag.clone();
    let mut args = args.evaluate_once()?;

    let file_name: Tagged<PathBuf> = args.req(0)?;

    let mut df = args
        .input
        .next()
        .and_then(|value| match value.value {
            UntaggedValue::DataFrame(PolarsData::EagerDataFrame(df)) => Some(df),
            _ => None,
        })
        .ok_or_else(|| {
            ShellError::labeled_error(
                "No input received",
                "missing dataframe input from stream",
                &tag.span,
            )
        })?;

    let file = File::create(&file_name.item).map_err(|e| {
        ShellError::labeled_error(
            "Error with file name",
            e.to_string(),
            &file_name.tag.span,
        )
    })?;

    ParquetWriter::new(file)
        .finish(df.as_mut())
        .map_err(|e| parse_polars_error::<&str>(&e, &file_name.tag.span, None))?;

    // `display` is used instead of `to_str().expect(..)` so a path that is
    // not valid UTF-8 cannot panic after the file has been written.
    let tagged_value = Value {
        value: UntaggedValue::Primitive(Primitive::String(format!(
            "saved {}",
            file_name.item.display()
        ))),
        tag: Tag::unknown(),
    };

    Ok(InputStream::one(tagged_value).to_output_stream())
}

View File

@ -1,6 +1,7 @@
use crate::prelude::*; use crate::prelude::*;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{Primitive, UntaggedValue, Value}; use nu_protocol::{Primitive, UntaggedValue, Value};
use polars::prelude::PolarsError;
// Converts a Vec<Value> to a Vec<String> with a Span marking the whole // Converts a Vec<Value> to a Vec<String> with a Span marking the whole
// location of the columns for error referencing // location of the columns for error referencing
@ -40,3 +41,38 @@ pub(crate) fn convert_columns<'columns>(
Ok((res, col_span)) Ok((res, col_span))
} }
pub(crate) fn parse_polars_error<T: AsRef<str>>(
e: &PolarsError,
span: &Span,
secondary: Option<T>,
) -> ShellError {
let (msg, label) = match e {
PolarsError::PolarsArrowError(_) => ("PolarsArrow Error", format!("{}", e)),
PolarsError::ArrowError(_) => ("Arrow Error", format!("{}", e)),
PolarsError::InvalidOperation(_) => ("Invalid Operation", format!("{}", e)),
PolarsError::DataTypeMisMatch(_) => ("Data Type Mismatch", format!("{}", e)),
PolarsError::NotFound(_) => ("Not Found", format!("{}", e)),
PolarsError::ShapeMisMatch(_) => ("Shape Mismatch", format!("{}", e)),
PolarsError::Other(_) => ("Other", format!("{}", e)),
PolarsError::OutOfBounds(_) => ("Out Of Bounds", format!("{}", e)),
PolarsError::NoSlice => ("No Slice", format!("{}", e)),
PolarsError::NoData(_) => ("No Data", format!("{}", e)),
PolarsError::ValueError(_) => ("Value Error", format!("{}", e)),
PolarsError::MemoryNotAligned => ("Memory Not Aligned", format!("{}", e)),
PolarsError::ParquetError(_) => ("Parquet Error", format!("{}", e)),
PolarsError::RandError(_) => ("Rand Error", format!("{}", e)),
PolarsError::HasNullValues(_) => ("Has Null Values", format!("{}", e)),
PolarsError::UnknownSchema(_) => ("Unknown Schema", format!("{}", e)),
PolarsError::Various(_) => ("Various", format!("{}", e)),
PolarsError::Io(_) => ("Io Error", format!("{}", e)),
PolarsError::Regex(_) => ("Regex Error", format!("{}", e)),
PolarsError::Duplicate(_) => ("Duplicate Error", format!("{}", e)),
PolarsError::ImplementationError => ("Implementation Error", format!("{}", e)),
};
match secondary {
None => ShellError::labeled_error(msg, label, span),
Some(s) => ShellError::labeled_error_with_secondary(msg, label, span, s.as_ref(), span),
}
}

View File

@ -0,0 +1,207 @@
use crate::prelude::*;
use nu_engine::{evaluate_baseline_expr, EvaluatedCommandArgs, WholeStreamCommand};
use nu_errors::ShellError;
use nu_protocol::{
dataframe::{NuDataFrame, PolarsData},
hir::{CapturedBlock, ClassifiedCommand, Expression, Literal, Operator, SpannedExpression},
Primitive, Signature, SyntaxShape, UnspannedPathMember, UntaggedValue, Value,
};
use super::utils::parse_polars_error;
use polars::prelude::{ChunkCompare, Series};
/// Command type backing `pls where`.
pub struct DataFrame;

impl WholeStreamCommand for DataFrame {
    fn name(&self) -> &str {
        "pls where"
    }

    fn usage(&self) -> &str {
        "Filter dataframe to match the condition"
    }

    /// Declares the single required positional argument: the row condition.
    fn signature(&self) -> Signature {
        let signature = Signature::build("pls where");
        signature.required(
            "condition",
            SyntaxShape::RowCondition,
            "the condition that must match",
        )
    }

    fn examples(&self) -> Vec<Example> {
        let filter_example = Example {
            description: "Filter dataframe based on column a",
            example: "[[a b]; [1 2] [3 4]] | pls convert | pls where a == 1",
            result: None,
        };

        vec![filter_example]
    }

    fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
        command(args)
    }
}
/// Runs `pls where`: extracts the column name, operator and right hand value
/// from the captured condition block and hands them to `filter_dataframe`.
///
/// # Errors
///
/// Fails when the block does not start with a binary expression, the left
/// hand side is not a column path with a string member, the right hand side
/// cannot be evaluated or is not a primitive value, or `filter_dataframe`
/// fails.
fn command(args: CommandArgs) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone();
let args = args.evaluate_once()?;
let block: CapturedBlock = args.req(0)?;
// Only the first command of the first pipeline of the first group is
// inspected, and it has to be a binary expression
let expression = block
.block
.block
.get(0)
.and_then(|group| {
group
.pipelines
.get(0)
.and_then(|v| v.list.get(0))
.and_then(|expr| match &expr {
ClassifiedCommand::Expr(expr) => match &expr.as_ref().expr {
Expression::Binary(expr) => Some(expr),
_ => None,
},
_ => None,
})
})
.ok_or(ShellError::labeled_error(
"Expected a condition",
"expected a condition",
&tag.span,
))?;
// The left hand side has to be a column path; its first tail member is
// taken as the column to compare
let lhs = match &expression.left.expr {
Expression::FullColumnPath(p) => p.as_ref().tail.get(0),
_ => None,
}
.ok_or(ShellError::labeled_error(
"No column name",
"Not a column name found in left hand side of comparison",
&expression.left.span,
))?;
// Only string path members are accepted as column names
let (col_name, col_name_span) = match &lhs.unspanned {
UnspannedPathMember::String(name) => Ok((name, &lhs.span)),
_ => Err(ShellError::labeled_error(
"No column name",
"Not a string as column name",
&lhs.span,
)),
}?;
// The right hand side is evaluated to a value and has to be a primitive
let rhs = evaluate_baseline_expr(&expression.right, &args.args.context)?;
let right_condition = match &rhs.value {
UntaggedValue::Primitive(primitive) => Ok(primitive),
_ => Err(ShellError::labeled_error(
"Incorrect argument",
"Expected primitive values",
&rhs.tag.span,
)),
}?;
filter_dataframe(
args,
&col_name,
&col_name_span,
&right_condition,
&expression.op,
)
}
// Applies `$comparison` to the column `$col` and the primitive `$condition`,
// producing `Ok(mask)` for the supported primitives (Int, BigInt, Decimal and
// String) and a labeled `ShellError` at `$span` otherwise.
//
// BigInt and Decimal values are converted to `i64` and `f64` before the
// comparison. A value that cannot be converted is reported as an error
// instead of panicking, because the value comes from the user's condition.
macro_rules! comparison_arm {
    ($comparison:expr, $col:expr, $condition:expr, $span:expr) => {
        match $condition {
            Primitive::Int(val) => Ok($comparison($col, *val)),
            Primitive::BigInt(val) => match val.to_i64() {
                Some(int) => Ok($comparison($col, int)),
                None => Err(ShellError::labeled_error(
                    "Invalid datatype",
                    "integer value does not fit in a signed 64 bit integer",
                    &$span,
                )),
            },
            Primitive::Decimal(val) => match val.to_f64() {
                Some(float) => Ok($comparison($col, float)),
                None => Err(ShellError::labeled_error(
                    "Invalid datatype",
                    "decimal value cannot be represented as a 64 bit float",
                    &$span,
                )),
            },
            Primitive::String(val) => {
                let temp: &str = val.as_ref();
                Ok($comparison($col, temp))
            }
            _ => Err(ShellError::labeled_error(
                "Invalid datatype",
                format!(
                    "this operator cannot be used with the selected '{}' datatype",
                    $col.dtype()
                ),
                &$span,
            )),
        }
    };
}
/// Filters the dataframe found at the head of the input stream using polars
/// operations, with the information extracted from the condition block.
///
/// The column `col_name` is compared against `right_condition` with the
/// comparison named by `operator`, and the resulting mask selects the rows of
/// the emitted dataframe.
///
/// # Errors
///
/// Fails when the stream does not start with a dataframe, the column cannot
/// be found, `operator` is not an operator literal or is not one of the six
/// supported comparisons, the primitive cannot be compared, or polars reports
/// an error while filtering.
fn filter_dataframe(
    mut args: EvaluatedCommandArgs,
    col_name: &str,
    col_name_span: &Span,
    right_condition: &Primitive,
    operator: &SpannedExpression,
) -> Result<OutputStream, ShellError> {
    let name_tag = args.call_info.name_tag.clone();

    let df = match args.input.next().map(|value| value.value) {
        Some(UntaggedValue::DataFrame(PolarsData::EagerDataFrame(nu))) => nu,
        _ => {
            return Err(ShellError::labeled_error(
                "Incorrect stream input",
                "Expected dataframe in stream",
                &name_tag.span,
            ))
        }
    };

    let col = df
        .as_ref()
        .column(col_name)
        .map_err(|e| parse_polars_error::<&str>(&e, col_name_span, None))?;

    let op = match &operator.expr {
        Expression::Literal(Literal::Operator(op)) => op,
        _ => {
            return Err(ShellError::labeled_error(
                "Incorrect argument",
                "Expected operator",
                &operator.span,
            ))
        }
    };

    // Each supported operator maps to the matching polars series comparison
    let mask = match op {
        Operator::Equal => comparison_arm!(Series::eq, col, right_condition, operator.span),
        Operator::NotEqual => comparison_arm!(Series::neq, col, right_condition, operator.span),
        Operator::LessThan => comparison_arm!(Series::lt, col, right_condition, operator.span),
        Operator::LessThanOrEqual => {
            comparison_arm!(Series::lt_eq, col, right_condition, operator.span)
        }
        Operator::GreaterThan => comparison_arm!(Series::gt, col, right_condition, operator.span),
        Operator::GreaterThanOrEqual => {
            comparison_arm!(Series::gt_eq, col, right_condition, operator.span)
        }
        _ => Err(ShellError::labeled_error(
            "Incorrect operator",
            "Not implemented operator for dataframes filter",
            &operator.span,
        )),
    }?;

    let filtered = df
        .as_ref()
        .filter(&mask)
        .map_err(|e| parse_polars_error::<&str>(&e, &name_tag.span, None))?;

    let dataframe = PolarsData::EagerDataFrame(NuDataFrame::new(filtered));
    Ok(OutputStream::one(Value {
        value: UntaggedValue::DataFrame(dataframe),
        tag: name_tag,
    }))
}

View File

@ -278,6 +278,24 @@ pub fn create_default_context(interactive: bool) -> Result<EvaluationContext, Bo
whole_stream_command(DataFrameSelect), whole_stream_command(DataFrameSelect),
#[cfg(feature = "dataframe")] #[cfg(feature = "dataframe")]
whole_stream_command(DataFrameDTypes), whole_stream_command(DataFrameDTypes),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFrameDummies),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFrameHead),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFrameTail),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFrameSlice),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFrameMelt),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFramePivot),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFrameWhere),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFrameToParquet),
#[cfg(feature = "dataframe")]
whole_stream_command(DataFrameToCsv),
]); ]);
#[cfg(feature = "clipboard-cli")] #[cfg(feature = "clipboard-cli")]

View File

@ -24,7 +24,15 @@ num-integer = "0.1.44"
num-traits = "0.2.14" num-traits = "0.2.14"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11.5" serde_bytes = "0.11.5"
polars = {version = "0.13.4", optional = true}
# implement conversions
serde_json = "1.0"
serde_yaml = "0.8.16"
toml = "0.5.8"
[dependencies.polars]
version = "0.13.4"
optional = true
[features] [features]
dataframe = ["polars"] dataframe = ["polars"]

View File

@ -46,15 +46,11 @@ type ColumnMap = HashMap<String, ColumnValues>;
pub struct NuDataFrame { pub struct NuDataFrame {
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub dataframe: Option<DataFrame>, pub dataframe: Option<DataFrame>,
pub name: String,
} }
impl Default for NuDataFrame { impl Default for NuDataFrame {
fn default() -> Self { fn default() -> Self {
NuDataFrame { NuDataFrame { dataframe: None }
dataframe: None,
name: String::from("From Stream"),
}
} }
} }
@ -62,14 +58,6 @@ impl NuDataFrame {
pub fn new(df: polars::prelude::DataFrame) -> Self { pub fn new(df: polars::prelude::DataFrame) -> Self {
NuDataFrame { NuDataFrame {
dataframe: Some(df), dataframe: Some(df),
name: String::from("dataframe"),
}
}
pub fn new_with_name(df: polars::prelude::DataFrame, name: String) -> Self {
NuDataFrame {
dataframe: Some(df),
name,
} }
} }
} }
@ -225,6 +213,24 @@ impl NuDataFrame {
} }
} }
impl AsRef<polars::prelude::DataFrame> for NuDataFrame {
    /// Borrows the wrapped polars dataframe.
    ///
    /// # Panics
    ///
    /// Panics when the `dataframe` field is `None`.
    fn as_ref(&self) -> &polars::prelude::DataFrame {
        self.dataframe
            .as_ref()
            .unwrap_or_else(|| unreachable!("Accessing ref to dataframe from nu_dataframe"))
    }
}
impl AsMut<polars::prelude::DataFrame> for NuDataFrame {
    /// Mutably borrows the wrapped polars dataframe.
    ///
    /// # Panics
    ///
    /// Panics when the `dataframe` field is `None`.
    fn as_mut(&mut self) -> &mut polars::prelude::DataFrame {
        self.dataframe
            .as_mut()
            .unwrap_or_else(|| unreachable!("Accessing mut ref to dataframe from nu_dataframe"))
    }
}
// Adds a separator to the vector of values using the column names from the // Adds a separator to the vector of values using the column names from the
// dataframe to create the Values Row // dataframe to create the Values Row
fn add_separator(values: &mut Vec<Value>, df: &DataFrame) { fn add_separator(values: &mut Vec<Value>, df: &DataFrame) {
@ -430,7 +436,6 @@ fn from_parsed_columns(column_values: ColumnMap, tag: &Tag) -> Result<NuDataFram
match df { match df {
Ok(df) => Ok(NuDataFrame { Ok(df) => Ok(NuDataFrame {
dataframe: Some(df), dataframe: Some(df),
name: "From stream".to_string(),
}), }),
Err(e) => { Err(e) => {
return Err(ShellError::labeled_error( return Err(ShellError::labeled_error(

View File

@ -39,11 +39,6 @@ impl NuGroupBy {
pub fn print(&self) -> Result<Vec<Value>, ShellError> { pub fn print(&self) -> Result<Vec<Value>, ShellError> {
let mut values: Vec<Value> = Vec::new(); let mut values: Vec<Value> = Vec::new();
let mut data = TaggedDictBuilder::new(Tag::unknown());
data.insert_value("property", "dataframe");
data.insert_value("value", self.dataframe.name.as_ref());
values.push(data.into_value());
let mut data = TaggedDictBuilder::new(Tag::unknown()); let mut data = TaggedDictBuilder::new(Tag::unknown());
data.insert_value("property", "group by"); data.insert_value("property", "group by");
data.insert_value("value", self.by.join(", ")); data.insert_value("value", self.by.join(", "));
@ -52,3 +47,12 @@ impl NuGroupBy {
Ok(values) Ok(values)
} }
} }
impl AsRef<polars::prelude::DataFrame> for NuGroupBy {
    /// Borrows the polars dataframe the groupby was created from.
    ///
    /// # Panics
    ///
    /// Panics when the inner `dataframe` field is `None`.
    fn as_ref(&self) -> &polars::prelude::DataFrame {
        self.dataframe
            .dataframe
            .as_ref()
            .unwrap_or_else(|| unreachable!("Accessing reference to dataframe from nu_groupby"))
    }
}

View File

@ -11,9 +11,9 @@ use std::ops::Range;
/// Used to represent the value of an input file. /// Used to represent the value of an input file.
#[derive(Clone)] #[derive(Clone)]
pub struct Text { pub struct Text {
text: String, pub text: String,
start: usize, pub start: usize,
end: usize, pub end: usize,
} }
impl Text { impl Text {