apply all the block run's stream. (#2339)

This commit is contained in:
Andrés N. Robalino 2020-08-12 02:51:24 -05:00 committed by GitHub
parent 87d71604ad
commit 48cfc9b598
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 41 deletions

View File

@ -73,27 +73,35 @@ async fn process_row(
match result {
Ok(mut stream) => {
let values = stream.drain_vec().await;
let errors = context.get_errors();
if let Some(error) = errors.first() {
return Err(error.clone());
}
let result = if values.len() == 1 {
let value = values
.get(0)
.ok_or_else(|| ShellError::unexpected("No value to insert with"))?;
value.clone()
} else if values.is_empty() {
UntaggedValue::nothing().into_untagged_value()
} else {
UntaggedValue::table(&values).into_untagged_value()
};
match input {
obj
@
Value {
value: UntaggedValue::Row(_),
..
} => {
if let Some(result) = stream.next().await {
match obj.insert_data_at_column_path(&column, result) {
Ok(v) => OutputStream::one(ReturnSuccess::value(v)),
Err(e) => OutputStream::one(Err(e)),
}
} else {
OutputStream::empty()
}
}
} => match obj.insert_data_at_column_path(&column, result) {
Ok(v) => OutputStream::one(ReturnSuccess::value(v)),
Err(e) => OutputStream::one(Err(e)),
},
Value { tag, .. } => OutputStream::one(Err(ShellError::labeled_error(
"Unrecognized type in stream",
"original value",

View File

@ -77,31 +77,39 @@ async fn process_row(
match result {
Ok(mut stream) => {
let values = stream.drain_vec().await;
let errors = context.get_errors();
if let Some(error) = errors.first() {
return Err(error.clone());
}
let result = if values.len() == 1 {
let value = values
.get(0)
.ok_or_else(|| ShellError::unexpected("No value to update with"))?;
value.clone()
} else if values.is_empty() {
UntaggedValue::nothing().into_untagged_value()
} else {
UntaggedValue::table(&values).into_untagged_value()
};
match input {
obj
@
Value {
value: UntaggedValue::Row(_),
..
} => {
if let Some(result) = stream.next().await {
match obj.replace_data_at_column_path(&field, result) {
Some(v) => OutputStream::one(ReturnSuccess::value(v)),
None => OutputStream::one(Err(ShellError::labeled_error(
"update could not find place to insert column",
"column name",
obj.tag,
))),
}
} else {
OutputStream::empty()
}
}
} => match obj.replace_data_at_column_path(&field, result) {
Some(v) => OutputStream::one(ReturnSuccess::value(v)),
None => OutputStream::one(Err(ShellError::labeled_error(
"update could not find place to insert column",
"column name",
obj.tag,
))),
},
Value { tag, .. } => OutputStream::one(Err(ShellError::labeled_error(
"Unrecognized type in stream",
"original value",

View File

@ -1,7 +1,7 @@
use nu_test_support::{nu, pipeline};
#[test]
fn insert_plugin() {
fn sets_the_column_from_a_block_run_output() {
let actual = nu!(
cwd: "tests/fixtures/formats", pipeline(
r#"
@ -16,25 +16,17 @@ fn insert_plugin() {
}
#[test]
fn downcase_upcase() {
fn sets_the_column_from_a_block_full_stream_output() {
let actual = nu!(
cwd: ".", pipeline(
cwd: "tests/fixtures/formats", pipeline(
r#"
echo abcd | wrap downcase | insert upcase { echo $it.downcase | str upcase } | format "{downcase}{upcase}"
wrap _
| insert content { open --raw cargo_sample.toml | lines | first 5 }
| get content.1
| str contains "nu"
| echo $it
"#
));
assert_eq!(actual.out, "abcdABCD");
}
#[test]
fn number_and_its_negative_equal_zero() {
let actual = nu!(
cwd: ".", pipeline(
r#"
echo 1..10 | wrap num | insert neg { = $it.num * -1 } | math sum | = $it.num + $it.neg
"#
));
assert_eq!(actual.out, "0");
assert_eq!(actual.out, "true");
}

View File

@ -30,3 +30,19 @@ fn sets_the_column_from_a_block_run_output() {
assert_eq!(actual.out, "0.7.0");
}
#[test]
fn sets_the_column_from_a_block_full_stream_output() {
let actual = nu!(
cwd: "tests/fixtures/formats", pipeline(
r#"
wrap content
| update content { open --raw cargo_sample.toml | lines | first 5 }
| get content.1
| str contains "nu"
| echo $it
"#
));
assert_eq!(actual.out, "true");
}