Remove async_stream! from some commands (#1951)

* Remove async_stream! from open.rs

* Ran rustfmt

* Fix Clippy warning

* Removed async_stream! from evaluate_by.rs

* Removed async_stream! from exit.rs

* Removed async_stream! from from_eml.rs

* Removed async_stream! from group_by_date.rs

* Removed async_stream! from group_by.rs

* Removed async_stream! from map_max.rs

* Removed async_stream! from to_sqlite.rs

* Removed async_stream! from to_md.rs

* Removed async_stream! from to_html.rs
This commit is contained in:
Joseph T. Lyons 2020-06-08 00:48:10 -04:00 committed by GitHub
parent 545f70705e
commit ec7ff5960d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 306 additions and 299 deletions

View File

@ -37,42 +37,37 @@ impl WholeStreamCommand for EvaluateBy {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
evaluate_by(args, registry) evaluate_by(args, registry).await
} }
} }
pub fn evaluate_by( pub async fn evaluate_by(
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let name = args.call_info.name_tag.clone();
let name = args.call_info.name_tag.clone(); let (EvaluateByArgs { evaluate_with }, mut input) = args.process(&registry).await?;
let (EvaluateByArgs { evaluate_with }, mut input) = args.process(&registry).await?; let values: Vec<Value> = input.collect().await;
let values: Vec<Value> = input.collect().await;
if values.is_empty() { if values.is_empty() {
yield Err(ShellError::labeled_error( Err(ShellError::labeled_error(
"Expected table from pipeline", "Expected table from pipeline",
"requires a table input", "requires a table input",
name name,
)) ))
} else {
let evaluate_with = if let Some(evaluator) = evaluate_with {
Some(evaluator.item().clone())
} else { } else {
None
};
let evaluate_with = if let Some(evaluator) = evaluate_with { match evaluate(&values[0], evaluate_with, name) {
Some(evaluator.item().clone()) Ok(evaluated) => Ok(OutputStream::one(ReturnSuccess::value(evaluated))),
} else { Err(err) => Err(err),
None
};
match evaluate(&values[0], evaluate_with, name) {
Ok(evaluated) => yield ReturnSuccess::value(evaluated),
Err(err) => yield Err(err)
}
} }
}; }
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -25,7 +25,7 @@ impl WholeStreamCommand for Exit {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
exit(args, registry) exit(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -44,19 +44,20 @@ impl WholeStreamCommand for Exit {
} }
} }
pub fn exit(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn exit(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?;
if args.call_info.args.has("now") { let command_action = if args.call_info.args.has("now") {
yield Ok(ReturnSuccess::Action(CommandAction::Exit)); CommandAction::Exit
} else { } else {
yield Ok(ReturnSuccess::Action(CommandAction::LeaveShell)); CommandAction::LeaveShell
}
}; };
Ok(stream.to_output_stream()) Ok(OutputStream::one(ReturnSuccess::action(command_action)))
} }
#[cfg(test)] #[cfg(test)]

View File

@ -40,7 +40,7 @@ impl WholeStreamCommand for FromEML {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
from_eml(args, registry) from_eml(args, registry).await
} }
} }
@ -77,46 +77,54 @@ fn headerfieldvalue_to_value(tag: &Tag, value: &HeaderFieldValue) -> UntaggedVal
} }
} }
fn from_eml(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn from_eml(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let tag = args.call_info.name_tag.clone(); let tag = args.call_info.name_tag.clone();
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (eml_args, input): (FromEMLArgs, _) = args.process(&registry).await?;
let (eml_args, mut input): (FromEMLArgs, _) = args.process(&registry).await?; let value = input.collect_string(tag.clone()).await?;
let value = input.collect_string(tag.clone()).await?;
let body_preview = eml_args.preview_body.map(|b| b.item).unwrap_or(DEFAULT_BODY_PREVIEW); let body_preview = eml_args
.preview_body
.map(|b| b.item)
.unwrap_or(DEFAULT_BODY_PREVIEW);
let eml = EmlParser::from_string(value.item) let eml = EmlParser::from_string(value.item)
.with_body_preview(body_preview) .with_body_preview(body_preview)
.parse() .parse()
.map_err(|_| ShellError::labeled_error("Could not parse .eml file", "could not parse .eml file", &tag))?; .map_err(|_| {
ShellError::labeled_error(
"Could not parse .eml file",
"could not parse .eml file",
&tag,
)
})?;
let mut dict = TaggedDictBuilder::new(&tag); let mut dict = TaggedDictBuilder::new(&tag);
if let Some(subj) = eml.subject { if let Some(subj) = eml.subject {
dict.insert_untagged("Subject", UntaggedValue::string(subj)); dict.insert_untagged("Subject", UntaggedValue::string(subj));
} }
if let Some(from) = eml.from { if let Some(from) = eml.from {
dict.insert_untagged("From", headerfieldvalue_to_value(&tag, &from)); dict.insert_untagged("From", headerfieldvalue_to_value(&tag, &from));
} }
if let Some(to) = eml.to { if let Some(to) = eml.to {
dict.insert_untagged("To", headerfieldvalue_to_value(&tag, &to)); dict.insert_untagged("To", headerfieldvalue_to_value(&tag, &to));
} }
for HeaderField{ name, value } in eml.headers.iter() { for HeaderField { name, value } in eml.headers.iter() {
dict.insert_untagged(name, headerfieldvalue_to_value(&tag, &value)); dict.insert_untagged(name, headerfieldvalue_to_value(&tag, &value));
} }
if let Some(body) = eml.body { if let Some(body) = eml.body {
dict.insert_untagged("Body", UntaggedValue::string(body)); dict.insert_untagged("Body", UntaggedValue::string(body));
} }
yield ReturnSuccess::value(dict.into_value()); Ok(OutputStream::one(ReturnSuccess::value(dict.into_value())))
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -35,7 +35,7 @@ impl WholeStreamCommand for GroupBy {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
group_by(args, registry) group_by(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -71,30 +71,27 @@ impl WholeStreamCommand for GroupBy {
} }
} }
pub fn group_by(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { pub async fn group_by(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let stream = async_stream! { let (GroupByArgs { column_name }, input) = args.process(&registry).await?;
let (GroupByArgs { column_name }, mut input) = args.process(&registry).await?; let values: Vec<Value> = input.collect().await;
let values: Vec<Value> = input.collect().await;
if values.is_empty() {
yield Err(ShellError::labeled_error(
"Expected table from pipeline",
"requires a table input",
name
))
} else {
match crate::utils::data::group(column_name, &values, None, &name) {
Ok(grouped) => yield ReturnSuccess::value(grouped),
Err(err) => yield Err(err),
}
if values.is_empty() {
Err(ShellError::labeled_error(
"Expected table from pipeline",
"requires a table input",
name,
))
} else {
match crate::utils::data::group(column_name, &values, None, &name) {
Ok(grouped) => Ok(OutputStream::one(ReturnSuccess::value(grouped))),
Err(err) => Err(err),
} }
}; }
Ok(stream.to_output_stream())
} }
pub fn group( pub fn group(

View File

@ -42,7 +42,7 @@ impl WholeStreamCommand for GroupByDate {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
group_by_date(args, registry) group_by_date(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -58,50 +58,59 @@ enum Grouper {
ByDate(Option<String>), ByDate(Option<String>),
} }
pub fn group_by_date( pub async fn group_by_date(
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let stream = async_stream! { let (
let (GroupByDateArgs { column_name, format }, mut input) = args.process(&registry).await?; GroupByDateArgs {
let values: Vec<Value> = input.collect().await; column_name,
format,
},
input,
) = args.process(&registry).await?;
let values: Vec<Value> = input.collect().await;
if values.is_empty() { if values.is_empty() {
yield Err(ShellError::labeled_error( Err(ShellError::labeled_error(
"Expected table from pipeline", "Expected table from pipeline",
"requires a table input", "requires a table input",
name name,
)) ))
} else {
let grouper = if let Some(Tagged { item: fmt, tag: _ }) = format {
Grouper::ByDate(Some(fmt))
} else { } else {
Grouper::ByDate(None)
};
let grouper = if let Some(Tagged { item: fmt, tag }) = format { match grouper {
Grouper::ByDate(Some(fmt)) Grouper::ByDate(None) => {
} else { match crate::utils::data::group(
Grouper::ByDate(None) column_name,
}; &values,
Some(Box::new(|row: &Value| row.format("%Y-%b-%d"))),
match grouper { &name,
Grouper::ByDate(None) => { ) {
match crate::utils::data::group(column_name, &values, Some(Box::new(|row: &Value| row.format("%Y-%b-%d"))), &name) { Ok(grouped) => Ok(OutputStream::one(ReturnSuccess::value(grouped))),
Ok(grouped) => yield ReturnSuccess::value(grouped), Err(err) => Err(err),
Err(err) => yield Err(err),
}
} }
Grouper::ByDate(Some(fmt)) => { }
match crate::utils::data::group(column_name, &values, Some(Box::new(move |row: &Value| { Grouper::ByDate(Some(fmt)) => {
row.format(&fmt) match crate::utils::data::group(
})), &name) { column_name,
Ok(grouped) => yield ReturnSuccess::value(grouped), &values,
Err(err) => yield Err(err), Some(Box::new(move |row: &Value| row.format(&fmt))),
} &name,
) {
Ok(grouped) => Ok(OutputStream::one(ReturnSuccess::value(grouped))),
Err(err) => Err(err),
} }
} }
} }
}; }
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -38,42 +38,37 @@ impl WholeStreamCommand for MapMaxBy {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
map_max_by(args, registry) map_max_by(args, registry).await
} }
} }
pub fn map_max_by( pub async fn map_max_by(
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let name = args.call_info.name_tag.clone(); let name = args.call_info.name_tag.clone();
let stream = async_stream! { let (MapMaxByArgs { column_name }, mut input) = args.process(&registry).await?;
let (MapMaxByArgs { column_name }, mut input) = args.process(&registry).await?; let values: Vec<Value> = input.collect().await;
let values: Vec<Value> = input.collect().await;
if values.is_empty() { if values.is_empty() {
yield Err(ShellError::labeled_error( Err(ShellError::labeled_error(
"Expected table from pipeline", "Expected table from pipeline",
"requires a table input", "requires a table input",
name name,
)) ))
} else {
let map_by_column = if let Some(column_to_map) = column_name {
Some(column_to_map.item().clone())
} else { } else {
None
};
let map_by_column = if let Some(column_to_map) = column_name { match map_max(&values[0], map_by_column, name) {
Some(column_to_map.item().clone()) Ok(table_maxed) => Ok(OutputStream::one(ReturnSuccess::value(table_maxed))),
} else { Err(err) => Err(err),
None
};
match map_max(&values[0], map_by_column, name) {
Ok(table_maxed) => yield ReturnSuccess::value(table_maxed),
Err(err) => yield Err(err)
}
} }
}; }
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -42,7 +42,7 @@ impl WholeStreamCommand for Open {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
open(args, registry) open(args, registry).await
} }
fn examples(&self) -> Vec<Example> { fn examples(&self) -> Vec<Example> {
@ -54,39 +54,33 @@ impl WholeStreamCommand for Open {
} }
} }
fn open(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn open(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let cwd = PathBuf::from(args.shell_manager.path()); let cwd = PathBuf::from(args.shell_manager.path());
let full_path = cwd; let full_path = cwd;
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let (OpenArgs { path, raw }, _) = args.process(&registry).await?;
let (OpenArgs { path, raw }, _) = args.process(&registry).await?; let result = fetch(&full_path, &path.item, path.tag.span).await;
let result = fetch(&full_path, &path.item, path.tag.span).await;
if let Err(e) = result { let (file_extension, contents, contents_tag) = result?;
yield Err(e);
return;
}
let (file_extension, contents, contents_tag) = result?;
let file_extension = if raw.item { let file_extension = if raw.item {
None None
} else { } else {
// If the extension could not be determined via mimetype, try to use the path // If the extension could not be determined via mimetype, try to use the path
// extension. Some file types do not declare their mimetypes (such as bson files). // extension. Some file types do not declare their mimetypes (such as bson files).
file_extension.or(path.extension().map(|x| x.to_string_lossy().to_string())) file_extension.or_else(|| path.extension().map(|x| x.to_string_lossy().to_string()))
};
let tagged_contents = contents.into_value(&contents_tag);
if let Some(extension) = file_extension {
yield Ok(ReturnSuccess::Action(CommandAction::AutoConvert(tagged_contents, extension)))
} else {
yield ReturnSuccess::value(tagged_contents);
}
}; };
Ok(stream.to_output_stream()) let tagged_contents = contents.into_value(&contents_tag);
if let Some(extension) = file_extension {
Ok(OutputStream::one(ReturnSuccess::action(
CommandAction::AutoConvert(tagged_contents, extension),
)))
} else {
Ok(OutputStream::one(ReturnSuccess::value(tagged_contents)))
}
} }
pub async fn fetch( pub async fn fetch(

View File

@ -27,98 +27,106 @@ impl WholeStreamCommand for ToHTML {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
to_html(args, registry) to_html(args, registry).await
} }
} }
fn to_html(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn to_html(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?; let name_tag = args.name_tag();
let name_tag = args.name_tag(); let input: Vec<Value> = args.input.collect().await;
let input: Vec<Value> = args.input.collect().await; let headers = nu_protocol::merge_descriptors(&input);
let headers = nu_protocol::merge_descriptors(&input); let mut output_string = "<html><body>".to_string();
let mut output_string = "<html><body>".to_string();
if !headers.is_empty() && (headers.len() > 1 || headers[0] != "") { if !headers.is_empty() && (headers.len() > 1 || headers[0] != "") {
output_string.push_str("<table>"); output_string.push_str("<table>");
output_string.push_str("<tr>"); output_string.push_str("<tr>");
for header in &headers { for header in &headers {
output_string.push_str("<th>"); output_string.push_str("<th>");
output_string.push_str(&htmlescape::encode_minimal(&header)); output_string.push_str(&htmlescape::encode_minimal(&header));
output_string.push_str("</th>"); output_string.push_str("</th>");
}
output_string.push_str("</tr>");
} }
output_string.push_str("</tr>");
}
for row in input { for row in input {
match row.value { match row.value {
UntaggedValue::Primitive(Primitive::Binary(b)) => { UntaggedValue::Primitive(Primitive::Binary(b)) => {
// This might be a bit much, but it's fun :) // This might be a bit much, but it's fun :)
match row.tag.anchor { match row.tag.anchor {
Some(AnchorLocation::Url(f)) | Some(AnchorLocation::Url(f)) | Some(AnchorLocation::File(f)) => {
Some(AnchorLocation::File(f)) => { let extension = f.split('.').last().map(String::from);
let extension = f.split('.').last().map(String::from); match extension {
match extension { Some(s)
Some(s) if ["png", "jpg", "bmp", "gif", "tiff", "jpeg"].contains(&s.to_lowercase().as_str()) => { if ["png", "jpg", "bmp", "gif", "tiff", "jpeg"]
output_string.push_str("<img src=\"data:image/"); .contains(&s.to_lowercase().as_str()) =>
output_string.push_str(&s); {
output_string.push_str(";base64,"); output_string.push_str("<img src=\"data:image/");
output_string.push_str(&base64::encode(&b)); output_string.push_str(&s);
output_string.push_str("\">"); output_string.push_str(";base64,");
} output_string.push_str(&base64::encode(&b));
_ => {} output_string.push_str("\">");
} }
_ => {}
} }
_ => {}
} }
} _ => {}
UntaggedValue::Primitive(Primitive::String(ref b)) => {
// This might be a bit much, but it's fun :)
match row.tag.anchor {
Some(AnchorLocation::Url(f)) |
Some(AnchorLocation::File(f)) => {
let extension = f.split('.').last().map(String::from);
match extension {
Some(s) if s.to_lowercase() == "svg" => {
output_string.push_str("<img src=\"data:image/svg+xml;base64,");
output_string.push_str(&base64::encode(&b.as_bytes()));
output_string.push_str("\">");
continue;
}
_ => {}
}
}
_ => {}
}
output_string.push_str(&(htmlescape::encode_minimal(&format_leaf(&row.value).plain_string(100_000)).replace("\n", "<br>")));
}
UntaggedValue::Row(row) => {
output_string.push_str("<tr>");
for header in &headers {
let data = row.get_data(header);
output_string.push_str("<td>");
output_string.push_str(&format_leaf(data.borrow()).plain_string(100_000));
output_string.push_str("</td>");
}
output_string.push_str("</tr>");
}
p => {
output_string.push_str(&(htmlescape::encode_minimal(&format_leaf(&p).plain_string(100_000)).replace("\n", "<br>")));
} }
} }
UntaggedValue::Primitive(Primitive::String(ref b)) => {
// This might be a bit much, but it's fun :)
match row.tag.anchor {
Some(AnchorLocation::Url(f)) | Some(AnchorLocation::File(f)) => {
let extension = f.split('.').last().map(String::from);
match extension {
Some(s) if s.to_lowercase() == "svg" => {
output_string.push_str("<img src=\"data:image/svg+xml;base64,");
output_string.push_str(&base64::encode(&b.as_bytes()));
output_string.push_str("\">");
continue;
}
_ => {}
}
}
_ => {}
}
output_string.push_str(
&(htmlescape::encode_minimal(&format_leaf(&row.value).plain_string(100_000))
.replace("\n", "<br>")),
);
}
UntaggedValue::Row(row) => {
output_string.push_str("<tr>");
for header in &headers {
let data = row.get_data(header);
output_string.push_str("<td>");
output_string.push_str(&format_leaf(data.borrow()).plain_string(100_000));
output_string.push_str("</td>");
}
output_string.push_str("</tr>");
}
p => {
output_string.push_str(
&(htmlescape::encode_minimal(&format_leaf(&p).plain_string(100_000))
.replace("\n", "<br>")),
);
}
} }
}
if !headers.is_empty() && (headers.len() > 1 || headers[0] != "") { if !headers.is_empty() && (headers.len() > 1 || headers[0] != "") {
output_string.push_str("</table>"); output_string.push_str("</table>");
} }
output_string.push_str("</body></html>"); output_string.push_str("</body></html>");
yield ReturnSuccess::value(UntaggedValue::string(output_string).into_value(name_tag)); Ok(OutputStream::one(ReturnSuccess::value(
}; UntaggedValue::string(output_string).into_value(name_tag),
)))
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -26,55 +26,58 @@ impl WholeStreamCommand for ToMarkdown {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
to_html(args, registry) to_html(args, registry).await
} }
} }
fn to_html(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn to_html(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?; let name_tag = args.name_tag();
let name_tag = args.name_tag(); let input: Vec<Value> = args.input.collect().await;
let input: Vec<Value> = args.input.collect().await; let headers = nu_protocol::merge_descriptors(&input);
let headers = nu_protocol::merge_descriptors(&input); let mut output_string = String::new();
let mut output_string = String::new();
if !headers.is_empty() && (headers.len() > 1 || headers[0] != "") { if !headers.is_empty() && (headers.len() > 1 || headers[0] != "") {
output_string.push_str("|");
for header in &headers {
output_string.push_str(&htmlescape::encode_minimal(&header));
output_string.push_str("|"); output_string.push_str("|");
for header in &headers {
output_string.push_str(&htmlescape::encode_minimal(&header));
output_string.push_str("|");
}
output_string.push_str("\n|");
for _ in &headers {
output_string.push_str("-");
output_string.push_str("|");
}
output_string.push_str("\n");
} }
output_string.push_str("\n|");
for _ in &headers {
output_string.push_str("-");
output_string.push_str("|");
}
output_string.push_str("\n");
}
for row in input { for row in input {
match row.value { match row.value {
UntaggedValue::Row(row) => { UntaggedValue::Row(row) => {
output_string.push_str("|");
for header in &headers {
let data = row.get_data(header);
output_string.push_str(&format_leaf(data.borrow()).plain_string(100_000));
output_string.push_str("|"); output_string.push_str("|");
for header in &headers {
let data = row.get_data(header);
output_string.push_str(&format_leaf(data.borrow()).plain_string(100_000));
output_string.push_str("|");
}
output_string.push_str("\n");
}
p => {
output_string.push_str(&(htmlescape::encode_minimal(&format_leaf(&p).plain_string(100_000))));
output_string.push_str("\n");
} }
output_string.push_str("\n");
}
p => {
output_string.push_str(
&(htmlescape::encode_minimal(&format_leaf(&p).plain_string(100_000))),
);
output_string.push_str("\n");
} }
} }
}
yield ReturnSuccess::value(UntaggedValue::string(output_string).into_value(name_tag)); Ok(OutputStream::one(ReturnSuccess::value(
}; UntaggedValue::string(output_string).into_value(name_tag),
)))
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -27,7 +27,7 @@ impl WholeStreamCommand for ToSQLite {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
to_sqlite(args, registry) to_sqlite(args, registry).await
} }
fn is_binary(&self) -> bool { fn is_binary(&self) -> bool {
@ -56,7 +56,7 @@ impl WholeStreamCommand for ToDB {
args: CommandArgs, args: CommandArgs,
registry: &CommandRegistry, registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> { ) -> Result<OutputStream, ShellError> {
to_sqlite(args, registry) to_sqlite(args, registry).await
} }
fn is_binary(&self) -> bool { fn is_binary(&self) -> bool {
@ -203,26 +203,23 @@ fn sqlite_input_stream_to_bytes(values: Vec<Value>) -> Result<Value, std::io::Er
Ok(UntaggedValue::binary(out).into_value(tag)) Ok(UntaggedValue::binary(out).into_value(tag))
} }
fn to_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> { async fn to_sqlite(
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
let registry = registry.clone(); let registry = registry.clone();
let stream = async_stream! { let args = args.evaluate_once(&registry).await?;
let args = args.evaluate_once(&registry).await?; let name_tag = args.name_tag();
let name_tag = args.name_tag(); let input: Vec<Value> = args.input.collect().await;
let input: Vec<Value> = args.input.collect().await;
match sqlite_input_stream_to_bytes(input) { match sqlite_input_stream_to_bytes(input) {
Ok(out) => yield ReturnSuccess::value(out), Ok(out) => Ok(OutputStream::one(ReturnSuccess::value(out))),
_ => { _ => Err(ShellError::labeled_error(
yield Err(ShellError::labeled_error( "Expected a table with SQLite-compatible structure from pipeline",
"Expected a table with SQLite-compatible structure from pipeline", "requires SQLite-compatible input",
"requires SQLite-compatible input", name_tag,
name_tag, )),
)) }
},
}
};
Ok(stream.to_output_stream())
} }
#[cfg(test)] #[cfg(test)]