diff --git a/src/cli.rs b/src/cli.rs index 483ee332e..5af74b132 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -323,10 +323,17 @@ pub async fn cli() -> Result<(), Box> { whole_stream_command(Table), whole_stream_command(Version), whole_stream_command(Which), - #[cfg(data_processing_primitives)] - whole_stream_command(SplitBy), ]); + cfg_if::cfg_if! { + if #[cfg(data_processing_primitives)] { + context.add_commands(vec![ + whole_stream_command(SplitBy), + whole_stream_command(ReduceBy), + ]); + } + } + #[cfg(feature = "clipboard")] { context.add_commands(vec![whole_stream_command( diff --git a/src/commands.rs b/src/commands.rs index 7c2c18862..73a4b7244 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -58,8 +58,12 @@ pub(crate) mod size; pub(crate) mod skip_while; pub(crate) mod sort_by; -#[cfg(data_processing_primitives)] -pub(crate) mod split_by; +cfg_if::cfg_if! { + if #[cfg(data_processing_primitives)] { + pub(crate) mod split_by; + pub(crate) mod reduce_by; + } +} pub(crate) mod split_column; pub(crate) mod split_row; @@ -138,8 +142,12 @@ pub(crate) use size::Size; pub(crate) use skip_while::SkipWhile; pub(crate) use sort_by::SortBy; -#[cfg(data_processing_primitives)] -pub(crate) use split_by::SplitBy; +cfg_if::cfg_if! { + if #[cfg(data_processing_primitives)] { + pub(crate) use split_by::SplitBy; + pub(crate) use reduce_by::ReduceBy; + } +} pub(crate) use split_column::SplitColumn; pub(crate) use split_row::SplitRow; diff --git a/src/commands/reduce_by.rs b/src/commands/reduce_by.rs new file mode 100644 index 000000000..de64caac1 --- /dev/null +++ b/src/commands/reduce_by.rs @@ -0,0 +1,100 @@ +use crate::commands::WholeStreamCommand; +use crate::data::TaggedDictBuilder; +use crate::parser::hir::SyntaxShape; +use crate::parser::registry; +use crate::data::base::Block; +use crate::prelude::*; + +use log::trace; + +pub struct ReduceBy; + +#[derive(Deserialize)] +pub struct ReduceByArgs { + calculator: Block, +} + +impl WholeStreamCommand for ReduceBy { + fn name(&self) -> &str { + "reduce-by" + } + + fn signature(&self) -> Signature { + Signature::build("reduce-by").required( + "calculator", + SyntaxShape::Block, + "The block used for calculating values", + ) + } + + fn usage(&self) -> &str { + "Crates a new table with the data from the table rows reduced by the block given." + } + + fn run( + &self, + args: CommandArgs, + registry: &CommandRegistry, + ) -> Result { + args.process(registry, reduce_by)?.run() + } +} + +pub fn reduce_by( + ReduceByArgs { calculator }: ReduceByArgs, + RunnableContext { input, name, .. }: RunnableContext, +) -> Result { + let stream = async_stream! { + let values: Vec> = input.values.collect().await; + + trace!("{:?}", &calculator); + + if values.is_empty() { + yield Err(ShellError::labeled_error( + "Expected table from pipeline", + "requires a table input", + name + )) + } else { + match reduce(values, &calculator, name) { + Ok(reduced) => yield ReturnSuccess::value(reduced), + Err(err) => yield Err(err) + } + } + }; + + Ok(stream.to_output_stream()) +} + +pub fn reduce( + values: Vec>, + calculator: &Block, + tag: impl Into, +) -> Result, ShellError> { + let tag = tag.into(); + + let mut out = TaggedDictBuilder::new(&tag); + + Ok(out.into_tagged_value()) +} + +#[cfg(test)] +mod tests { + + use crate::commands::reduce_by::reduce; + use crate::data::meta::*; + use crate::Value; + use indexmap::IndexMap; + + fn string(input: impl Into) -> Tagged { + Value::string(input.into()).tagged_unknown() + } + + fn row(entries: IndexMap>) -> Tagged { + Value::row(entries).tagged_unknown() + } + + fn table(list: &Vec>) -> Tagged { + Value::table(list).tagged_unknown() + } +}