Remove uses of async_stream_block

This commit is contained in:
est31 2019-09-28 02:05:18 +02:00
parent 6aad0b8443
commit 1183d28b15
5 changed files with 63 additions and 47 deletions

View File

@ -1,7 +1,6 @@
use crate::commands::{RawCommandArgs, WholeStreamCommand}; use crate::commands::{RawCommandArgs, WholeStreamCommand};
use crate::errors::ShellError; use crate::errors::ShellError;
use crate::prelude::*; use crate::prelude::*;
use futures_async_stream::async_stream_block;
pub struct Autoview; pub struct Autoview;
@ -35,7 +34,7 @@ pub fn autoview(
mut context: RunnableContext, mut context: RunnableContext,
raw: RawCommandArgs, raw: RawCommandArgs,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
Ok(OutputStream::new(async_stream_block! { Ok(OutputStream::new(async_stream! {
let input = context.input.drain_vec().await; let input = context.input.drain_vec().await;
if input.len() > 0 { if input.len() > 0 {
@ -89,6 +88,11 @@ pub fn autoview(
result.collect::<Vec<_>>().await; result.collect::<Vec<_>>().await;
} }
} }
// Needed for async_stream to type check
if false {
yield ReturnSuccess::value(Value::nothing().tagged_unknown());
}
})) }))
} }

View File

@ -5,7 +5,6 @@ pub mod clipboard {
use crate::errors::ShellError; use crate::errors::ShellError;
use crate::prelude::*; use crate::prelude::*;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use futures_async_stream::async_stream_block;
use clipboard::{ClipboardContext, ClipboardProvider}; use clipboard::{ClipboardContext, ClipboardProvider};
@ -40,10 +39,13 @@ pub mod clipboard {
ClipArgs {}: ClipArgs, ClipArgs {}: ClipArgs,
RunnableContext { input, name, .. }: RunnableContext, RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let stream = async_stream_block! { let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await; let values: Vec<Tagged<Value>> = input.values.collect().await;
inner_clip(values, name).await; let mut clip_stream = inner_clip(values, name).await;
while let Some(value) = clip_stream.next().await {
yield value;
}
}; };
let stream: BoxStream<'static, ReturnValue> = stream.boxed(); let stream: BoxStream<'static, ReturnValue> = stream.boxed();

View File

@ -3,7 +3,6 @@ use crate::errors::ShellError;
use crate::parser::registry; use crate::parser::registry;
use crate::prelude::*; use crate::prelude::*;
use derive_new::new; use derive_new::new;
use futures_async_stream::async_stream_block;
use log::trace; use log::trace;
use serde::{self, Deserialize, Serialize}; use serde::{self, Deserialize, Serialize};
use std::io::prelude::*; use std::io::prelude::*;
@ -298,7 +297,7 @@ pub fn sink_plugin(
let args = args.evaluate_once(registry)?; let args = args.evaluate_once(registry)?;
let call_info = args.call_info.clone(); let call_info = args.call_info.clone();
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await; let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let request = JsonRpc::new("sink", (call_info.clone(), input)); let request = JsonRpc::new("sink", (call_info.clone(), input));
@ -313,6 +312,11 @@ pub fn sink_plugin(
.expect("Failed to spawn child process"); .expect("Failed to spawn child process");
let _ = child.wait(); let _ = child.wait();
// Needed for async_stream to type check
if false {
yield ReturnSuccess::value(Value::nothing().tagged_unknown());
}
}; };
Ok(OutputStream::new(stream)) Ok(OutputStream::new(stream))
} }

View File

@ -2,13 +2,12 @@ use crate::commands::{UnevaluatedCallInfo, WholeStreamCommand};
use crate::data::Value; use crate::data::Value;
use crate::errors::ShellError; use crate::errors::ShellError;
use crate::prelude::*; use crate::prelude::*;
use futures_async_stream::async_stream_block;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
pub struct Save; pub struct Save;
macro_rules! process_string { macro_rules! process_string {
($input:ident, $name_tag:ident) => {{ ($scope:tt, $input:ident, $name_tag:ident) => {{
let mut result_string = String::new(); let mut result_string = String::new();
for res in $input { for res in $input {
match res { match res {
@ -19,11 +18,11 @@ macro_rules! process_string {
result_string.push_str(&s); result_string.push_str(&s);
} }
_ => { _ => {
yield core::task::Poll::Ready(Err(ShellError::labeled_error( break $scope Err(ShellError::labeled_error(
"Save could not successfully save", "Save could not successfully save",
"unexpected data during save", "unexpected data during save",
$name_tag, $name_tag,
))); ));
} }
} }
} }
@ -32,7 +31,7 @@ macro_rules! process_string {
} }
macro_rules! process_string_return_success { macro_rules! process_string_return_success {
($result_vec:ident, $name_tag:ident) => {{ ($scope:tt, $result_vec:ident, $name_tag:ident) => {{
let mut result_string = String::new(); let mut result_string = String::new();
for res in $result_vec { for res in $result_vec {
match res { match res {
@ -43,11 +42,11 @@ macro_rules! process_string_return_success {
result_string.push_str(&s); result_string.push_str(&s);
} }
_ => { _ => {
yield core::task::Poll::Ready(Err(ShellError::labeled_error( break $scope Err(ShellError::labeled_error(
"Save could not successfully save", "Save could not successfully save",
"unexpected data during text save", "unexpected data during text save",
$name_tag, $name_tag,
))); ));
} }
} }
} }
@ -56,7 +55,7 @@ macro_rules! process_string_return_success {
} }
macro_rules! process_binary_return_success { macro_rules! process_binary_return_success {
($result_vec:ident, $name_tag:ident) => {{ ($scope:tt, $result_vec:ident, $name_tag:ident) => {{
let mut result_binary: Vec<u8> = Vec::new(); let mut result_binary: Vec<u8> = Vec::new();
for res in $result_vec { for res in $result_vec {
match res { match res {
@ -69,11 +68,11 @@ macro_rules! process_binary_return_success {
} }
} }
_ => { _ => {
yield core::task::Poll::Ready(Err(ShellError::labeled_error( break $scope Err(ShellError::labeled_error(
"Save could not successfully save", "Save could not successfully save",
"unexpected data during binary save", "unexpected data during binary save",
$name_tag, $name_tag,
))); ));
} }
} }
} }
@ -131,7 +130,7 @@ fn save(
let name_tag = name; let name_tag = name;
let source_map = source_map.clone(); let source_map = source_map.clone();
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = input.values.collect().await; let input: Vec<Tagged<Value>> = input.values.collect().await;
if path.is_none() { if path.is_none() {
// If there is no filename, check the metadata for the origin filename // If there is no filename, check the metadata for the origin filename
@ -171,39 +170,43 @@ fn save(
} }
} }
let content : Result<Vec<u8>, ShellError> = if !save_raw { // TODO use label_break_value once it is stable:
if let Some(extension) = full_path.extension() { // https://github.com/rust-lang/rust/issues/48594
let command_name = format!("to-{}", extension.to_str().unwrap()); let content : Result<Vec<u8>, ShellError> = 'scope: loop {
if let Some(converter) = registry.get_command(&command_name) { break if !save_raw {
let new_args = RawCommandArgs { if let Some(extension) = full_path.extension() {
host, let command_name = format!("to-{}", extension.to_str().unwrap());
shell_manager, if let Some(converter) = registry.get_command(&command_name) {
call_info: UnevaluatedCallInfo { let new_args = RawCommandArgs {
args: crate::parser::hir::Call { host,
head: raw_args.call_info.args.head, shell_manager,
positional: None, call_info: UnevaluatedCallInfo {
named: None args: crate::parser::hir::Call {
}, head: raw_args.call_info.args.head,
source: raw_args.call_info.source, positional: None,
source_map: raw_args.call_info.source_map, named: None
name_tag: raw_args.call_info.name_tag, },
source: raw_args.call_info.source,
source_map: raw_args.call_info.source_map,
name_tag: raw_args.call_info.name_tag,
}
};
let mut result = converter.run(new_args.with_input(input), &registry, false);
let result_vec: Vec<Result<ReturnSuccess, ShellError>> = result.drain_vec().await;
if converter.is_binary() {
process_binary_return_success!('scope, result_vec, name_tag)
} else {
process_string_return_success!('scope, result_vec, name_tag)
} }
};
let mut result = converter.run(new_args.with_input(input), &registry, false);
let result_vec: Vec<Result<ReturnSuccess, ShellError>> = result.drain_vec().await;
if converter.is_binary() {
process_binary_return_success!(result_vec, name_tag)
} else { } else {
process_string_return_success!(result_vec, name_tag) process_string!('scope, input, name_tag)
} }
} else { } else {
process_string!(input, name_tag) process_string!('scope, input, name_tag)
} }
} else { } else {
process_string!(input, name_tag) Ok(string_from(&input).into_bytes())
} };
} else {
Ok(string_from(&input).into_bytes())
}; };
match content { match content {

View File

@ -2,7 +2,6 @@ use crate::commands::WholeStreamCommand;
use crate::errors::ShellError; use crate::errors::ShellError;
use crate::format::TableView; use crate::format::TableView;
use crate::prelude::*; use crate::prelude::*;
use futures_async_stream::async_stream_block;
pub struct Table; pub struct Table;
@ -32,7 +31,7 @@ impl WholeStreamCommand for Table {
} }
pub fn table(_args: TableArgs, context: RunnableContext) -> Result<OutputStream, ShellError> { pub fn table(_args: TableArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let stream = async_stream_block! { let stream = async_stream! {
let input: Vec<Tagged<Value>> = context.input.into_vec().await; let input: Vec<Tagged<Value>> = context.input.into_vec().await;
if input.len() > 0 { if input.len() > 0 {
let mut host = context.host.lock().unwrap(); let mut host = context.host.lock().unwrap();
@ -41,6 +40,10 @@ pub fn table(_args: TableArgs, context: RunnableContext) -> Result<OutputStream,
handle_unexpected(&mut *host, |host| crate::format::print_view(&view, host)); handle_unexpected(&mut *host, |host| crate::format::print_view(&view, host));
} }
} }
// Needed for async_stream to type check
if false {
yield ReturnSuccess::value(Value::nothing().tagged_unknown());
}
}; };
Ok(OutputStream::new(stream)) Ok(OutputStream::new(stream))