Another batch of removing async_stream (#1971)

This commit is contained in:
Jonathan Turner
2020-06-12 16:40:23 -07:00
committed by GitHub
parent 935a5f6b9e
commit d82ce26b44
12 changed files with 286 additions and 304 deletions

View File

@ -26,14 +26,10 @@ impl WholeStreamCommand for Command {
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
yield Ok(ReturnSuccess::Value(
UntaggedValue::string(crate::commands::help::get_help(&Command, &registry))
.into_value(Tag::unknown()),
));
};
Ok(stream.to_output_stream())
Ok(OutputStream::one(Ok(ReturnSuccess::Value(
UntaggedValue::string(crate::commands::help::get_help(&Command, &registry))
.into_value(Tag::unknown()),
))))
}
}

View File

@ -35,41 +35,52 @@ impl WholeStreamCommand for SubCommand {
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
split_row(args, registry)
split_row(args, registry).await
}
}
fn split_row(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
async fn split_row(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone();
let stream = async_stream! {
let name = args.call_info.name_tag.clone();
let (SplitRowArgs { separator }, mut input) = args.process(&registry).await?;
while let Some(v) = input.next().await {
let name = args.call_info.name_tag.clone();
let (SplitRowArgs { separator }, input) = args.process(&registry).await?;
Ok(input
.flat_map(move |v| {
if let Ok(s) = v.as_string() {
let splitter = separator.item.replace("\\n", "\n");
trace!("splitting with {:?}", splitter);
let split_result: Vec<_> = s.split(&splitter).filter(|s| s.trim() != "").collect();
let split_result: Vec<String> = s
.split(&splitter)
.filter_map(|s| {
if s.trim() != "" {
Some(s.to_string())
} else {
None
}
})
.collect();
trace!("split result = {:?}", split_result);
for s in split_result {
yield ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(s.into())).into_value(&v.tag),
);
}
futures::stream::iter(split_result.into_iter().map(move |s| {
ReturnSuccess::value(
UntaggedValue::Primitive(Primitive::String(s)).into_value(&v.tag),
)
}))
.to_output_stream()
} else {
yield Err(ShellError::labeled_error_with_secondary(
OutputStream::one(Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
name.span,
"value originates from here",
v.tag.span,
));
)))
}
}
};
Ok(stream.to_output_stream())
})
.to_output_stream())
}
#[cfg(test)]