nushell/src/commands/to_sqlite.rs

223 lines
6.9 KiB
Rust
Raw Normal View History

2019-08-27 23:45:18 +02:00
use crate::commands::WholeStreamCommand;
use crate::prelude::*;
use hex::encode;
use nu_errors::ShellError;
2019-11-30 01:21:05 +01:00
use nu_protocol::{Dictionary, Primitive, ReturnSuccess, Signature, UntaggedValue, Value};
2019-08-27 23:45:18 +02:00
use rusqlite::{Connection, NO_PARAMS};
use std::io::Read;
pub struct ToSQLite;
impl WholeStreamCommand for ToSQLite {
fn name(&self) -> &str {
"to-sqlite"
}
fn signature(&self) -> Signature {
Signature::build("to-sqlite")
}
fn usage(&self) -> &str {
"Convert table to sqlite .db binary data"
}
2019-08-31 03:30:41 +02:00
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
to_sqlite(args, registry)
}
2019-09-04 03:50:23 +02:00
fn is_binary(&self) -> bool {
true
}
}
2019-08-31 03:30:41 +02:00
pub struct ToDB;
impl WholeStreamCommand for ToDB {
2019-08-31 03:30:41 +02:00
fn name(&self) -> &str {
"to-db"
}
fn signature(&self) -> Signature {
Signature::build("to-db")
}
fn usage(&self) -> &str {
"Convert table to db data"
}
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
to_sqlite(args, registry)
}
2019-09-04 03:50:23 +02:00
fn is_binary(&self) -> bool {
true
}
2019-08-31 03:30:41 +02:00
}
2019-08-27 23:45:18 +02:00
fn comma_concat(acc: String, current: String) -> String {
if acc == "" {
current
} else {
format!("{}, {}", acc, current)
}
}
fn get_columns(rows: &[Value]) -> Result<String, std::io::Error> {
match &rows[0].value {
UntaggedValue::Row(d) => Ok(d
2019-08-27 23:45:18 +02:00
.entries
.iter()
.map(|(k, _v)| k.clone())
.fold("".to_string(), comma_concat)),
_ => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find table column names",
)),
}
}
fn nu_value_to_sqlite_string(v: Value) -> String {
match &v.value {
UntaggedValue::Primitive(p) => match p {
2019-08-27 23:45:18 +02:00
Primitive::Nothing => "NULL".into(),
Primitive::Int(i) => format!("{}", i),
2019-11-17 06:48:48 +01:00
Primitive::Duration(u) => format!("{}", u),
Primitive::Decimal(f) => format!("{}", f),
2019-08-27 23:45:18 +02:00
Primitive::Bytes(u) => format!("{}", u),
Primitive::Pattern(s) => format!("'{}'", s.replace("'", "''")),
2019-08-27 23:45:18 +02:00
Primitive::String(s) => format!("'{}'", s.replace("'", "''")),
Primitive::Line(s) => format!("'{}'", s.replace("'", "''")),
2019-08-27 23:45:18 +02:00
Primitive::Boolean(true) => "1".into(),
Primitive::Boolean(_) => "0".into(),
Primitive::Date(d) => format!("'{}'", d),
Primitive::Path(p) => format!("'{}'", p.display().to_string().replace("'", "''")),
Primitive::Binary(u) => format!("x'{}'", encode(u)),
2019-11-04 16:47:03 +01:00
Primitive::BeginningOfStream | Primitive::EndOfStream | Primitive::ColumnPath(_) => {
"NULL".into()
}
2019-08-27 23:45:18 +02:00
},
_ => "NULL".into(),
}
}
fn get_insert_values(rows: Vec<Value>) -> Result<String, std::io::Error> {
2019-08-27 23:45:18 +02:00
let values: Result<Vec<_>, _> = rows
.into_iter()
.map(|value| match value.value {
UntaggedValue::Row(d) => Ok(format!(
2019-08-27 23:45:18 +02:00
"({})",
d.entries
.iter()
.map(|(_k, v)| nu_value_to_sqlite_string(v.clone()))
2019-08-27 23:45:18 +02:00
.fold("".to_string(), comma_concat)
)),
_ => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find table column names",
)),
})
.collect();
let values = values?;
Ok(values.into_iter().fold("".to_string(), comma_concat))
}
fn generate_statements(table: Dictionary) -> Result<(String, String), std::io::Error> {
let table_name = match table.entries.get("table_name") {
Some(Value {
value: UntaggedValue::Primitive(Primitive::String(table_name)),
2019-08-27 23:45:18 +02:00
..
}) => table_name,
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find table name",
))
}
};
let (columns, insert_values) = match table.entries.get("table_values") {
Some(Value {
value: UntaggedValue::Table(l),
2019-08-27 23:45:18 +02:00
..
}) => (get_columns(l), get_insert_values(l.to_vec())),
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find table values",
))
}
};
let create = format!("create table {}({})", table_name, columns?);
let insert = format!("insert into {} values {}", table_name, insert_values?);
Ok((create, insert))
}
fn sqlite_input_stream_to_bytes(values: Vec<Value>) -> Result<Value, std::io::Error> {
2019-08-27 23:45:18 +02:00
// FIXME: should probably write a sqlite virtual filesystem
// that will allow us to use bytes as a file to avoid this
// write out, but this will require C code. Might be
// best done as a PR to rusqlite.
let mut tempfile = tempfile::NamedTempFile::new()?;
let conn = match Connection::open(tempfile.path()) {
Ok(conn) => conn,
Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
};
let tag = values[0].tag.clone();
for value in values.into_iter() {
match &value.value {
UntaggedValue::Row(d) => {
2019-08-27 23:45:18 +02:00
let (create, insert) = generate_statements(d.to_owned())?;
match conn
.execute(&create, NO_PARAMS)
.and_then(|_| conn.execute(&insert, NO_PARAMS))
{
Ok(_) => (),
Err(e) => {
2019-11-04 16:47:03 +01:00
outln!("{}", create);
outln!("{}", insert);
outln!("{:?}", e);
2019-08-27 23:45:18 +02:00
return Err(std::io::Error::new(std::io::ErrorKind::Other, e));
}
}
}
other => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Expected row, found {:?}", other),
2019-08-27 23:45:18 +02:00
))
}
}
}
let mut out = Vec::new();
tempfile.read_to_end(&mut out)?;
Ok(UntaggedValue::binary(out).into_value(tag))
2019-08-27 23:45:18 +02:00
}
fn to_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let name_tag = args.name_tag();
let stream = async_stream! {
let input: Vec<Value> = args.input.values.collect().await;
match sqlite_input_stream_to_bytes(input) {
Ok(out) => yield ReturnSuccess::value(out),
_ => {
2019-08-27 23:45:18 +02:00
yield Err(ShellError::labeled_error(
"Expected a table with SQLite-compatible structure.tag() from pipeline",
2019-08-27 23:45:18 +02:00
"requires SQLite-compatible input",
name_tag,
))
},
}
2019-08-27 23:45:18 +02:00
};
2019-08-27 23:45:18 +02:00
Ok(stream.to_output_stream())
}