Refactor using ClosureEval types (#12541)

# Description
Adds two new types in `nu-engine` for evaluating closures: `ClosureEval`
and `ClosureEvalOnce`. This removed some duplicate code and centralizes
our logic for setting up, running, and cleaning up closures. For
example, in the future if we are able to reduce the cloning necessary to
run a closure, then we only have to change the code related to these
types.

`ClosureEval` and `ClosureEvalOnce` are designed with a builder API.
`ClosureEval` is used to run a closure multiple times whereas
`ClosureEvalOnce` is used for a one-shot closure.

# User-Facing Changes
Should be none, unless I messed up one of the command migrations.
Actually, this will fix any unreported environment bugs for commands
that didn't reset the env after running a closure.
This commit is contained in:
Ian Manske
2024-04-22 06:15:09 +00:00
committed by GitHub
parent 83720a9f30
commit bae6d694ca
29 changed files with 930 additions and 1486 deletions

View File

@ -1,8 +1,5 @@
use nu_engine::{command_prelude::*, eval_block_with_early_return};
use nu_protocol::{
debugger::{Profiler, WithDebug},
engine::Closure,
};
use nu_engine::{command_prelude::*, ClosureEvalOnce};
use nu_protocol::{debugger::Profiler, engine::Closure};
#[derive(Clone)]
pub struct DebugProfile;
@ -84,23 +81,18 @@ confusing the id/parent_id hierarchy. The --expr flag is helpful for investigati
fn run(
&self,
engine_state: &EngineState,
caller_stack: &mut Stack,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let closure: Closure = call.req(engine_state, caller_stack, 0)?;
let mut callee_stack = caller_stack.captures_to_stack(closure.captures);
let block = engine_state.get_block(closure.block_id);
let default_max_depth = 2;
let collect_spans = call.has_flag(engine_state, caller_stack, "spans")?;
let collect_expanded_source =
call.has_flag(engine_state, caller_stack, "expanded-source")?;
let collect_values = call.has_flag(engine_state, caller_stack, "values")?;
let collect_exprs = call.has_flag(engine_state, caller_stack, "expr")?;
let closure: Closure = call.req(engine_state, stack, 0)?;
let collect_spans = call.has_flag(engine_state, stack, "spans")?;
let collect_expanded_source = call.has_flag(engine_state, stack, "expanded-source")?;
let collect_values = call.has_flag(engine_state, stack, "values")?;
let collect_exprs = call.has_flag(engine_state, stack, "expr")?;
let max_depth = call
.get_flag(engine_state, caller_stack, "max-depth")?
.unwrap_or(default_max_depth);
.get_flag(engine_state, stack, "max-depth")?
.unwrap_or(2);
let profiler = Profiler::new(
max_depth,
@ -112,26 +104,19 @@ confusing the id/parent_id hierarchy. The --expr flag is helpful for investigati
call.span(),
);
let lock_err = {
|_| ShellError::GenericError {
error: "Profiler Error".to_string(),
msg: "could not lock debugger, poisoned mutex".to_string(),
span: Some(call.head),
help: None,
inner: vec![],
}
let lock_err = |_| ShellError::GenericError {
error: "Profiler Error".to_string(),
msg: "could not lock debugger, poisoned mutex".to_string(),
span: Some(call.head),
help: None,
inner: vec![],
};
engine_state
.activate_debugger(Box::new(profiler))
.map_err(lock_err)?;
let result = eval_block_with_early_return::<WithDebug>(
engine_state,
&mut callee_stack,
block,
input,
);
let result = ClosureEvalOnce::new(engine_state, stack, closure).run_with_input(input);
// TODO: See eval_source()
match result {
@ -142,10 +127,11 @@ confusing the id/parent_id hierarchy. The --expr flag is helpful for investigati
Err(_e) => (), // TODO: Report error
}
let debugger = engine_state.deactivate_debugger().map_err(lock_err)?;
let res = debugger.report(engine_state, call.span());
res.map(|val| val.into_pipeline_data())
Ok(engine_state
.deactivate_debugger()
.map_err(lock_err)?
.report(engine_state, call.span())?
.into_pipeline_data())
}
fn examples(&self) -> Vec<Example> {

View File

@ -5,7 +5,7 @@ use notify_debouncer_full::{
EventKind, RecursiveMode, Watcher,
},
};
use nu_engine::{command_prelude::*, current_dir, get_eval_block};
use nu_engine::{command_prelude::*, current_dir, ClosureEval};
use nu_protocol::{
engine::{Closure, StateWorkingSet},
format_error,
@ -72,6 +72,7 @@ impl Command for Watch {
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let cwd = current_dir(engine_state, stack)?;
let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
@ -89,11 +90,7 @@ impl Command for Watch {
}
};
let capture_block: Closure = call.req(engine_state, stack, 1)?;
let block = engine_state
.clone()
.get_block(capture_block.block_id)
.clone();
let closure: Closure = call.req(engine_state, stack, 1)?;
let verbose = call.has_flag(engine_state, stack, "verbose")?;
@ -167,69 +164,43 @@ impl Command for Watch {
eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
let eval_block = get_eval_block(engine_state);
let mut closure = ClosureEval::new(engine_state, stack, closure);
let event_handler =
|operation: &str, path: PathBuf, new_path: Option<PathBuf>| -> Result<(), ShellError> {
let glob_pattern = glob_pattern.clone();
let matches_glob = match glob_pattern.clone() {
Some(glob) => glob.matches_path(&path),
None => true,
};
if verbose && glob_pattern.is_some() {
eprintln!("Matches glob: {matches_glob}");
}
if matches_glob {
let stack = &mut stack.clone();
if let Some(position) = block.signature.get_positional(0) {
if let Some(position_id) = &position.var_id {
stack.add_var(*position_id, Value::string(operation, call.span()));
}
}
if let Some(position) = block.signature.get_positional(1) {
if let Some(position_id) = &position.var_id {
stack.add_var(
*position_id,
Value::string(path.to_string_lossy(), call.span()),
);
}
}
if let Some(position) = block.signature.get_positional(2) {
if let Some(position_id) = &position.var_id {
stack.add_var(
*position_id,
Value::string(
new_path.unwrap_or_else(|| "".into()).to_string_lossy(),
call.span(),
),
);
}
}
let eval_result = eval_block(
engine_state,
stack,
&block,
Value::nothing(call.span()).into_pipeline_data(),
);
match eval_result {
Ok(val) => {
val.print(engine_state, stack, false, false)?;
}
Err(err) => {
let working_set = StateWorkingSet::new(engine_state);
eprintln!("{}", format_error(&working_set, &err));
}
}
}
Ok(())
let mut event_handler = move |operation: &str,
path: PathBuf,
new_path: Option<PathBuf>|
-> Result<(), ShellError> {
let matches_glob = match &glob_pattern {
Some(glob) => glob.matches_path(&path),
None => true,
};
if verbose && glob_pattern.is_some() {
eprintln!("Matches glob: {matches_glob}");
}
if matches_glob {
let result = closure
.add_arg(Value::string(operation, head))
.add_arg(Value::string(path.to_string_lossy(), head))
.add_arg(Value::string(
new_path.unwrap_or_else(|| "".into()).to_string_lossy(),
head,
))
.run_with_input(PipelineData::Empty);
match result {
Ok(val) => {
val.print(engine_state, stack, false, false)?;
}
Err(err) => {
let working_set = StateWorkingSet::new(engine_state);
eprintln!("{}", format_error(&working_set, &err));
}
}
}
Ok(())
};
loop {
match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {

View File

@ -1,5 +1,5 @@
use super::utils::chain_error_with_input;
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_engine::{command_prelude::*, ClosureEval, ClosureEvalOnce};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -107,126 +107,79 @@ with 'transpose' first."#
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let keep_empty = call.has_flag(engine_state, stack, "keep-empty")?;
let metadata = input.metadata();
let ctrlc = engine_state.ctrlc.clone();
let outer_ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let block = engine_state.get_block(capture_block.block_id).clone();
let mut stack = stack.captures_to_stack(capture_block.captures);
let orig_env_vars = stack.env_vars.clone();
let orig_env_hidden = stack.env_hidden.clone();
let span = call.head;
match input {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(Value::Range { .. }, ..)
| PipelineData::Value(Value::List { .. }, ..)
| PipelineData::ListStream { .. } => Ok(input
.into_iter()
.map_while(move |x| {
// with_env() is used here to ensure that each iteration uses
// a different set of environment variables.
// Hence, a 'cd' in the first loop won't affect the next loop.
stack.with_env(&orig_env_vars, &orig_env_hidden);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
| PipelineData::ListStream(..) => {
let mut closure = ClosureEval::new(engine_state, stack, closure);
Ok(input
.into_iter()
.map_while(move |value| {
let span = value.span();
let is_error = value.is_error();
match closure.run_with_value(value) {
Ok(data) => Some(data.into_value(head)),
Err(ShellError::Continue { span }) => Some(Value::nothing(span)),
Err(ShellError::Break { .. }) => None,
Err(error) => {
let error = chain_error_with_input(error, is_error, span);
Some(Value::error(error, span))
}
}
}
let input_span = x.span();
let x_is_error = x.is_error();
match eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
x.into_pipeline_data(),
) {
Ok(v) => Some(v.into_value(span)),
Err(ShellError::Continue { span }) => Some(Value::nothing(span)),
Err(ShellError::Break { .. }) => None,
Err(error) => {
let error = chain_error_with_input(error, x_is_error, input_span);
Some(Value::error(error, input_span))
}
}
})
.into_pipeline_data(ctrlc)),
})
.into_pipeline_data(engine_state.ctrlc.clone()))
}
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()),
PipelineData::ExternalStream {
stdout: Some(stream),
..
} => Ok(stream
.into_iter()
.map_while(move |x| {
// with_env() is used here to ensure that each iteration uses
// a different set of environment variables.
// Hence, a 'cd' in the first loop won't affect the next loop.
stack.with_env(&orig_env_vars, &orig_env_hidden);
} => {
let mut closure = ClosureEval::new(engine_state, stack, closure);
Ok(stream
.into_iter()
.map_while(move |value| {
let value = match value {
Ok(value) => value,
Err(ShellError::Continue { span }) => {
return Some(Value::nothing(span))
}
Err(ShellError::Break { .. }) => return None,
Err(err) => return Some(Value::error(err, head)),
};
let x = match x {
Ok(x) => x,
Err(ShellError::Continue { span }) => return Some(Value::nothing(span)),
Err(ShellError::Break { .. }) => return None,
Err(err) => return Some(Value::error(err, span)),
};
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
let span = value.span();
let is_error = value.is_error();
match closure.run_with_value(value) {
Ok(data) => Some(data.into_value(head)),
Err(ShellError::Continue { span }) => Some(Value::nothing(span)),
Err(ShellError::Break { .. }) => None,
Err(error) => {
let error = chain_error_with_input(error, is_error, span);
Some(Value::error(error, span))
}
}
}
let input_span = x.span();
let x_is_error = x.is_error();
match eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
x.into_pipeline_data(),
) {
Ok(v) => Some(v.into_value(span)),
Err(ShellError::Continue { span }) => Some(Value::nothing(span)),
Err(ShellError::Break { .. }) => None,
Err(error) => {
let error = chain_error_with_input(error, x_is_error, input_span);
Some(Value::error(error, input_span))
}
}
})
.into_pipeline_data(ctrlc)),
})
.into_pipeline_data(engine_state.ctrlc.clone()))
}
// This match allows non-iterables to be accepted,
// which is currently considered undesirable (Nov 2022).
PipelineData::Value(x, ..) => {
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
}
}
eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
x.into_pipeline_data(),
)
PipelineData::Value(value, ..) => {
ClosureEvalOnce::new(engine_state, stack, closure).run_with_value(value)
}
}
.and_then(|x| {
x.filter(
move |x| if !keep_empty { !x.is_nothing() } else { true },
outer_ctrlc,
engine_state.ctrlc.clone(),
)
})
.map(|x| x.set_metadata(metadata))
.map(|data| data.set_metadata(metadata))
}
}

View File

@ -1,5 +1,5 @@
use super::utils::chain_error_with_input;
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_engine::{command_prelude::*, ClosureEval, ClosureEvalOnce};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -47,135 +47,71 @@ a variable. On the other hand, the "row condition" syntax is not supported."#
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let metadata = input.metadata();
let ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let block = engine_state.get_block(capture_block.block_id).clone();
let mut stack = stack.captures_to_stack(capture_block.captures);
let orig_env_vars = stack.env_vars.clone();
let orig_env_hidden = stack.env_hidden.clone();
let span = call.head;
let eval_block_with_early_return = get_eval_block_with_early_return(&engine_state);
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let metadata = input.metadata();
match input {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(Value::Range { .. }, ..)
| PipelineData::Value(Value::List { .. }, ..)
| PipelineData::ListStream { .. } => Ok(input
// To enumerate over the input (for the index argument),
// it must be converted into an iterator using into_iter().
.into_iter()
.filter_map(move |x| {
// with_env() is used here to ensure that each iteration uses
// a different set of environment variables.
// Hence, a 'cd' in the first loop won't affect the next loop.
stack.with_env(&orig_env_vars, &orig_env_hidden);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
| PipelineData::ListStream(..) => {
let mut closure = ClosureEval::new(engine_state, stack, closure);
Ok(input
.into_iter()
.filter_map(move |value| match closure.run_with_value(value.clone()) {
Ok(pred) => pred.into_value(head).is_true().then_some(value),
Err(err) => {
let span = value.span();
let err = chain_error_with_input(err, value.is_error(), span);
Some(Value::error(err, span))
}
}
match eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
// clone() is used here because x is given to Ok() below.
x.clone().into_pipeline_data(),
) {
Ok(v) => {
if v.into_value(span).is_true() {
Some(x)
} else {
None
}
}
Err(error) => Some(Value::error(
chain_error_with_input(error, x.is_error(), x.span()),
x.span(),
)),
}
})
.into_pipeline_data(ctrlc)),
})
.into_pipeline_data(engine_state.ctrlc.clone()))
}
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()),
PipelineData::ExternalStream {
stdout: Some(stream),
..
} => Ok(stream
.into_iter()
.filter_map(move |x| {
// see note above about with_env()
stack.with_env(&orig_env_vars, &orig_env_hidden);
} => {
let mut closure = ClosureEval::new(engine_state, stack, closure);
Ok(stream
.into_iter()
.filter_map(move |value| {
let value = match value {
Ok(value) => value,
Err(err) => return Some(Value::error(err, head)),
};
let x = match x {
Ok(x) => x,
Err(err) => return Some(Value::error(err, span)),
};
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
}
}
match eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
// clone() is used here because x is given to Ok() below.
x.clone().into_pipeline_data(),
) {
Ok(v) => {
if v.into_value(span).is_true() {
Some(x)
} else {
None
match closure.run_with_value(value.clone()) {
Ok(pred) => pred.into_value(head).is_true().then_some(value),
Err(err) => {
let span = value.span();
let err = chain_error_with_input(err, value.is_error(), span);
Some(Value::error(err, span))
}
}
Err(error) => Some(Value::error(
chain_error_with_input(error, x.is_error(), x.span()),
x.span(),
)),
}
})
.into_pipeline_data(ctrlc)),
})
.into_pipeline_data(engine_state.ctrlc.clone()))
}
// This match allows non-iterables to be accepted,
// which is currently considered undesirable (Nov 2022).
PipelineData::Value(x, ..) => {
// see note above about with_env()
stack.with_env(&orig_env_vars, &orig_env_hidden);
PipelineData::Value(value, ..) => {
let result = ClosureEvalOnce::new(engine_state, stack, closure)
.run_with_value(value.clone());
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
Ok(match result {
Ok(pred) => pred.into_value(head).is_true().then_some(value),
Err(err) => {
let span = value.span();
let err = chain_error_with_input(err, value.is_error(), span);
Some(Value::error(err, span))
}
}
Ok(match eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
// clone() is used here because x is given to Ok() below.
x.clone().into_pipeline_data(),
) {
Ok(v) => {
if v.into_value(span).is_true() {
Some(x)
} else {
None
}
}
Err(error) => Some(Value::error(
chain_error_with_input(error, x.is_error(), x.span()),
x.span(),
)),
}
.into_pipeline_data(ctrlc))
.into_pipeline_data(engine_state.ctrlc.clone()))
}
}
.map(|x| x.set_metadata(metadata))
.map(|data| data.set_metadata(metadata))
}
fn examples(&self) -> Vec<Example> {

View File

@ -1,5 +1,5 @@
use indexmap::IndexMap;
use nu_engine::{command_prelude::*, get_eval_block};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -202,20 +202,13 @@ fn group_closure(
stack: &mut Stack,
) -> Result<IndexMap<String, Vec<Value>>, ShellError> {
let mut groups = IndexMap::<_, Vec<_>>::new();
let eval_block = get_eval_block(engine_state);
let block = engine_state.get_block(closure.block_id);
let mut closure = ClosureEval::new(engine_state, stack, closure);
for value in values {
let mut stack = stack.captures_to_stack(closure.captures.clone());
let key = eval_block(
engine_state,
&mut stack,
block,
value.clone().into_pipeline_data(),
)?
.into_value(span)
.coerce_into_string()?;
let key = closure
.run_with_value(value.clone())?
.into_value(span)
.coerce_into_string()?;
groups.entry(key).or_default().push(value);
}

View File

@ -1,8 +1,5 @@
use nu_engine::{command_prelude::*, get_eval_block, EvalBlockFn};
use nu_protocol::{
ast::{Block, PathMember},
engine::Closure,
};
use nu_engine::{command_prelude::*, ClosureEval, ClosureEvalOnce};
use nu_protocol::ast::PathMember;
#[derive(Clone)]
pub struct Insert;
@ -127,51 +124,38 @@ fn insert(
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;
let head = call.head;
let cell_path: CellPath = call.req(engine_state, stack, 0)?;
let replacement: Value = call.req(engine_state, stack, 1)?;
let replacement_span = replacement.span();
let ctrlc = engine_state.ctrlc.clone();
let eval_block = get_eval_block(engine_state);
match input {
PipelineData::Value(mut value, metadata) => {
if let Value::Closure { val: closure, .. } = replacement {
if let Value::Closure { val, .. } = replacement {
match (cell_path.members.first(), &mut value) {
(Some(PathMember::String { .. }), Value::List { vals, .. }) => {
let block = engine_state.get_block(closure.block_id);
let stack = stack.captures_to_stack(closure.captures);
let mut closure = ClosureEval::new(engine_state, stack, val);
for val in vals {
let mut stack = stack.clone();
insert_value_by_closure(
val,
replacement_span,
engine_state,
&mut stack,
block,
&mut closure,
head,
&cell_path.members,
false,
eval_block,
)?;
}
}
(first, _) => {
insert_single_value_by_closure(
&mut value,
closure,
replacement_span,
engine_state,
stack,
ClosureEvalOnce::new(engine_state, stack, val),
head,
&cell_path.members,
matches!(first, Some(PathMember::Int { .. })),
eval_block,
)?;
}
}
} else {
value.insert_data_at_cell_path(&cell_path.members, replacement, span)?;
value.insert_data_at_cell_path(&cell_path.members, replacement, head)?;
}
Ok(value.into_pipeline_data_with_metadata(metadata))
}
@ -199,27 +183,15 @@ fn insert(
}
if path.is_empty() {
if let Value::Closure { val: closure, .. } = replacement {
if let Value::Closure { val, .. } = replacement {
let value = stream.next();
let end_of_stream = value.is_none();
let value = value.unwrap_or(Value::nothing(replacement_span));
let block = engine_state.get_block(closure.block_id);
let mut stack = stack.captures_to_stack(closure.captures);
let value = value.unwrap_or(Value::nothing(head));
let new_value = ClosureEvalOnce::new(engine_state, stack, val)
.run_with_value(value.clone())?
.into_value(head);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, value.clone())
}
}
let output = eval_block(
engine_state,
&mut stack,
block,
value.clone().into_pipeline_data(),
)?;
pre_elems.push(output.into_value(replacement_span));
pre_elems.push(new_value);
if !end_of_stream {
pre_elems.push(value);
}
@ -227,19 +199,16 @@ fn insert(
pre_elems.push(replacement);
}
} else if let Some(mut value) = stream.next() {
if let Value::Closure { val: closure, .. } = replacement {
if let Value::Closure { val, .. } = replacement {
insert_single_value_by_closure(
&mut value,
closure,
replacement_span,
engine_state,
stack,
ClosureEvalOnce::new(engine_state, stack, val),
head,
path,
true,
eval_block,
)?;
} else {
value.insert_data_at_cell_path(path, replacement, span)?;
value.insert_data_at_cell_path(path, replacement, head)?;
}
pre_elems.push(value)
} else {
@ -252,75 +221,61 @@ fn insert(
Ok(pre_elems
.into_iter()
.chain(stream)
.into_pipeline_data_with_metadata(metadata, ctrlc))
} else if let Value::Closure { val: closure, .. } = replacement {
let engine_state = engine_state.clone();
let block = engine_state.get_block(closure.block_id).clone();
let stack = stack.captures_to_stack(closure.captures);
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
} else if let Value::Closure { val, .. } = replacement {
let mut closure = ClosureEval::new(engine_state, stack, val);
Ok(stream
.map(move |mut input| {
// Recreate the stack for each iteration to
// isolate environment variable changes, etc.
let mut stack = stack.clone();
.map(move |mut value| {
let err = insert_value_by_closure(
&mut input,
replacement_span,
&engine_state,
&mut stack,
&block,
&mut value,
&mut closure,
head,
&cell_path.members,
false,
eval_block,
);
if let Err(e) = err {
Value::error(e, span)
Value::error(e, head)
} else {
input
value
}
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
} else {
Ok(stream
.map(move |mut input| {
if let Err(e) = input.insert_data_at_cell_path(
.map(move |mut value| {
if let Err(e) = value.insert_data_at_cell_path(
&cell_path.members,
replacement.clone(),
span,
head,
) {
Value::error(e, span)
Value::error(e, head)
} else {
input
value
}
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
}
PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
type_name: "empty pipeline".to_string(),
span,
span: head,
}),
PipelineData::ExternalStream { .. } => Err(ShellError::IncompatiblePathAccess {
type_name: "external stream".to_string(),
span,
span: head,
}),
}
}
#[allow(clippy::too_many_arguments)]
fn insert_value_by_closure(
value: &mut Value,
closure: &mut ClosureEval,
span: Span,
engine_state: &EngineState,
stack: &mut Stack,
block: &Block,
cell_path: &[PathMember],
first_path_member_int: bool,
eval_block_fn: EvalBlockFn,
) -> Result<(), ShellError> {
let input = if first_path_member_int {
let value_at_path = if first_path_member_int {
value
.clone()
.follow_cell_path(cell_path, false)
@ -329,41 +284,28 @@ fn insert_value_by_closure(
value.clone()
};
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = var.var_id {
stack.add_var(var_id, input.clone());
}
}
let output = eval_block_fn(engine_state, stack, block, input.into_pipeline_data())?;
value.insert_data_at_cell_path(cell_path, output.into_value(span), span)
let new_value = closure.run_with_value(value_at_path)?.into_value(span);
value.insert_data_at_cell_path(cell_path, new_value, span)
}
#[allow(clippy::too_many_arguments)]
fn insert_single_value_by_closure(
value: &mut Value,
closure: Closure,
closure: ClosureEvalOnce,
span: Span,
engine_state: &EngineState,
stack: &mut Stack,
cell_path: &[PathMember],
first_path_member_int: bool,
eval_block_fn: EvalBlockFn,
) -> Result<(), ShellError> {
let block = engine_state.get_block(closure.block_id);
let mut stack = stack.captures_to_stack(closure.captures);
let value_at_path = if first_path_member_int {
value
.clone()
.follow_cell_path(cell_path, false)
.unwrap_or(Value::nothing(span))
} else {
value.clone()
};
insert_value_by_closure(
value,
span,
engine_state,
&mut stack,
block,
cell_path,
first_path_member_int,
eval_block_fn,
)
let new_value = closure.run_with_value(value_at_path)?.into_value(span);
value.insert_data_at_cell_path(cell_path, new_value, span)
}
#[cfg(test)]

View File

@ -1,4 +1,4 @@
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_engine::{command_prelude::*, ClosureEvalOnce};
use nu_protocol::engine::Closure;
use std::{sync::mpsc, thread};
@ -106,23 +106,21 @@ interleave
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let closures: Vec<Closure> = call.rest(engine_state, stack, 0)?;
let buffer_size: usize = call
.get_flag(engine_state, stack, "buffer-size")?
.unwrap_or(0);
let (tx, rx) = mpsc::sync_channel(buffer_size);
let closures: Vec<Closure> = call.rest(engine_state, stack, 0)?;
let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
let (tx, rx) = mpsc::sync_channel(buffer_size);
// Spawn the threads for the input and closure outputs
(!input.is_nothing())
.then(|| Ok(input))
.into_iter()
.chain(closures.into_iter().map(|closure| {
// Evaluate the closure on this thread
let block = engine_state.get_block(closure.block_id);
let mut stack = stack.captures_to_stack(closure.captures);
eval_block_with_early_return(engine_state, &mut stack, block, PipelineData::Empty)
ClosureEvalOnce::new(engine_state, stack, closure)
.run_with_input(PipelineData::Empty)
}))
.try_for_each(|stream| {
stream.and_then(|stream| {
@ -141,7 +139,7 @@ interleave
.map(|_| ())
.map_err(|err| ShellError::IOErrorSpanned {
msg: err.to_string(),
span: call.head,
span: head,
})
})
})?;

View File

@ -1,5 +1,5 @@
use super::utils::chain_error_with_input;
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -37,96 +37,68 @@ impl Command for Items {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let metadata = input.metadata();
let ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let block = engine_state.get_block(capture_block.block_id).clone();
let mut stack = stack.captures_to_stack(capture_block.captures);
let orig_env_vars = stack.env_vars.clone();
let orig_env_hidden = stack.env_hidden.clone();
let span = call.head;
let eval_block_with_early_return = get_eval_block_with_early_return(&engine_state);
let input_span = input.span().unwrap_or(call.head);
let run_for_each_item = move |keyval: (String, Value)| -> Option<Value> {
// with_env() is used here to ensure that each iteration uses
// a different set of environment variables.
// Hence, a 'cd' in the first loop won't affect the next loop.
stack.with_env(&orig_env_vars, &orig_env_hidden);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, Value::string(keyval.0.clone(), span));
}
}
if let Some(var) = block.signature.get_positional(1) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, keyval.1);
}
}
match eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
PipelineData::empty(),
) {
Ok(v) => Some(v.into_value(span)),
Err(ShellError::Break { .. }) => None,
Err(error) => {
let error = chain_error_with_input(error, false, input_span);
Some(Value::error(error, span))
}
}
};
match input {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(v, ..) => match v {
Value::Record { val, .. } => Ok(val
.into_owned()
.into_iter()
.map_while(run_for_each_item)
.into_pipeline_data(ctrlc)),
Value::LazyRecord { val, .. } => {
let record = match val.collect()? {
Value::Record { val, .. } => val,
_ => Err(ShellError::NushellFailedSpanned {
msg: "`LazyRecord::collect()` promises `Value::Record`".into(),
label: "Violating lazy record found here".into(),
span,
})?,
};
Ok(record
.into_owned()
.into_iter()
.map_while(run_for_each_item)
.into_pipeline_data(ctrlc))
PipelineData::Value(value, ..) => {
let value = if let Value::LazyRecord { val, .. } = value {
val.collect()?
} else {
value
};
let span = value.span();
match value {
Value::Record { val, .. } => {
let mut closure = ClosureEval::new(engine_state, stack, closure);
Ok(val
.into_owned()
.into_iter()
.map_while(move |(col, val)| {
let result = closure
.add_arg(Value::string(col, span))
.add_arg(val)
.run_with_input(PipelineData::Empty);
match result {
Ok(data) => Some(data.into_value(head)),
Err(ShellError::Break { .. }) => None,
Err(err) => {
let err = chain_error_with_input(err, false, span);
Some(Value::error(err, head))
}
}
})
.into_pipeline_data(engine_state.ctrlc.clone()))
}
Value::Error { error, .. } => Err(*error),
other => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(),
wrong_type: other.get_type().to_string(),
dst_span: head,
src_span: other.span(),
}),
}
Value::Error { error, .. } => Err(*error),
other => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(),
wrong_type: other.get_type().to_string(),
dst_span: call.head,
src_span: other.span(),
}),
},
}
PipelineData::ListStream(..) => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(),
wrong_type: "stream".into(),
dst_span: call.head,
src_span: input_span,
}),
PipelineData::ExternalStream { .. } => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(),
wrong_type: "raw data".into(),
dst_span: call.head,
src_span: input_span,
dst_span: head,
src_span: head,
}),
PipelineData::ExternalStream { span, .. } => {
Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(),
wrong_type: "raw data".into(),
dst_span: head,
src_span: span,
})
}
}
.map(|x| x.set_metadata(metadata))
.map(|data| data.set_metadata(metadata))
}
fn examples(&self) -> Vec<Example> {

View File

@ -1,9 +1,8 @@
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use super::utils::chain_error_with_input;
use nu_engine::{command_prelude::*, ClosureEvalOnce};
use nu_protocol::engine::Closure;
use rayon::prelude::*;
use super::utils::chain_error_with_input;
#[derive(Clone)]
pub struct ParEach;
@ -113,16 +112,13 @@ impl Command for ParEach {
}
}
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let threads: Option<usize> = call.get_flag(engine_state, stack, "threads")?;
let max_threads = threads.unwrap_or(0);
let keep_order = call.has_flag(engine_state, stack, "keep-order")?;
let metadata = input.metadata();
let ctrlc = engine_state.ctrlc.clone();
let outer_ctrlc = engine_state.ctrlc.clone();
let block_id = capture_block.block_id;
let mut stack = stack.captures_to_stack(capture_block.captures);
let span = call.head;
// A helper function sorts the output if needed
let apply_order = |mut vec: Vec<(usize, Value)>| {
@ -135,8 +131,6 @@ impl Command for ParEach {
vec.into_iter().map(|(_, val)| val)
};
let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
match input {
PipelineData::Empty => Ok(PipelineData::Empty),
PipelineData::Value(value, ..) => {
@ -144,74 +138,51 @@ impl Command for ParEach {
match value {
Value::List { vals, .. } => Ok(create_pool(max_threads)?.install(|| {
let vec = vals
.par_iter()
.into_par_iter()
.enumerate()
.map(move |(index, x)| {
let block = engine_state.get_block(block_id);
.map(move |(index, value)| {
let span = value.span();
let is_error = value.is_error();
let result =
ClosureEvalOnce::new(engine_state, stack, closure.clone())
.run_with_value(value);
let mut stack = stack.clone();
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
}
}
let val_span = x.span();
let x_is_error = x.is_error();
let val = match eval_block_with_early_return(
engine_state,
&mut stack,
block,
x.clone().into_pipeline_data(),
) {
Ok(v) => v.into_value(span),
Err(error) => Value::error(
chain_error_with_input(error, x_is_error, val_span),
val_span,
let value = match result {
Ok(data) => data.into_value(span),
Err(err) => Value::error(
chain_error_with_input(err, is_error, span),
span,
),
};
(index, val)
(index, value)
})
.collect::<Vec<_>>();
apply_order(vec).into_pipeline_data(ctrlc)
apply_order(vec).into_pipeline_data(engine_state.ctrlc.clone())
})),
Value::Range { val, .. } => Ok(create_pool(max_threads)?.install(|| {
let ctrlc = engine_state.ctrlc.clone();
let vec = val
.into_range_iter(span, ctrlc.clone())
.enumerate()
.par_bridge()
.map(move |(index, x)| {
let block = engine_state.get_block(block_id);
.map(move |(index, value)| {
let span = value.span();
let is_error = value.is_error();
let result =
ClosureEvalOnce::new(engine_state, stack, closure.clone())
.run_with_value(value);
let mut stack = stack.clone();
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
}
}
let val_span = x.span();
let x_is_error = x.is_error();
let val = match eval_block_with_early_return(
engine_state,
&mut stack,
block,
x.into_pipeline_data(),
) {
Ok(v) => v.into_value(span),
Err(error) => Value::error(
chain_error_with_input(error, x_is_error, val_span),
val_span,
let value = match result {
Ok(data) => data.into_value(span),
Err(err) => Value::error(
chain_error_with_input(err, is_error, span),
span,
),
};
(index, val)
(index, value)
})
.collect::<Vec<_>>();
@ -220,20 +191,7 @@ impl Command for ParEach {
// This match allows non-iterables to be accepted,
// which is currently considered undesirable (Nov 2022).
value => {
let block = engine_state.get_block(block_id);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, value.clone());
}
}
eval_block_with_early_return(
engine_state,
&mut stack,
block,
value.into_pipeline_data(),
)
ClosureEvalOnce::new(engine_state, stack, closure).run_with_value(value)
}
}
}
@ -241,38 +199,24 @@ impl Command for ParEach {
let vec = stream
.enumerate()
.par_bridge()
.map(move |(index, x)| {
let block = engine_state.get_block(block_id);
.map(move |(index, value)| {
let span = value.span();
let is_error = value.is_error();
let result = ClosureEvalOnce::new(engine_state, stack, closure.clone())
.run_with_value(value);
let mut stack = stack.clone();
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
let value = match result {
Ok(data) => data.into_value(head),
Err(err) => {
Value::error(chain_error_with_input(err, is_error, span), span)
}
}
let val_span = x.span();
let x_is_error = x.is_error();
let val = match eval_block_with_early_return(
engine_state,
&mut stack,
block,
x.into_pipeline_data(),
) {
Ok(v) => v.into_value(span),
Err(error) => Value::error(
chain_error_with_input(error, x_is_error, val_span),
val_span,
),
};
(index, val)
(index, value)
})
.collect::<Vec<_>>();
apply_order(vec).into_pipeline_data(ctrlc)
apply_order(vec).into_pipeline_data(engine_state.ctrlc.clone())
})),
PipelineData::ExternalStream { stdout: None, .. } => Ok(PipelineData::empty()),
PipelineData::ExternalStream {
@ -282,41 +226,26 @@ impl Command for ParEach {
let vec = stream
.enumerate()
.par_bridge()
.map(move |(index, x)| {
let x = match x {
Ok(x) => x,
Err(err) => return (index, Value::error(err, span)),
.map(move |(index, value)| {
let value = match value {
Ok(value) => value,
Err(err) => return (index, Value::error(err, head)),
};
let block = engine_state.get_block(block_id);
let value = ClosureEvalOnce::new(engine_state, stack, closure.clone())
.run_with_value(value)
.map(|data| data.into_value(head))
.unwrap_or_else(|err| Value::error(err, head));
let mut stack = stack.clone();
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x.clone());
}
}
let val = match eval_block_with_early_return(
engine_state,
&mut stack,
block,
x.into_pipeline_data(),
) {
Ok(v) => v.into_value(span),
Err(error) => Value::error(error, span),
};
(index, val)
(index, value)
})
.collect::<Vec<_>>();
apply_order(vec).into_pipeline_data(ctrlc)
apply_order(vec).into_pipeline_data(engine_state.ctrlc.clone())
})),
}
.and_then(|x| x.filter(|v| !v.is_nothing(), outer_ctrlc))
.map(|res| res.set_metadata(metadata))
.and_then(|x| x.filter(|v| !v.is_nothing(), engine_state.ctrlc.clone()))
.map(|data| data.set_metadata(metadata))
}
}

View File

@ -1,4 +1,4 @@
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -88,72 +88,37 @@ impl Command for Reduce {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;
let head = call.head;
let fold: Option<Value> = call.get_flag(engine_state, stack, "fold")?;
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let mut stack = stack.captures_to_stack(capture_block.captures);
let block = engine_state.get_block(capture_block.block_id);
let ctrlc = engine_state.ctrlc.clone();
let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
let closure: Closure = call.req(engine_state, stack, 0)?;
let orig_env_vars = stack.env_vars.clone();
let orig_env_hidden = stack.env_hidden.clone();
let mut iter = input.into_iter();
// To enumerate over the input (for the index argument),
// it must be converted into an iterator using into_iter().
let mut input_iter = input.into_iter();
let start_val = if let Some(val) = fold {
val
} else if let Some(val) = input_iter.next() {
val
} else {
return Err(ShellError::GenericError {
let mut acc = fold
.or_else(|| iter.next())
.ok_or_else(|| ShellError::GenericError {
error: "Expected input".into(),
msg: "needs input".into(),
span: Some(span),
span: Some(head),
help: None,
inner: vec![],
});
};
})?;
let mut acc = start_val;
let mut closure = ClosureEval::new(engine_state, stack, closure);
for x in input_iter {
// with_env() is used here to ensure that each iteration uses
// a different set of environment variables.
// Hence, a 'cd' in the first loop won't affect the next loop.
stack.with_env(&orig_env_vars, &orig_env_hidden);
// Element argument
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, x);
}
}
// Accumulator argument
if let Some(var) = block.signature.get_positional(1) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, acc);
}
}
acc = eval_block_with_early_return(
engine_state,
&mut stack,
block,
PipelineData::empty(),
)?
.into_value(span);
if nu_utils::ctrl_c::was_pressed(&ctrlc) {
for value in iter {
if nu_utils::ctrl_c::was_pressed(&engine_state.ctrlc) {
break;
}
acc = closure
.add_arg(value)
.add_arg(acc)
.run_with_input(PipelineData::Empty)?
.into_value(head);
}
Ok(acc.with_span(span).into_pipeline_data())
Ok(acc.with_span(head).into_pipeline_data())
}
}

View File

@ -1,5 +1,5 @@
use indexmap::IndexMap;
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -104,6 +104,8 @@ fn rename(
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let columns: Vec<String> = call.rest(engine_state, stack, 0)?;
let specified_column: Option<Record> = call.get_flag(engine_state, stack, "column")?;
// convert from Record to HashMap for easily query.
let specified_column: Option<IndexMap<String, String>> = match specified_column {
@ -133,24 +135,11 @@ fn rename(
}
None => None,
};
let block_info =
if let Some(capture_block) = call.get_flag::<Closure>(engine_state, stack, "block")? {
let engine_state = engine_state.clone();
let block = engine_state.get_block(capture_block.block_id).clone();
let stack = stack.captures_to_stack(capture_block.captures);
let orig_env_vars = stack.env_vars.clone();
let orig_env_hidden = stack.env_hidden.clone();
Some((engine_state, block, stack, orig_env_vars, orig_env_hidden))
} else {
None
};
let closure: Option<Closure> = call.get_flag(engine_state, stack, "block")?;
let mut closure = closure.map(|closure| ClosureEval::new(engine_state, stack, closure));
let columns: Vec<String> = call.rest(engine_state, stack, 0)?;
let metadata = input.metadata();
let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
let head_span = call.head;
input
.map(
move |item| {
@ -158,31 +147,14 @@ fn rename(
match item {
Value::Record { val: record, .. } => {
let record =
if let Some((engine_state, block, mut stack, env_vars, env_hidden)) =
block_info.clone()
{
if let Some(closure) = &mut closure {
record
.into_owned().into_iter()
.map(|(col, val)| {
stack.with_env(&env_vars, &env_hidden);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(
*var_id,
Value::string(col.clone(), span),
)
}
}
eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
Value::string(col, span).into_pipeline_data(),
)
.and_then(|data| data.collect_string_strict(span))
.map(|(col, _, _)| (col, val))
let col = Value::string(col, span);
let data = closure.run_with_value(col)?;
let col = data.collect_string_strict(span)?.0;
Ok((col, val))
})
.collect::<Result<Record, _>>()
} else {
@ -214,7 +186,7 @@ fn rename(
Err(ShellError::UnsupportedInput {
msg: format!("The column '{missing}' does not exist in the input"),
input: "value originated from here".into(),
msg_span: head_span,
msg_span: head,
input_span: span,
})
} else {
@ -242,16 +214,16 @@ fn rename(
ShellError::OnlySupportsThisInputType {
exp_input_type: "record".into(),
wrong_type: other.get_type().to_string(),
dst_span: head_span,
dst_span: head,
src_span: other.span(),
},
head_span,
head,
),
}
},
engine_state.ctrlc.clone(),
)
.map(|x| x.set_metadata(metadata))
.map(|data| data.set_metadata(metadata))
}
#[cfg(test)]

View File

@ -1,4 +1,4 @@
use nu_engine::{command_prelude::*, get_eval_block};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -74,33 +74,21 @@ impl Command for SkipUntil {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let mut closure = ClosureEval::new(engine_state, stack, closure);
let metadata = input.metadata();
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let block = engine_state.get_block(capture_block.block_id).clone();
let var_id = block.signature.get_positional(0).and_then(|arg| arg.var_id);
let mut stack = stack.captures_to_stack(capture_block.captures);
let ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let eval_block = get_eval_block(&engine_state);
Ok(input
.into_iter_strict(span)?
.into_iter_strict(head)?
.skip_while(move |value| {
if let Some(var_id) = var_id {
stack.add_var(var_id, value.clone());
}
!eval_block(&engine_state, &mut stack, &block, PipelineData::empty())
.map_or(false, |pipeline_data| {
pipeline_data.into_value(span).is_true()
})
closure
.run_with_value(value.clone())
.map(|data| data.into_value(head).is_false())
.unwrap_or(false)
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
}

View File

@ -1,4 +1,4 @@
use nu_engine::{command_prelude::*, get_eval_block};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -79,33 +79,21 @@ impl Command for SkipWhile {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let mut closure = ClosureEval::new(engine_state, stack, closure);
let metadata = input.metadata();
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let block = engine_state.get_block(capture_block.block_id).clone();
let var_id = block.signature.get_positional(0).and_then(|arg| arg.var_id);
let mut stack = stack.captures_to_stack(capture_block.captures);
let ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let eval_block = get_eval_block(&engine_state);
Ok(input
.into_iter_strict(span)?
.into_iter_strict(head)?
.skip_while(move |value| {
if let Some(var_id) = var_id {
stack.add_var(var_id, value.clone());
}
eval_block(&engine_state, &mut stack, &block, PipelineData::empty())
.map_or(false, |pipeline_data| {
pipeline_data.into_value(span).is_true()
})
closure
.run_with_value(value.clone())
.map(|data| data.into_value(head).is_true())
.unwrap_or(false)
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
}

View File

@ -1,4 +1,4 @@
use nu_engine::{command_prelude::*, get_eval_block};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -70,34 +70,21 @@ impl Command for TakeUntil {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let mut closure = ClosureEval::new(engine_state, stack, closure);
let metadata = input.metadata();
let span = call.head;
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let block = engine_state.get_block(capture_block.block_id).clone();
let var_id = block.signature.get_positional(0).and_then(|arg| arg.var_id);
let mut stack = stack.captures_to_stack(capture_block.captures);
let ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let eval_block = get_eval_block(&engine_state);
Ok(input
.into_iter_strict(span)?
.into_iter_strict(head)?
.take_while(move |value| {
if let Some(var_id) = var_id {
stack.add_var(var_id, value.clone());
}
!eval_block(&engine_state, &mut stack, &block, PipelineData::empty())
.map_or(false, |pipeline_data| {
pipeline_data.into_value(span).is_true()
})
closure
.run_with_value(value.clone())
.map(|data| data.into_value(head).is_false())
.unwrap_or(false)
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
}

View File

@ -1,4 +1,4 @@
use nu_engine::{command_prelude::*, get_eval_block};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -70,34 +70,21 @@ impl Command for TakeWhile {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let mut closure = ClosureEval::new(engine_state, stack, closure);
let metadata = input.metadata();
let span = call.head;
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let block = engine_state.get_block(capture_block.block_id).clone();
let var_id = block.signature.get_positional(0).and_then(|arg| arg.var_id);
let mut stack = stack.captures_to_stack(capture_block.captures);
let ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let eval_block = get_eval_block(&engine_state);
Ok(input
.into_iter_strict(span)?
.into_iter_strict(head)?
.take_while(move |value| {
if let Some(var_id) = var_id {
stack.add_var(var_id, value.clone());
}
eval_block(&engine_state, &mut stack, &block, PipelineData::empty())
.map_or(false, |pipeline_data| {
pipeline_data.into_value(span).is_true()
})
closure
.run_with_value(value.clone())
.map(|data| data.into_value(head).is_true())
.unwrap_or(false)
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
}

View File

@ -1,8 +1,5 @@
use nu_engine::{command_prelude::*, get_eval_block, EvalBlockFn};
use nu_protocol::{
ast::{Block, PathMember},
engine::Closure,
};
use nu_engine::{command_prelude::*, ClosureEval, ClosureEvalOnce};
use nu_protocol::ast::PathMember;
#[derive(Clone)]
pub struct Update;
@ -111,46 +108,33 @@ fn update(
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;
let head = call.head;
let cell_path: CellPath = call.req(engine_state, stack, 0)?;
let replacement: Value = call.req(engine_state, stack, 1)?;
let replacement_span = replacement.span();
let ctrlc = engine_state.ctrlc.clone();
let eval_block = get_eval_block(engine_state);
match input {
PipelineData::Value(mut value, metadata) => {
if let Value::Closure { val: closure, .. } = replacement {
if let Value::Closure { val, .. } = replacement {
match (cell_path.members.first(), &mut value) {
(Some(PathMember::String { .. }), Value::List { vals, .. }) => {
let block = engine_state.get_block(closure.block_id);
let stack = stack.captures_to_stack(closure.captures);
let mut closure = ClosureEval::new(engine_state, stack, val);
for val in vals {
let mut stack = stack.clone();
update_value_by_closure(
val,
replacement_span,
engine_state,
&mut stack,
block,
&mut closure,
head,
&cell_path.members,
false,
eval_block,
)?;
}
}
(first, _) => {
update_single_value_by_closure(
&mut value,
closure,
replacement_span,
engine_state,
stack,
ClosureEvalOnce::new(engine_state, stack, val),
head,
&cell_path.members,
matches!(first, Some(PathMember::Int { .. })),
eval_block,
)?;
}
}
@ -187,16 +171,13 @@ fn update(
// cannot fail since loop above does at least one iteration or returns an error
let value = pre_elems.last_mut().expect("one element");
if let Value::Closure { val: closure, .. } = replacement {
if let Value::Closure { val, .. } = replacement {
update_single_value_by_closure(
value,
closure,
replacement_span,
engine_state,
stack,
ClosureEvalOnce::new(engine_state, stack, val),
head,
path,
true,
eval_block,
)?;
} else {
value.update_data_at_cell_path(path, replacement)?;
@ -205,121 +186,95 @@ fn update(
Ok(pre_elems
.into_iter()
.chain(stream)
.into_pipeline_data_with_metadata(metadata, ctrlc))
} else if let Value::Closure { val: closure, .. } = replacement {
let engine_state = engine_state.clone();
let block = engine_state.get_block(closure.block_id).clone();
let stack = stack.captures_to_stack(closure.captures);
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
} else if let Value::Closure { val, .. } = replacement {
let mut closure = ClosureEval::new(engine_state, stack, val);
Ok(stream
.map(move |mut input| {
// Recreate the stack for each iteration to
// isolate environment variable changes, etc.
let mut stack = stack.clone();
.map(move |mut value| {
let err = update_value_by_closure(
&mut input,
replacement_span,
&engine_state,
&mut stack,
&block,
&mut value,
&mut closure,
head,
&cell_path.members,
false,
eval_block,
);
if let Err(e) = err {
Value::error(e, span)
Value::error(e, head)
} else {
input
value
}
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
} else {
Ok(stream
.map(move |mut input| {
.map(move |mut value| {
if let Err(e) =
input.update_data_at_cell_path(&cell_path.members, replacement.clone())
value.update_data_at_cell_path(&cell_path.members, replacement.clone())
{
Value::error(e, span)
Value::error(e, head)
} else {
input
value
}
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
}
PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
type_name: "empty pipeline".to_string(),
span,
span: head,
}),
PipelineData::ExternalStream { .. } => Err(ShellError::IncompatiblePathAccess {
type_name: "external stream".to_string(),
span,
span: head,
}),
}
}
#[allow(clippy::too_many_arguments)]
fn update_value_by_closure(
value: &mut Value,
closure: &mut ClosureEval,
span: Span,
engine_state: &EngineState,
stack: &mut Stack,
block: &Block,
cell_path: &[PathMember],
first_path_member_int: bool,
eval_block_fn: EvalBlockFn,
) -> Result<(), ShellError> {
let input_at_path = value.clone().follow_cell_path(cell_path, false)?;
let value_at_path = value.clone().follow_cell_path(cell_path, false)?;
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(
*var_id,
if first_path_member_int {
input_at_path.clone()
} else {
value.clone()
},
)
}
}
let arg = if first_path_member_int {
&value_at_path
} else {
&*value
};
let output = eval_block_fn(
engine_state,
stack,
block,
input_at_path.into_pipeline_data(),
)?;
let new_value = closure
.add_arg(arg.clone())
.run_with_input(value_at_path.into_pipeline_data())?
.into_value(span);
value.update_data_at_cell_path(cell_path, output.into_value(span))
value.update_data_at_cell_path(cell_path, new_value)
}
#[allow(clippy::too_many_arguments)]
fn update_single_value_by_closure(
value: &mut Value,
closure: Closure,
closure: ClosureEvalOnce,
span: Span,
engine_state: &EngineState,
stack: &mut Stack,
cell_path: &[PathMember],
first_path_member_int: bool,
eval_block_fn: EvalBlockFn,
) -> Result<(), ShellError> {
let block = engine_state.get_block(closure.block_id);
let mut stack = stack.captures_to_stack(closure.captures);
let value_at_path = value.clone().follow_cell_path(cell_path, false)?;
update_value_by_closure(
value,
span,
engine_state,
&mut stack,
block,
cell_path,
first_path_member_int,
eval_block_fn,
)
let arg = if first_path_member_int {
&value_at_path
} else {
&*value
};
let new_value = closure
.add_arg(arg.clone())
.run_with_input(value_at_path.into_pipeline_data())?
.into_value(span);
value.update_data_at_cell_path(cell_path, new_value)
}
#[cfg(test)]

View File

@ -1,8 +1,5 @@
use nu_engine::{command_prelude::*, get_eval_block, EvalBlockFn};
use nu_protocol::{
ast::{Block, PathMember},
engine::Closure,
};
use nu_engine::{command_prelude::*, ClosureEval, ClosureEvalOnce};
use nu_protocol::ast::PathMember;
#[derive(Clone)]
pub struct Upsert;
@ -157,46 +154,33 @@ fn upsert(
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;
let head = call.head;
let cell_path: CellPath = call.req(engine_state, stack, 0)?;
let replacement: Value = call.req(engine_state, stack, 1)?;
let replacement_span = replacement.span();
let eval_block = get_eval_block(engine_state);
let ctrlc = engine_state.ctrlc.clone();
match input {
PipelineData::Value(mut value, metadata) => {
if let Value::Closure { val: closure, .. } = replacement {
if let Value::Closure { val, .. } = replacement {
match (cell_path.members.first(), &mut value) {
(Some(PathMember::String { .. }), Value::List { vals, .. }) => {
let block = engine_state.get_block(closure.block_id);
let stack = stack.captures_to_stack(closure.captures);
let mut closure = ClosureEval::new(engine_state, stack, val);
for val in vals {
let mut stack = stack.clone();
upsert_value_by_closure(
val,
replacement_span,
engine_state,
&mut stack,
block,
&mut closure,
head,
&cell_path.members,
false,
eval_block,
)?;
}
}
(first, _) => {
upsert_single_value_by_closure(
&mut value,
closure,
replacement_span,
engine_state,
stack,
ClosureEvalOnce::new(engine_state, stack, val),
head,
&cell_path.members,
matches!(first, Some(PathMember::Int { .. })),
eval_block,
)?;
}
}
@ -228,169 +212,129 @@ fn upsert(
}
}
if path.is_empty() {
let value = stream.next().unwrap_or(Value::nothing(span));
if let Value::Closure { val: closure, .. } = replacement {
let block = engine_state.get_block(closure.block_id);
let mut stack = stack.captures_to_stack(closure.captures);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, value.clone())
}
}
let output = eval_block(
engine_state,
&mut stack,
block,
value.clone().into_pipeline_data(),
)?;
pre_elems.push(output.into_value(replacement_span));
let value = if path.is_empty() {
let value = stream.next().unwrap_or(Value::nothing(head));
if let Value::Closure { val, .. } = replacement {
ClosureEvalOnce::new(engine_state, stack, val)
.run_with_value(value)?
.into_value(head)
} else {
pre_elems.push(replacement);
replacement
}
} else if let Some(mut value) = stream.next() {
if let Value::Closure { val: closure, .. } = replacement {
if let Value::Closure { val, .. } = replacement {
upsert_single_value_by_closure(
&mut value,
closure,
replacement_span,
engine_state,
stack,
ClosureEvalOnce::new(engine_state, stack, val),
head,
path,
true,
eval_block,
)?;
} else {
value.upsert_data_at_cell_path(path, replacement)?;
}
pre_elems.push(value)
value
} else {
return Err(ShellError::AccessBeyondEnd {
max_idx: pre_elems.len() - 1,
span: path_span,
});
}
};
pre_elems.push(value);
Ok(pre_elems
.into_iter()
.chain(stream)
.into_pipeline_data_with_metadata(metadata, ctrlc))
} else if let Value::Closure { val: closure, .. } = replacement {
let engine_state = engine_state.clone();
let block = engine_state.get_block(closure.block_id).clone();
let stack = stack.captures_to_stack(closure.captures);
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
} else if let Value::Closure { val, .. } = replacement {
let mut closure = ClosureEval::new(engine_state, stack, val);
Ok(stream
.map(move |mut input| {
// Recreate the stack for each iteration to
// isolate environment variable changes, etc.
let mut stack = stack.clone();
.map(move |mut value| {
let err = upsert_value_by_closure(
&mut input,
replacement_span,
&engine_state,
&mut stack,
&block,
&mut value,
&mut closure,
head,
&cell_path.members,
false,
eval_block,
);
if let Err(e) = err {
Value::error(e, span)
Value::error(e, head)
} else {
input
value
}
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
} else {
Ok(stream
.map(move |mut input| {
.map(move |mut value| {
if let Err(e) =
input.upsert_data_at_cell_path(&cell_path.members, replacement.clone())
value.upsert_data_at_cell_path(&cell_path.members, replacement.clone())
{
Value::error(e, span)
Value::error(e, head)
} else {
input
value
}
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
}
PipelineData::Empty => Err(ShellError::IncompatiblePathAccess {
type_name: "empty pipeline".to_string(),
span,
span: head,
}),
PipelineData::ExternalStream { .. } => Err(ShellError::IncompatiblePathAccess {
type_name: "external stream".to_string(),
span,
span: head,
}),
}
}
#[allow(clippy::too_many_arguments)]
fn upsert_value_by_closure(
value: &mut Value,
closure: &mut ClosureEval,
span: Span,
engine_state: &EngineState,
stack: &mut Stack,
block: &Block,
cell_path: &[PathMember],
first_path_member_int: bool,
eval_block_fn: EvalBlockFn,
) -> Result<(), ShellError> {
let input_at_path = value.clone().follow_cell_path(cell_path, false);
let value_at_path = value.clone().follow_cell_path(cell_path, false);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(
*var_id,
if first_path_member_int {
input_at_path.clone().unwrap_or(Value::nothing(span))
} else {
value.clone()
},
)
}
}
let arg = if first_path_member_int {
value_at_path.clone().unwrap_or(Value::nothing(span))
} else {
value.clone()
};
let input_at_path = input_at_path
let input = value_at_path
.map(IntoPipelineData::into_pipeline_data)
.unwrap_or(PipelineData::Empty);
let output = eval_block_fn(engine_state, stack, block, input_at_path)?;
value.upsert_data_at_cell_path(cell_path, output.into_value(span))
let new_value = closure.add_arg(arg).run_with_input(input)?.into_value(span);
value.upsert_data_at_cell_path(cell_path, new_value)
}
#[allow(clippy::too_many_arguments)]
fn upsert_single_value_by_closure(
value: &mut Value,
closure: Closure,
closure: ClosureEvalOnce,
span: Span,
engine_state: &EngineState,
stack: &mut Stack,
cell_path: &[PathMember],
first_path_member_int: bool,
eval_block_fn: EvalBlockFn,
) -> Result<(), ShellError> {
let block = engine_state.get_block(closure.block_id);
let mut stack = stack.captures_to_stack(closure.captures);
let value_at_path = value.clone().follow_cell_path(cell_path, false);
upsert_value_by_closure(
value,
span,
engine_state,
&mut stack,
block,
cell_path,
first_path_member_int,
eval_block_fn,
)
let arg = if first_path_member_int {
value_at_path.clone().unwrap_or(Value::nothing(span))
} else {
value.clone()
};
let input = value_at_path
.map(IntoPipelineData::into_pipeline_data)
.unwrap_or(PipelineData::Empty);
let new_value = closure.add_arg(arg).run_with_input(input)?.into_value(span);
value.upsert_data_at_cell_path(cell_path, new_value)
}
#[cfg(test)]

View File

@ -1,4 +1,4 @@
use nu_engine::{get_eval_block, CallExt};
use nu_engine::{CallExt, ClosureEval};
use nu_protocol::{
ast::Call,
engine::{Closure, EngineState, Stack},
@ -26,44 +26,22 @@ pub fn boolean_fold(
input: PipelineData,
accumulator: bool,
) -> Result<PipelineData, ShellError> {
let span = call.head;
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let capture_block: Closure = call.req(engine_state, stack, 0)?;
let block_id = capture_block.block_id;
let mut closure = ClosureEval::new(engine_state, stack, closure);
let block = engine_state.get_block(block_id);
let var_id = block.signature.get_positional(0).and_then(|arg| arg.var_id);
let mut stack = stack.captures_to_stack(capture_block.captures);
let orig_env_vars = stack.env_vars.clone();
let orig_env_hidden = stack.env_hidden.clone();
let ctrlc = engine_state.ctrlc.clone();
let eval_block = get_eval_block(engine_state);
for value in input.into_interruptible_iter(ctrlc) {
// with_env() is used here to ensure that each iteration uses
// a different set of environment variables.
// Hence, a 'cd' in the first loop won't affect the next loop.
stack.with_env(&orig_env_vars, &orig_env_hidden);
if let Some(var_id) = var_id {
stack.add_var(var_id, value.clone());
for value in input {
if nu_utils::ctrl_c::was_pressed(&engine_state.ctrlc) {
break;
}
let eval = eval_block(engine_state, &mut stack, block, value.into_pipeline_data());
match eval {
Err(e) => {
return Err(e);
}
Ok(pipeline_data) => {
if pipeline_data.into_value(span).is_true() == accumulator {
return Ok(Value::bool(accumulator, span).into_pipeline_data());
}
}
let pred = closure.run_with_value(value)?.into_value(head).is_true();
if pred == accumulator {
return Ok(Value::bool(accumulator, head).into_pipeline_data());
}
}
Ok(Value::bool(!accumulator, span).into_pipeline_data())
Ok(Value::bool(!accumulator, head).into_pipeline_data())
}

View File

@ -1,4 +1,4 @@
use nu_engine::{command_prelude::*, get_eval_block};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -49,54 +49,19 @@ not supported."#
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let closure: Closure = call.req(engine_state, stack, 0)?;
let span = call.head;
let mut closure = ClosureEval::new(engine_state, stack, closure);
let metadata = input.metadata();
let mut stack = stack.captures_to_stack(closure.captures);
let block = engine_state.get_block(closure.block_id).clone();
let orig_env_vars = stack.env_vars.clone();
let orig_env_hidden = stack.env_hidden.clone();
let ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let eval_block = get_eval_block(&engine_state);
Ok(input
.into_iter_strict(span)?
.filter_map(move |value| {
stack.with_env(&orig_env_vars, &orig_env_hidden);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, value.clone());
}
}
let result = eval_block(
&engine_state,
&mut stack,
&block,
// clone() is used here because x is given to Ok() below.
value.clone().into_pipeline_data(),
);
match result {
Ok(result) => {
let result = result.into_value(span);
if result.is_true() {
Some(value)
} else {
None
}
}
Err(err) => Some(Value::error(err, span)),
}
.into_iter_strict(head)?
.filter_map(move |value| match closure.run_with_value(value.clone()) {
Ok(data) => data.into_value(head).is_true().then_some(value),
Err(err) => Some(Value::error(err, head)),
})
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
fn examples(&self) -> Vec<Example> {

View File

@ -1,4 +1,4 @@
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_engine::{command_prelude::*, ClosureEvalOnce};
#[derive(Clone)]
pub struct Zip;
@ -98,26 +98,21 @@ impl Command for Zip {
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let ctrlc = engine_state.ctrlc.clone();
let metadata = input.metadata();
let eval_block_with_early_return = get_eval_block_with_early_return(engine_state);
let other = call.req(engine_state, stack, 0)?;
let other: PipelineData = match call.req(engine_state, stack, 0)? {
let metadata = input.metadata();
let other = if let Value::Closure { val, .. } = other {
// If a closure was provided, evaluate it and consume its stream output
Value::Closure { val, .. } => {
let block = engine_state.get_block(val.block_id);
let mut stack = stack.captures_to_stack(val.captures);
eval_block_with_early_return(engine_state, &mut stack, block, PipelineData::Empty)?
}
// If any other value, use it as-is.
val => val.into_pipeline_data(),
ClosureEvalOnce::new(engine_state, stack, val).run_with_input(PipelineData::Empty)?
} else {
other.into_pipeline_data()
};
Ok(input
.into_iter()
.zip(other)
.map(move |(x, y)| Value::list(vec![x, y], head))
.into_pipeline_data_with_metadata(metadata, ctrlc))
.into_pipeline_data_with_metadata(metadata, engine_state.ctrlc.clone()))
}
}

View File

@ -1,5 +1,5 @@
use itertools::unfold;
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_engine::{command_prelude::*, ClosureEval};
use nu_protocol::engine::Closure;
#[derive(Clone)]
@ -91,43 +91,19 @@ used as the next argument to the closure, otherwise generation stops.
call: &Call,
_input: PipelineData,
) -> Result<PipelineData, ShellError> {
let head = call.head;
let initial: Value = call.req(engine_state, stack, 0)?;
let capture_block: Spanned<Closure> = call.req(engine_state, stack, 1)?;
let block_span = capture_block.span;
let block = engine_state.get_block(capture_block.item.block_id).clone();
let ctrlc = engine_state.ctrlc.clone();
let engine_state = engine_state.clone();
let mut stack = stack.captures_to_stack(capture_block.item.captures);
let orig_env_vars = stack.env_vars.clone();
let orig_env_hidden = stack.env_hidden.clone();
let eval_block_with_early_return = get_eval_block_with_early_return(&engine_state);
let closure: Closure = call.req(engine_state, stack, 1)?;
let mut closure = ClosureEval::new(engine_state, stack, closure);
// A type of Option<S> is used to represent state. Invocation
// will stop on None. Using Option<S> allows functions to output
// one final value before stopping.
let iter = unfold(Some(initial), move |state| {
let arg = match state {
Some(state) => state.clone(),
None => return None,
};
let arg = state.take()?;
// with_env() is used here to ensure that each iteration uses
// a different set of environment variables.
// Hence, a 'cd' in the first loop won't affect the next loop.
stack.with_env(&orig_env_vars, &orig_env_hidden);
if let Some(var) = block.signature.get_positional(0) {
if let Some(var_id) = &var.var_id {
stack.add_var(*var_id, arg.clone());
}
}
let (output, next_input) = match eval_block_with_early_return(
&engine_state,
&mut stack,
&block,
arg.into_pipeline_data(),
) {
let (output, next_input) = match closure.run_with_value(arg) {
// no data -> output nothing and stop.
Ok(PipelineData::Empty) => (None, None),
@ -154,7 +130,7 @@ used as the next argument to the closure, otherwise generation stops.
help: None,
inner: vec![],
};
err = Some(Value::error(error, block_span));
err = Some(Value::error(error, head));
break;
}
}
@ -176,13 +152,13 @@ used as the next argument to the closure, otherwise generation stops.
inner: vec![],
};
(Some(Value::error(error, block_span)), None)
(Some(Value::error(error, head)), None)
}
}
}
Ok(other) => {
let val = other.into_value(block_span);
let val = other.into_value(head);
let error = ShellError::GenericError {
error: "Invalid block return".into(),
msg: format!("Expected record, found {}", val.get_type()),
@ -191,11 +167,11 @@ used as the next argument to the closure, otherwise generation stops.
inner: vec![],
};
(Some(Value::error(error, block_span)), None)
(Some(Value::error(error, head)), None)
}
// error -> error and stop
Err(error) => (Some(Value::error(error, block_span)), None),
Err(error) => (Some(Value::error(error, head)), None),
};
// We use `state` to control when to stop, not `output`. By wrapping
@ -205,7 +181,9 @@ used as the next argument to the closure, otherwise generation stops.
Some(output)
});
Ok(iter.flatten().into_pipeline_data(ctrlc))
Ok(iter
.flatten()
.into_pipeline_data(engine_state.ctrlc.clone()))
}
}