From 7b89fab3277ff11e7cf19a2feec63d3dd24ef010 Mon Sep 17 00:00:00 2001 From: Nano Date: Mon, 11 Sep 2023 23:42:09 +1200 Subject: [PATCH] Keep order for `par-each` (#10249) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Description This PR adds new flag `--keep-order/-k` for the `par_each` filter. This flag keeps sequence of output same as the order of input. Output without the flag: ```nu > 1..6 | par-each {|n| $n * 2 } ╭────╮ │ 4 │ │ 10 │ │ 2 │ │ 8 │ │ 12 │ │ 6 │ ╰────╯ ``` Output with the `--keep-order` flag: ```nu > 1..6 | par-each --keep-order {|n| $n * 2 } ╭────╮ │ 2 │ │ 4 │ │ 6 │ │ 8 │ │ 10 │ │ 12 │ ╰────╯ ``` I think the presence of this flag is justified, since: - Much easier to use than `.. | enumerate | par-each {|p| update item ..} | sort-by index | get item` - Faster, as it uses internally parallel sorting in the same thread pool A note about naming: it may conflict with `--keep-empty/-k` flag of the `each` filter if the same feature will be used in `par-each`, so maybe it needs some other name. --- crates/nu-command/src/filters/par_each.rs | 132 ++++++++++++++-------- 1 file changed, 83 insertions(+), 49 deletions(-) diff --git a/crates/nu-command/src/filters/par_each.rs b/crates/nu-command/src/filters/par_each.rs index b0573d012..23113c3ed 100644 --- a/crates/nu-command/src/filters/par_each.rs +++ b/crates/nu-command/src/filters/par_each.rs @@ -37,6 +37,11 @@ impl Command for ParEach { "the number of threads to use", Some('t'), ) + .switch( + "keep-order", + "keep sequence of output same as the order of input", + Some('k'), + ) .required( "closure", SyntaxShape::Closure(Some(vec![SyntaxShape::Any, SyntaxShape::Int])), @@ -49,39 +54,43 @@ impl Command for ParEach { fn examples(&self) -> Vec { vec![ Example { - example: "[1 2 3] | par-each {|| 2 * $in }", + example: "[1 2 3] | par-each {|e| $e * 2 }", description: "Multiplies each number. Note that the list will become arbitrarily disordered.", result: None, }, Example { - example: r#"[foo bar baz] | par-each {|e| $e + '!' } | sort"#, - description: "Output can still be sorted afterward", - result: Some(Value::list( - vec![ - Value::test_string("bar!"), - Value::test_string("baz!"), - Value::test_string("foo!"), - ], - Span::test_data(), - )), + example: r#"[1 2 3] | par-each --keep-order {|e| $e * 2 }"#, + description: "Multiplies each number, keeping an original order", + result: Some(Value::test_list(vec![ + Value::test_int(2), + Value::test_int(4), + Value::test_int(6), + ])), }, Example { example: r#"1..3 | enumerate | par-each {|p| update item ($p.item * 2)} | sort-by item | get item"#, description: "Enumerate and sort-by can be used to reconstruct the original order", - result: Some(Value::list( - vec![Value::test_int(2), Value::test_int(4), Value::test_int(6)], - Span::test_data(), - )), + result: Some(Value::test_list(vec![ + Value::test_int(2), + Value::test_int(4), + Value::test_int(6), + ])), + }, + Example { + example: r#"[foo bar baz] | par-each {|e| $e + '!' } | sort"#, + description: "Output can still be sorted afterward", + result: Some(Value::test_list(vec![ + Value::test_string("bar!"), + Value::test_string("baz!"), + Value::test_string("foo!"), + ])), }, Example { example: r#"[1 2 3] | enumerate | par-each { |e| if $e.item == 2 { $"found 2 at ($e.index)!"} }"#, description: "Iterate over each element, producing a list showing indexes of any 2s", - result: Some(Value::list( - vec![Value::test_string("found 2 at 1!")], - Span::test_data(), - )), + result: Some(Value::test_list(vec![Value::test_string("found 2 at 1!")])), }, ] } @@ -114,6 +123,7 @@ impl Command for ParEach { let capture_block: Closure = call.req(engine_state, stack, 0)?; let threads: Option = call.get_flag(engine_state, stack, "threads")?; let max_threads = threads.unwrap_or(0); + let keep_order = call.has_flag("keep-order"); let metadata = input.metadata(); let ctrlc = engine_state.ctrlc.clone(); let outer_ctrlc = engine_state.ctrlc.clone(); @@ -123,14 +133,27 @@ impl Command for ParEach { let redirect_stdout = call.redirect_stdout; let redirect_stderr = call.redirect_stderr; + // A helper function sorts the output if needed + let apply_order = |mut vec: Vec<(usize, Value)>| { + if keep_order { + // It runs inside the rayon's thread pool so parallel sorting can be used. + // There are no identical indexes, so unstable sorting can be used. + vec.par_sort_unstable_by_key(|(index, _)| *index); + } + + vec.into_iter().map(|(_, val)| val) + }; + match input { PipelineData::Empty => Ok(PipelineData::Empty), PipelineData::Value(Value::Range { val, .. }, ..) => Ok(create_pool(max_threads)? .install(|| { - val.into_range_iter(ctrlc.clone()) + let vec = val + .into_range_iter(ctrlc.clone()) .expect("unable to create a range iterator") + .enumerate() .par_bridge() - .map(move |x| { + .map(move |(index, x)| { let block = engine_state.get_block(block_id); let mut stack = stack.clone(); @@ -144,7 +167,7 @@ impl Command for ParEach { let val_span = x.span(); let x_is_error = x.is_error(); - match eval_block_with_early_return( + let val = match eval_block_with_early_return( engine_state, &mut stack, block, @@ -153,21 +176,24 @@ impl Command for ParEach { redirect_stderr, ) { Ok(v) => v.into_value(span), - Err(error) => Value::error( chain_error_with_input(error, x_is_error, val_span), val_span, ), - } + }; + + (index, val) }) - .collect::>() - .into_iter() - .into_pipeline_data(ctrlc) + .collect::>(); + + apply_order(vec).into_pipeline_data(ctrlc) })), PipelineData::Value(Value::List { vals: val, .. }, ..) => Ok(create_pool(max_threads)? .install(|| { - val.par_iter() - .map(move |x| { + let vec = val + .par_iter() + .enumerate() + .map(move |(index, x)| { let block = engine_state.get_block(block_id); let mut stack = stack.clone(); @@ -181,7 +207,7 @@ impl Command for ParEach { let val_span = x.span(); let x_is_error = x.is_error(); - match eval_block_with_early_return( + let val = match eval_block_with_early_return( engine_state, &mut stack, block, @@ -194,16 +220,19 @@ impl Command for ParEach { chain_error_with_input(error, x_is_error, val_span), val_span, ), - } + }; + + (index, val) }) - .collect::>() - .into_iter() - .into_pipeline_data(ctrlc) + .collect::>(); + + apply_order(vec).into_pipeline_data(ctrlc) })), PipelineData::ListStream(stream, ..) => Ok(create_pool(max_threads)?.install(|| { - stream + let vec = stream + .enumerate() .par_bridge() - .map(move |x| { + .map(move |(index, x)| { let block = engine_state.get_block(block_id); let mut stack = stack.clone(); @@ -217,7 +246,7 @@ impl Command for ParEach { let val_span = x.span(); let x_is_error = x.is_error(); - match eval_block_with_early_return( + let val = match eval_block_with_early_return( engine_state, &mut stack, block, @@ -230,23 +259,26 @@ impl Command for ParEach { chain_error_with_input(error, x_is_error, val_span), val_span, ), - } + }; + + (index, val) }) - .collect::>() - .into_iter() - .into_pipeline_data(ctrlc) + .collect::>(); + + apply_order(vec).into_pipeline_data(ctrlc) })), PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()), PipelineData::ExternalStream { stdout: Some(stream), .. } => Ok(create_pool(max_threads)?.install(|| { - stream + let vec = stream + .enumerate() .par_bridge() - .map(move |x| { + .map(move |(index, x)| { let x = match x { Ok(x) => x, - Err(err) => return Value::error(err, span), + Err(err) => return (index, Value::error(err, span)), }; let block = engine_state.get_block(block_id); @@ -259,7 +291,7 @@ impl Command for ParEach { } } - match eval_block_with_early_return( + let val = match eval_block_with_early_return( engine_state, &mut stack, block, @@ -269,11 +301,13 @@ impl Command for ParEach { ) { Ok(v) => v.into_value(span), Err(error) => Value::error(error, span), - } + }; + + (index, val) }) - .collect::>() - .into_iter() - .into_pipeline_data(ctrlc) + .collect::>(); + + apply_order(vec).into_pipeline_data(ctrlc) })), // This match allows non-iterables to be accepted, // which is currently considered undesirable (Nov 2022).