Use async-stream crate to replace most async_stream_block invocations

This commit is contained in:
est31 2019-09-26 02:22:17 +02:00
parent 7113c702ff
commit 9891e5ab81
29 changed files with 50 additions and 25 deletions

22
Cargo.lock generated
View File

@ -53,6 +53,25 @@ dependencies = [
"nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "async-stream"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"async-stream-impl 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core-preview 0.3.0-alpha.18 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "async-stream-impl"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "atty"
version = "0.2.13"
@ -1516,6 +1535,7 @@ version = "0.3.0"
dependencies = [
"ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"async-stream 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"battery 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
"bigdecimal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -3003,6 +3023,8 @@ dependencies = [
"checksum app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e73a24bad9bd6a94d6395382a6c69fe071708ae4409f763c5475e14ee896313d"
"checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee"
"checksum arrayvec 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d73f9beda665eaa98ab9e4f7442bd4e7de6652587de55b2525e52e29c1b0ba"
"checksum async-stream 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "650be9b667e47506c42ee53034fb1935443cb2447a3a5c0a75e303d2e756fa73"
"checksum async-stream-impl 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4f0d8c5b411e36dcfb04388bacfec54795726b1f0148adcb0f377a96d6747e0e"
"checksum atty 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "1803c647a3ec87095e7ae7acfca019e98de5ec9a7d01343f611cf3152ed71a90"
"checksum autocfg 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "22130e92352b948e7e82a49cdb0aa94f2211761117f29e052dd397c1ac33542b"
"checksum backtrace 0.3.34 (registry+https://github.com/rust-lang/crates.io-index)" = "b5164d292487f037ece34ec0de2fcede2faa162f085dd96d2385ab81b12765ba"

View File

@ -28,6 +28,7 @@ byte-unit = "3.0.1"
base64 = "0.10.1"
futures-preview = { version = "=0.3.0-alpha.18", features = ["compat", "io-compat"] }
futures-async-stream = "=0.1.0-alpha.5"
async-stream = "0.1.1"
futures_codec = "0.2.5"
num-traits = "0.2.8"
term = "0.5.2"

View File

@ -61,7 +61,7 @@ impl PerItemCommand for Enter {
)))]
.into())
} else {
let stream = async_stream_block! {
let stream = async_stream! {
// If it's a file, attempt to open the file as a value and enter it
let cwd = raw_args.shell_manager.path();

View File

@ -58,7 +58,7 @@ fn run(
let registry = registry.clone();
let raw_args = raw_args.clone();
let stream = async_stream_block! {
let stream = async_stream! {
let result = fetch(&path_str, path_span).await;

View File

@ -201,7 +201,7 @@ fn from_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
let tag = args.name_tag();
let input = args.input;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
for value in values {

View File

@ -88,7 +88,7 @@ fn from_csv(
) -> Result<OutputStream, ShellError> {
let name_tag = name;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new();

View File

@ -67,7 +67,7 @@ fn from_ini(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
let tag = args.name_tag();
let input = args.input;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new();

View File

@ -74,7 +74,7 @@ fn from_json(
) -> Result<OutputStream, ShellError> {
let name_tag = name;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new();

View File

@ -131,7 +131,7 @@ fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputSt
let tag = args.name_tag();
let input = args.input;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
for value in values {

View File

@ -71,7 +71,7 @@ pub fn from_toml(
let tag = args.name_tag();
let input = args.input;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new();

View File

@ -89,7 +89,7 @@ fn from_tsv(
) -> Result<OutputStream, ShellError> {
let name_tag = name;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new();

View File

@ -31,7 +31,7 @@ fn from_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
let tag = args.name_tag();
let input = args.input;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new();

View File

@ -86,7 +86,7 @@ fn from_xml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStrea
let tag = args.name_tag();
let input = args.input;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new();

View File

@ -100,7 +100,7 @@ fn from_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStre
let tag = args.name_tag();
let input = args.input;
let stream = async_stream_block! {
let stream = async_stream! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
let mut concat_string = String::new();

View File

@ -36,7 +36,7 @@ fn last(
LastArgs { amount }: LastArgs,
context: RunnableContext,
) -> Result<OutputStream, ShellError> {
let stream = async_stream_block! {
let stream = async_stream! {
let v: Vec<_> = context.input.into_vec().await;
let k = v.len() - (*amount as usize);
for x in v[k..].iter() {

View File

@ -59,7 +59,7 @@ fn run(
let registry = registry.clone();
let raw_args = raw_args.clone();
let stream = async_stream_block! {
let stream = async_stream! {
let result = fetch(&full_path, &path_str, path_span).await;

View File

@ -52,7 +52,7 @@ fn merge_descriptors(values: &[Tagged<Value>]) -> Vec<String> {
}
pub fn pivot(args: PivotArgs, context: RunnableContext) -> Result<OutputStream, ShellError> {
let stream = async_stream_block! {
let stream = async_stream! {
let input = context.input.into_vec().await;
let descs = merge_descriptors(&input);

View File

@ -73,7 +73,7 @@ fn run(
let registry = registry.clone();
let raw_args = raw_args.clone();
let stream = async_stream_block! {
let stream = async_stream! {
let (file_extension, contents, contents_tag, span_source) =
post(&path_str, &body, user, password, path_span, &registry, &raw_args).await.unwrap();

View File

@ -35,7 +35,7 @@ fn sort_by(
SortByArgs { rest }: SortByArgs,
mut context: RunnableContext,
) -> Result<OutputStream, ShellError> {
Ok(OutputStream::new(async_stream_block! {
Ok(OutputStream::new(async_stream! {
let mut vec = context.input.drain_vec().await;
let calc_key = |item: &Tagged<Value>| {

View File

@ -233,7 +233,7 @@ fn bson_value_to_bytes(bson: Bson, tag: Tag) -> Result<Vec<u8>, ShellError> {
fn to_bson(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag();
let stream = async_stream_block! {
let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let to_process_input = if input.len() > 1 {

View File

@ -135,7 +135,7 @@ fn to_csv(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let name_tag = name;
let stream = async_stream_block! {
let stream = async_stream! {
let input: Vec<Tagged<Value>> = input.values.collect().await;
let to_process_input = if input.len() > 1 {

View File

@ -81,7 +81,7 @@ fn json_list(input: &Vec<Tagged<Value>>) -> Result<Vec<serde_json::Value>, Shell
fn to_json(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag();
let stream = async_stream_block! {
let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let to_process_input = if input.len() > 1 {

View File

@ -201,7 +201,7 @@ fn sqlite_input_stream_to_bytes(
fn to_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag();
let stream = async_stream_block! {
let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await;
match sqlite_input_stream_to_bytes(input) {

View File

@ -76,7 +76,7 @@ fn collect_values(input: &Vec<Tagged<Value>>) -> Result<Vec<toml::Value>, ShellE
fn to_toml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag();
let stream = async_stream_block! {
let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let to_process_input = if input.len() > 1 {

View File

@ -134,7 +134,7 @@ fn to_tsv(
RunnableContext { input, name, .. }: RunnableContext,
) -> Result<OutputStream, ShellError> {
let name_tag = name;
let stream = async_stream_block! {
let stream = async_stream! {
let input: Vec<Tagged<Value>> = input.values.collect().await;
let to_process_input = if input.len() > 1 {

View File

@ -31,7 +31,7 @@ fn to_url(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream,
let tag = args.name_tag();
let input = args.input;
let stream = async_stream_block! {
let stream = async_stream! {
let input: Vec<Tagged<Value>> = input.values.collect().await;
for value in input {

View File

@ -77,7 +77,7 @@ pub fn value_to_yaml_value(v: &Tagged<Value>) -> Result<serde_yaml::Value, Shell
fn to_yaml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag();
let stream = async_stream_block! {
let stream = async_stream! {
let input: Vec<Tagged<Value>> = args.input.values.collect().await;
let to_process_input = if input.len() > 1 {

View File

@ -1,5 +1,6 @@
#![feature(generators)]
#![feature(proc_macro_hygiene)]
#![recursion_limit = "512"]
#[macro_use]
mod prelude;

View File

@ -72,6 +72,7 @@ pub(crate) use crate::shell::value_shell::ValueShell;
pub(crate) use crate::stream::{InputStream, OutputStream};
pub(crate) use crate::traits::{HasTag, ToDebug};
pub(crate) use crate::Text;
pub(crate) use async_stream::stream as async_stream;
pub(crate) use bigdecimal::BigDecimal;
pub(crate) use futures::stream::BoxStream;
pub(crate) use futures::{FutureExt, Stream, StreamExt};