Another batch of removing async_stream (#1981)

This commit is contained in:
Jonathan Turner 2020-06-13 16:54:35 -07:00 committed by GitHub
parent 86b316e930
commit d7b1480ad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 266 additions and 278 deletions

View File

@ -3,9 +3,7 @@ use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*; use crate::prelude::*;
use log::trace; use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value};
hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct KeepUntil; pub struct KeepUntil;
@ -34,80 +32,81 @@ impl WholeStreamCommand for KeepUntil {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = Arc::new(registry.clone());
let scope = args.call_info.scope.clone(); let scope = Arc::new(args.call_info.scope.clone());
let stream = async_stream! {
let mut call_info = args.evaluate_once(&registry).await?; let call_info = args.evaluate_once(&registry).await?;
let block = call_info.args.expect_nth(0)?.clone(); let block = call_info.args.expect_nth(0)?.clone();
let condition = match block { let condition = Arc::new(match block {
Value { Value {
value: UntaggedValue::Block(block), value: UntaggedValue::Block(block),
tag, tag,
} => { } => {
if block.block.len() != 1 { if block.block.len() != 1 {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
match block.block[0].list.get(0) { match block.block[0].list.get(0) {
Some(item) => match item { Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(), ClassifiedCommand::Expr(expr) => expr.clone(),
_ => { _ => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
}, },
None => { None => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
} }
} }
Value { tag, .. } => { Value { tag, .. } => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
}; });
while let Some(item) = call_info.input.next().await { Ok(call_info
.input
.take_while(move |item| {
let condition = condition.clone(); let condition = condition.clone();
let registry = registry.clone();
let scope = scope.clone();
let item = item.clone();
trace!("ITEM = {:?}", item); trace!("ITEM = {:?}", item);
let result =
evaluate_baseline_expr(&*condition, &registry, &item, &scope.vars, &scope.env) async move {
let result = evaluate_baseline_expr(
&*condition,
&registry,
&item,
&scope.vars,
&scope.env,
)
.await; .await;
trace!("RESULT = {:?}", result); trace!("RESULT = {:?}", result);
let return_value = match result { match result {
Ok(ref v) if v.is_true() => false, Ok(ref v) if v.is_true() => false,
_ => true, _ => true,
};
if return_value {
yield ReturnSuccess::value(item);
} else {
break;
} }
} }
}; })
.to_output_stream())
Ok(stream.to_output_stream())
} }
} }

View File

@ -3,9 +3,7 @@ use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*; use crate::prelude::*;
use log::trace; use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value};
hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct KeepWhile; pub struct KeepWhile;
@ -34,80 +32,81 @@ impl WholeStreamCommand for KeepWhile {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = Arc::new(registry.clone());
let scope = args.call_info.scope.clone(); let scope = Arc::new(args.call_info.scope.clone());
let stream = async_stream! { let call_info = args.evaluate_once(&registry).await?;
let mut call_info = args.evaluate_once(&registry).await?;
let block = call_info.args.expect_nth(0)?.clone(); let block = call_info.args.expect_nth(0)?.clone();
let condition = match block { let condition = Arc::new(match block {
Value { Value {
value: UntaggedValue::Block(block), value: UntaggedValue::Block(block),
tag, tag,
} => { } => {
if block.block.len() != 1 { if block.block.len() != 1 {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
match block.block[0].list.get(0) { match block.block[0].list.get(0) {
Some(item) => match item { Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(), ClassifiedCommand::Expr(expr) => expr.clone(),
_ => { _ => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
}, },
None => { None => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
} }
} }
Value { tag, .. } => { Value { tag, .. } => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
}; });
while let Some(item) = call_info.input.next().await { Ok(call_info
.input
.take_while(move |item| {
let condition = condition.clone(); let condition = condition.clone();
let registry = registry.clone();
let scope = scope.clone();
let item = item.clone();
trace!("ITEM = {:?}", item); trace!("ITEM = {:?}", item);
let result =
evaluate_baseline_expr(&*condition, &registry, &item, &scope.vars, &scope.env) async move {
let result = evaluate_baseline_expr(
&*condition,
&registry,
&item,
&scope.vars,
&scope.env,
)
.await; .await;
trace!("RESULT = {:?}", result); trace!("RESULT = {:?}", result);
let return_value = match result { match result {
Ok(ref v) if v.is_true() => true, Ok(ref v) if v.is_true() => true,
_ => false, _ => false,
};
if return_value {
yield ReturnSuccess::value(item);
} else {
break;
} }
} }
}; })
.to_output_stream())
Ok(stream.to_output_stream())
} }
} }

View File

@ -3,9 +3,7 @@ use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*; use crate::prelude::*;
use log::trace; use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value};
hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct SkipUntil; pub struct SkipUntil;
@ -34,83 +32,80 @@ impl WholeStreamCommand for SkipUntil {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = Arc::new(registry.clone());
let scope = args.call_info.scope.clone(); let scope = Arc::new(args.call_info.scope.clone());
let stream = async_stream! { let call_info = args.evaluate_once(&registry).await?;
let mut call_info = args.evaluate_once(&registry).await?;
let block = call_info.args.expect_nth(0)?.clone(); let block = call_info.args.expect_nth(0)?.clone();
let condition = match block { let condition = Arc::new(match block {
Value { Value {
value: UntaggedValue::Block(block), value: UntaggedValue::Block(block),
tag, tag,
} => { } => {
if block.block.len() != 1 { if block.block.len() != 1 {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
match block.block[0].list.get(0) { match block.block[0].list.get(0) {
Some(item) => match item { Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(), ClassifiedCommand::Expr(expr) => expr.clone(),
_ => { _ => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
}, },
None => { None => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
} }
} }
Value { tag, .. } => { Value { tag, .. } => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
}; });
let mut skipping = true; Ok(call_info
while let Some(item) = call_info.input.next().await { .input
.skip_while(move |item| {
let condition = condition.clone(); let condition = condition.clone();
let registry = registry.clone();
let scope = scope.clone();
let item = item.clone();
trace!("ITEM = {:?}", item); trace!("ITEM = {:?}", item);
let result =
evaluate_baseline_expr(&*condition, &registry, &item, &scope.vars, &scope.env) async move {
let result = evaluate_baseline_expr(
&*condition,
&registry,
&item,
&scope.vars,
&scope.env,
)
.await; .await;
trace!("RESULT = {:?}", result); trace!("RESULT = {:?}", result);
let return_value = match result { match result {
Ok(ref v) if v.is_true() => true, Ok(ref v) if v.is_true() => true,
_ => false, _ => false,
};
if return_value {
skipping = false;
}
if !skipping {
yield ReturnSuccess::value(item);
} }
} }
}; })
.to_output_stream())
Ok(stream.to_output_stream())
} }
} }

View File

@ -3,9 +3,7 @@ use crate::evaluate::evaluate_baseline_expr;
use crate::prelude::*; use crate::prelude::*;
use log::trace; use log::trace;
use nu_errors::ShellError; use nu_errors::ShellError;
use nu_protocol::{ use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value};
hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value,
};
pub struct SkipWhile; pub struct SkipWhile;
@ -34,83 +32,80 @@ impl WholeStreamCommand for SkipWhile {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = Arc::new(registry.clone());
let scope = args.call_info.scope.clone(); let scope = Arc::new(args.call_info.scope.clone());
let stream = async_stream! { let call_info = args.evaluate_once(&registry).await?;
let mut call_info = args.evaluate_once(&registry).await?;
let block = call_info.args.expect_nth(0)?.clone(); let block = call_info.args.expect_nth(0)?.clone();
let condition = match block { let condition = Arc::new(match block {
Value { Value {
value: UntaggedValue::Block(block), value: UntaggedValue::Block(block),
tag, tag,
} => { } => {
if block.block.len() != 1 { if block.block.len() != 1 {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
match block.block[0].list.get(0) { match block.block[0].list.get(0) {
Some(item) => match item { Some(item) => match item {
ClassifiedCommand::Expr(expr) => expr.clone(), ClassifiedCommand::Expr(expr) => expr.clone(),
_ => { _ => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
}, },
None => { None => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
} }
} }
Value { tag, .. } => { Value { tag, .. } => {
yield Err(ShellError::labeled_error( return Err(ShellError::labeled_error(
"Expected a condition", "Expected a condition",
"expected a condition", "expected a condition",
tag, tag,
)); ));
return;
} }
}; });
let mut skipping = true; Ok(call_info
while let Some(item) = call_info.input.next().await { .input
.skip_while(move |item| {
let item = item.clone();
let condition = condition.clone(); let condition = condition.clone();
let registry = registry.clone();
let scope = scope.clone();
trace!("ITEM = {:?}", item); trace!("ITEM = {:?}", item);
let result =
evaluate_baseline_expr(&*condition, &registry, &item, &scope.vars, &scope.env) async move {
let result = evaluate_baseline_expr(
&*condition,
&registry,
&item,
&scope.vars,
&scope.env,
)
.await; .await;
trace!("RESULT = {:?}", result); trace!("RESULT = {:?}", result);
let return_value = match result { match result {
Ok(ref v) if v.is_true() => false, Ok(ref v) if v.is_true() => true,
_ => true, _ => false,
};
if return_value {
skipping = false;
}
if !skipping {
yield ReturnSuccess::value(item);
} }
} }
}; })
.to_output_stream())
Ok(stream.to_output_stream())
} }
} }

View File

@ -4,17 +4,17 @@ use std::sync::atomic::{AtomicBool, Ordering};
pub struct InterruptibleStream<V> { pub struct InterruptibleStream<V> {
inner: BoxStream<'static, V>, inner: BoxStream<'static, V>,
ctrl_c: Arc<AtomicBool>, interrupt_signal: Arc<AtomicBool>,
} }
impl<V> InterruptibleStream<V> { impl<V> InterruptibleStream<V> {
pub fn new<S>(inner: S, ctrl_c: Arc<AtomicBool>) -> InterruptibleStream<V> pub fn new<S>(inner: S, interrupt_signal: Arc<AtomicBool>) -> InterruptibleStream<V>
where where
S: Stream<Item = V> + Send + 'static, S: Stream<Item = V> + Send + 'static,
{ {
InterruptibleStream { InterruptibleStream {
inner: inner.boxed(), inner: inner.boxed(),
ctrl_c, interrupt_signal,
} }
} }
} }
@ -26,7 +26,7 @@ impl<V> Stream for InterruptibleStream<V> {
mut self: std::pin::Pin<&mut Self>, mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> core::task::Poll<Option<Self::Item>> { ) -> core::task::Poll<Option<Self::Item>> {
if self.ctrl_c.load(Ordering::SeqCst) { if self.interrupt_signal.load(Ordering::SeqCst) {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx)