Add SQLite support

This commit is contained in:
Patrick Meredith
2019-08-27 17:45:18 -04:00
parent 4cde96bcc4
commit 3d147d1143
12 changed files with 507 additions and 79 deletions

View File

@ -179,6 +179,7 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
whole_stream_command(ToBSON),
whole_stream_command(ToCSV),
whole_stream_command(ToJSON),
whole_stream_command(ToSQLite),
whole_stream_command(ToTOML),
whole_stream_command(ToTSV),
whole_stream_command(ToYAML),
@ -193,6 +194,7 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
whole_stream_command(FromINI),
whole_stream_command(FromBSON),
whole_stream_command(FromJSON),
whole_stream_command(FromSQLite),
whole_stream_command(FromTOML),
whole_stream_command(FromXML),
whole_stream_command(FromYAML),

View File

@ -19,6 +19,7 @@ pub(crate) mod from_bson;
pub(crate) mod from_csv;
pub(crate) mod from_ini;
pub(crate) mod from_json;
pub(crate) mod from_sqlite;
pub(crate) mod from_toml;
pub(crate) mod from_tsv;
pub(crate) mod from_xml;
@ -52,6 +53,7 @@ pub(crate) mod to_array;
pub(crate) mod to_bson;
pub(crate) mod to_csv;
pub(crate) mod to_json;
pub(crate) mod to_sqlite;
pub(crate) mod to_toml;
pub(crate) mod to_tsv;
pub(crate) mod to_yaml;
@ -67,6 +69,7 @@ pub(crate) use command::{
per_item_command, whole_stream_command, Command, PerItemCommand, RawCommandArgs,
UnevaluatedCallInfo, WholeStreamCommand,
};
pub(crate) use config::Config;
pub(crate) use cp::Cpy;
pub(crate) use date::Date;
@ -79,6 +82,7 @@ pub(crate) use from_bson::FromBSON;
pub(crate) use from_csv::FromCSV;
pub(crate) use from_ini::FromINI;
pub(crate) use from_json::FromJSON;
pub(crate) use from_sqlite::FromSQLite;
pub(crate) use from_toml::FromTOML;
pub(crate) use from_tsv::FromTSV;
pub(crate) use from_xml::FromXML;
@ -111,6 +115,7 @@ pub(crate) use to_array::ToArray;
pub(crate) use to_bson::ToBSON;
pub(crate) use to_csv::ToCSV;
pub(crate) use to_json::ToJSON;
pub(crate) use to_sqlite::ToSQLite;
pub(crate) use to_toml::ToTOML;
pub(crate) use to_tsv::ToTSV;
pub(crate) use to_yaml::ToYAML;

View File

@ -45,7 +45,7 @@ fn convert_bson_value_to_nu_value(v: &Bson, tag: impl Into<Tag>) -> Tagged<Value
collected.into_tagged_value()
}
Bson::Boolean(b) => Value::Primitive(Primitive::Boolean(*b)).tagged(tag),
Bson::Null => Value::Primitive(Primitive::String(String::from(""))).tagged(tag),
Bson::Null => Value::Primitive(Primitive::Nothing).tagged(tag),
Bson::RegExp(r, opts) => {
let mut collected = TaggedDictBuilder::new(tag);
collected.insert_tagged(

139
src/commands/from_sqlite.rs Normal file
View File

@ -0,0 +1,139 @@
use crate::commands::WholeStreamCommand;
use crate::errors::ShellError;
use crate::object::base::OF64;
use crate::object::{Primitive, TaggedDictBuilder, Value};
use crate::prelude::*;
use rusqlite::{types::ValueRef, Connection, Row, NO_PARAMS};
use std::io::Write;
use std::path::Path;
pub struct FromSQLite;
impl WholeStreamCommand for FromSQLite {
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
from_sqlite(args, registry)
}
fn name(&self) -> &str {
"from-sqlite"
}
fn signature(&self) -> Signature {
Signature::build("from-sqlite")
}
}
pub fn convert_sqlite_file_to_nu_value(
path: &Path,
tag: impl Into<Tag> + Clone,
) -> Result<Tagged<Value>, rusqlite::Error> {
let conn = Connection::open(path)?;
let mut meta_out = Vec::new();
let mut meta_stmt = conn.prepare("select name from sqlite_master where type='table'")?;
let mut meta_rows = meta_stmt.query(NO_PARAMS)?;
while let Some(meta_row) = meta_rows.next()? {
let table_name: String = meta_row.get(0)?;
let mut meta_dict = TaggedDictBuilder::new(tag.clone());
let mut out = Vec::new();
let mut table_stmt = conn.prepare(&format!("select * from [{}]", table_name))?;
let mut table_rows = table_stmt.query(NO_PARAMS)?;
while let Some(table_row) = table_rows.next()? {
out.push(convert_sqlite_row_to_nu_value(table_row, tag.clone())?)
}
meta_dict.insert_tagged(
"table_name".to_string(),
Value::Primitive(Primitive::String(table_name)).tagged(tag.clone()),
);
meta_dict.insert_tagged("table_values", Value::List(out).tagged(tag.clone()));
meta_out.push(meta_dict.into_tagged_value());
}
let tag = tag.into();
Ok(Value::List(meta_out).tagged(tag))
}
fn convert_sqlite_row_to_nu_value(
row: &Row,
tag: impl Into<Tag> + Clone,
) -> Result<Tagged<Value>, rusqlite::Error> {
let mut collected = TaggedDictBuilder::new(tag.clone());
for (i, c) in row.columns().iter().enumerate() {
collected.insert_tagged(
c.name().to_string(),
convert_sqlite_value_to_nu_value(row.get_raw(i), tag.clone()),
);
}
return Ok(collected.into_tagged_value());
}
fn convert_sqlite_value_to_nu_value(value: ValueRef, tag: impl Into<Tag> + Clone) -> Tagged<Value> {
match value {
ValueRef::Null => Value::Primitive(Primitive::String(String::from(""))).tagged(tag),
ValueRef::Integer(i) => Value::Primitive(Primitive::Int(i)).tagged(tag),
ValueRef::Real(f) => Value::Primitive(Primitive::Float(OF64::from(f))).tagged(tag),
t @ ValueRef::Text(_) => {
// this unwrap is safe because we know the ValueRef is Text.
Value::Primitive(Primitive::String(t.as_str().unwrap().to_string())).tagged(tag)
}
ValueRef::Blob(u) => Value::Binary(u.to_owned()).tagged(tag),
}
}
pub fn from_sqlite_bytes_to_value(
mut bytes: Vec<u8>,
tag: impl Into<Tag> + Clone,
) -> Result<Tagged<Value>, std::io::Error> {
// FIXME: should probably write a sqlite virtual filesystem
// that will allow us to use bytes as a file to avoid this
// write out, but this will require C code. Might be
// best done as a PR to rusqlite.
let mut tempfile = tempfile::NamedTempFile::new()?;
tempfile.write_all(bytes.as_mut_slice())?;
match convert_sqlite_file_to_nu_value(tempfile.path(), tag) {
Ok(value) => Ok(value),
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
}
}
fn from_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let span = args.name_span();
let input = args.input;
let stream = async_stream_block! {
let values: Vec<Tagged<Value>> = input.values.collect().await;
for value in values {
let value_tag = value.tag();
match value.item {
Value::Binary(vb) =>
match from_sqlite_bytes_to_value(vb, span) {
Ok(x) => yield ReturnSuccess::value(x),
Err(_) => {
yield Err(ShellError::labeled_error_with_secondary(
"Could not parse as SQLite",
"input cannot be parsed as SQLite",
span,
"value originates from here",
value_tag.span,
))
}
}
_ => yield Err(ShellError::labeled_error_with_secondary(
"Expected a string from pipeline",
"requires string input",
span,
"value originates from here",
value_tag.span,
)),
}
}
};
Ok(stream.to_output_stream())
}

View File

@ -527,6 +527,16 @@ pub fn parse_binary_as_value(
},
)
}
Some(ref x) if x == "db" => {
crate::commands::from_sqlite::from_sqlite_bytes_to_value(contents, contents_tag)
.map_err(move |_| {
ShellError::labeled_error(
"Could not open as SQLite",
"could not open as SQLite",
name_span,
)
})
}
_ => Ok(Value::Binary(contents).tagged(contents_tag)),
}
}

183
src/commands/to_sqlite.rs Normal file
View File

@ -0,0 +1,183 @@
use crate::commands::WholeStreamCommand;
use crate::object::{Dictionary, Primitive, Value};
use crate::prelude::*;
use hex::encode;
use rusqlite::{Connection, NO_PARAMS};
use std::io::Read;
pub struct ToSQLite;
impl WholeStreamCommand for ToSQLite {
fn run(
&self,
args: CommandArgs,
registry: &CommandRegistry,
) -> Result<OutputStream, ShellError> {
to_sqlite(args, registry)
}
fn name(&self) -> &str {
"to-sqlite"
}
fn signature(&self) -> Signature {
Signature::build("to-sqlite")
}
}
fn comma_concat(acc: String, current: String) -> String {
if acc == "" {
current
} else {
format!("{}, {}", acc, current)
}
}
fn get_columns(rows: &Vec<Tagged<Value>>) -> Result<String, std::io::Error> {
match &rows[0].item {
Value::Object(d) => Ok(d
.entries
.iter()
.map(|(k, _v)| k.clone())
.fold("".to_string(), comma_concat)),
_ => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find table column names",
)),
}
}
fn nu_value_to_sqlite_string(v: Value) -> String {
match v {
Value::Binary(u) => format!("x'{}'", encode(u)),
Value::Primitive(p) => match p {
Primitive::Nothing => "NULL".into(),
Primitive::Int(i) => format!("{}", i),
Primitive::Float(f) => format!("{}", f.into_inner()),
Primitive::Bytes(u) => format!("{}", u),
Primitive::String(s) => format!("'{}'", s.replace("'", "''")),
Primitive::Boolean(true) => "1".into(),
Primitive::Boolean(_) => "0".into(),
Primitive::Date(d) => format!("'{}'", d),
Primitive::Path(p) => format!("'{}'", p.display().to_string().replace("'", "''")),
Primitive::BeginningOfStream => "NULL".into(),
Primitive::EndOfStream => "NULL".into(),
},
_ => "NULL".into(),
}
}
fn get_insert_values(rows: Vec<Tagged<Value>>) -> Result<String, std::io::Error> {
let values: Result<Vec<_>, _> = rows
.into_iter()
.map(|value| match value.item {
Value::Object(d) => Ok(format!(
"({})",
d.entries
.iter()
.map(|(_k, v)| nu_value_to_sqlite_string(v.item.clone()))
.fold("".to_string(), comma_concat)
)),
_ => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find table column names",
)),
})
.collect();
let values = values?;
Ok(values.into_iter().fold("".to_string(), comma_concat))
}
fn generate_statements(table: Dictionary) -> Result<(String, String), std::io::Error> {
let table_name = match table.entries.get("table_name") {
Some(Tagged {
item: Value::Primitive(Primitive::String(table_name)),
..
}) => table_name,
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find table name",
))
}
};
let (columns, insert_values) = match table.entries.get("table_values") {
Some(Tagged {
item: Value::List(l),
..
}) => (get_columns(l), get_insert_values(l.to_vec())),
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not find table values",
))
}
};
let create = format!("create table {}({})", table_name, columns?);
let insert = format!("insert into {} values {}", table_name, insert_values?);
Ok((create, insert))
}
fn sqlite_input_stream_to_bytes(
values: Vec<Tagged<Value>>,
) -> Result<Tagged<Value>, std::io::Error> {
// FIXME: should probably write a sqlite virtual filesystem
// that will allow us to use bytes as a file to avoid this
// write out, but this will require C code. Might be
// best done as a PR to rusqlite.
let mut tempfile = tempfile::NamedTempFile::new()?;
let conn = match Connection::open(tempfile.path()) {
Ok(conn) => conn,
Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
};
let tag = values[0].tag.clone();
for value in values.into_iter() {
match value.item() {
Value::Object(d) => {
let (create, insert) = generate_statements(d.to_owned())?;
match conn
.execute(&create, NO_PARAMS)
.and_then(|_| conn.execute(&insert, NO_PARAMS))
{
Ok(_) => (),
Err(e) => {
println!("{}", create);
println!("{}", insert);
println!("{:?}", e);
return Err(std::io::Error::new(std::io::ErrorKind::Other, e));
}
}
}
other => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Expected object, found {:?}", other),
))
}
}
}
let mut out = Vec::new();
tempfile.read_to_end(&mut out)?;
Ok(Value::Binary(out).tagged(tag))
}
fn to_sqlite(args: CommandArgs, registry: &CommandRegistry) -> Result<OutputStream, ShellError> {
let args = args.evaluate_once(registry)?;
let name_span = args.name_span();
let stream = async_stream_block! {
let values: Vec<_> = args.input.into_vec().await;
match sqlite_input_stream_to_bytes(values) {
Ok(out) => {
yield ReturnSuccess::value(out)
}
Err(_) => {
yield Err(ShellError::labeled_error(
"Expected an object with SQLite-compatible structure from pipeline",
"requires SQLite-compatible input",
name_span,
))
}
};
};
Ok(stream.to_output_stream())
}

View File

@ -266,6 +266,10 @@ mod tests {
loc: fixtures().join("sample.bson"),
at: 0
},
Res {
loc: fixtures().join("sample.db"),
at: 0
},
Res {
loc: fixtures().join("sample.ini"),
at: 0