Add columns to the dataframe that are present in the schema but not present in the dataframe when applying a schema. (#11987)

This commit is contained in:
Jack Wright 2024-02-26 15:22:33 -08:00 committed by GitHub
parent f4d9ddd3ad
commit 2a721bad52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 68 additions and 6 deletions

View File

@ -148,6 +148,18 @@ impl Command for ToDataFrame {
.into_value(Span::test_data()), .into_value(Span::test_data()),
), ),
}, },
Example {
description: "Convert to a dataframe and provide a schema that adds a new column",
example: r#"[[a b]; [1 "foo"] [2 "bar"]] | dfr into-df -s {a: u8, b:str, c:i64} | dfr fill-null 3"#,
result: Some(NuDataFrame::try_from_series(vec![
Series::new("a", [1u8, 2]),
Series::new("b", ["foo", "bar"]),
Series::new("c", [3i64, 3]),
], Span::test_data())
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
}
] ]
} }
@ -163,8 +175,12 @@ impl Command for ToDataFrame {
.map(|schema| NuSchema::try_from(&schema)) .map(|schema| NuSchema::try_from(&schema))
.transpose()?; .transpose()?;
NuDataFrame::try_from_iter(input.into_iter(), maybe_schema) let df = NuDataFrame::try_from_iter(input.into_iter(), maybe_schema.clone())?;
.map(|df| PipelineData::Value(NuDataFrame::into_value(df, call.head), None))
Ok(PipelineData::Value(
NuDataFrame::into_value(df, call.head),
None,
))
} }
} }

View File

@ -20,7 +20,7 @@ use crate::dataframe::lazy::aggregate::LazyAggregate;
pub use crate::dataframe::lazy::collect::LazyCollect; pub use crate::dataframe::lazy::collect::LazyCollect;
use crate::dataframe::lazy::fetch::LazyFetch; use crate::dataframe::lazy::fetch::LazyFetch;
use crate::dataframe::lazy::fill_nan::LazyFillNA; use crate::dataframe::lazy::fill_nan::LazyFillNA;
use crate::dataframe::lazy::fill_null::LazyFillNull; pub use crate::dataframe::lazy::fill_null::LazyFillNull;
use crate::dataframe::lazy::filter::LazyFilter; use crate::dataframe::lazy::filter::LazyFilter;
use crate::dataframe::lazy::groupby::ToLazyGroupBy; use crate::dataframe::lazy::groupby::ToLazyGroupBy;
use crate::dataframe::lazy::join::LazyJoin; use crate::dataframe::lazy::join::LazyJoin;

View File

@ -7,6 +7,7 @@ use nu_protocol::{
use super::eager::{SchemaDF, ToDataFrame}; use super::eager::{SchemaDF, ToDataFrame};
use super::expressions::ExprCol; use super::expressions::ExprCol;
use super::lazy::LazyFillNull;
use super::lazy::{LazyCollect, ToLazyFrame}; use super::lazy::{LazyCollect, ToLazyFrame};
use nu_cmd_lang::Let; use nu_cmd_lang::Let;
@ -37,6 +38,7 @@ pub fn build_test_engine_state(cmds: Vec<Box<dyn Command + 'static>>) -> Box<Eng
working_set.add_decl(Box::new(LazyCollect)); working_set.add_decl(Box::new(LazyCollect));
working_set.add_decl(Box::new(ExprCol)); working_set.add_decl(Box::new(ExprCol));
working_set.add_decl(Box::new(SchemaDF)); working_set.add_decl(Box::new(SchemaDF));
working_set.add_decl(Box::new(LazyFillNull));
// Adding the command that is being tested to the working set // Adding the command that is being tested to the working set
for cmd in cmds.clone() { for cmd in cmds.clone() {

View File

@ -9,9 +9,10 @@ pub use operations::Axis;
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
use nu_protocol::{did_you_mean, PipelineData, Record, ShellError, Span, Value}; use nu_protocol::{did_you_mean, PipelineData, Record, ShellError, Span, Value};
use polars::prelude::{DataFrame, DataType, IntoLazy, LazyFrame, PolarsObject, Series}; use polars::prelude::{DataFrame, DataType, IntoLazy, LazyFrame, PolarsObject, Series};
use polars_plan::prelude::{lit, Expr, Null};
use polars_utils::total_ord::TotalEq; use polars_utils::total_ord::TotalEq;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{cmp::Ordering, fmt::Display, hash::Hasher}; use std::{cmp::Ordering, collections::HashSet, fmt::Display, hash::Hasher};
use super::{nu_schema::NuSchema, utils::DEFAULT_ROWS, NuLazyFrame}; use super::{nu_schema::NuSchema, utils::DEFAULT_ROWS, NuLazyFrame};
@ -172,7 +173,8 @@ impl NuDataFrame {
} }
} }
conversion::from_parsed_columns(column_values) let df = conversion::from_parsed_columns(column_values)?;
add_missing_columns(df, &maybe_schema, Span::unknown())
} }
pub fn try_from_series(columns: Vec<Series>, span: Span) -> Result<Self, ShellError> { pub fn try_from_series(columns: Vec<Series>, span: Span) -> Result<Self, ShellError> {
@ -200,7 +202,8 @@ impl NuDataFrame {
} }
} }
conversion::from_parsed_columns(column_values) let df = conversion::from_parsed_columns(column_values)?;
add_missing_columns(df, &maybe_schema, Span::unknown())
} }
pub fn fill_list_nan(list: Vec<Value>, list_span: Span, fill: Value) -> Value { pub fn fill_list_nan(list: Vec<Value>, list_span: Span, fill: Value) -> Value {
@ -510,3 +513,44 @@ impl NuDataFrame {
NuSchema::new(self.df.schema()) NuSchema::new(self.df.schema())
} }
} }
/// Ensures the dataframe contains every column named in the optional schema.
///
/// For each schema field whose name is not already a column of `df`, a new
/// column is appended, built from a null literal cast to the dtype the schema
/// declares for that field. Columns that already exist are left untouched.
///
/// Returns `df` unchanged when no schema is supplied or when nothing is
/// missing. Otherwise the extra columns are added through a lazy
/// `with_columns` call and the result is collected using `span`; any error
/// from that collect is returned to the caller.
fn add_missing_columns(
    df: NuDataFrame,
    maybe_schema: &Option<NuSchema>,
    span: Span,
) -> Result<NuDataFrame, ShellError> {
    // Without a schema there is nothing to reconcile.
    let schema = match maybe_schema {
        Some(schema) => schema,
        None => return Ok(df),
    };

    // Names of the columns the dataframe already has.
    let existing_fields = df.df.fields();
    let present: HashSet<&str> = existing_fields
        .iter()
        .map(|field| field.name().as_str())
        .collect();

    // One null-literal expression, cast and aliased, per schema field that the
    // dataframe lacks.
    let null_columns: Vec<Expr> = schema
        .schema
        .iter()
        .filter(|(name, _)| !present.contains(name.as_str()))
        .map(|(name, dtype)| lit(Null {}).cast(dtype.to_owned()).alias(name.as_str()))
        .collect();

    if null_columns.is_empty() {
        return Ok(df);
    }

    // NOTE(review): the `true` flag presumably marks the lazy frame as having
    // come from an eager dataframe — confirm against `NuLazyFrame::new`.
    let widened = df.lazy().with_columns(null_columns);
    NuLazyFrame::new(true, widened).collect(span)
}