'stor create/insert/open' & 'query db' now support JSON columns (#16258)

Co-authored-by: Tim 'Piepmatz' Hesse <git+github@cptpiepmatz.de>
This commit is contained in:
YPares
2025-08-04 16:58:21 +02:00
committed by GitHub
parent 9f4c3a1d10
commit f33d952adf
9 changed files with 243 additions and 95 deletions

View File

@ -94,6 +94,7 @@ rusqlite = { workspace = true, features = [
"bundled",
"backup",
"chrono",
"column_decltype",
], optional = true }
rustls = { workspace = true, optional = true, features = ["ring"] }
rustls-native-certs = { workspace = true, optional = true }

View File

@ -76,6 +76,17 @@ impl Command for IntoSqliteDb {
example: "{ foo: bar, baz: quux } | into sqlite filename.db",
result: None,
},
Example {
description: "Insert data that contains records, lists or tables, that will be stored as JSONB columns
These columns will be automatically turned back into nu objects when read directly via cell-path",
example: "{a_record: {foo: bar, baz: quux}, a_list: [1 2 3], a_table: [[a b]; [0 1] [2 3]]} | into sqlite filename.db -t my_table
(open filename.db).my_table.0.a_list",
result: Some(Value::test_list(vec![
Value::test_int(1),
Value::test_int(2),
Value::test_int(3)
]))
}
]
}
}
@ -183,10 +194,11 @@ fn operate(
let file_name: Spanned<String> = call.req(engine_state, stack, 0)?;
let table_name: Option<Spanned<String>> = call.get_flag(engine_state, stack, "table-name")?;
let table = Table::new(&file_name, table_name)?;
Ok(action(input, table, span, engine_state.signals())?.into_pipeline_data())
Ok(action(engine_state, input, table, span, engine_state.signals())?.into_pipeline_data())
}
fn action(
engine_state: &EngineState,
input: PipelineData,
table: Table,
span: Span,
@ -194,17 +206,17 @@ fn action(
) -> Result<Value, ShellError> {
match input {
PipelineData::ListStream(stream, _) => {
insert_in_transaction(stream.into_iter(), span, table, signals)
insert_in_transaction(engine_state, stream.into_iter(), span, table, signals)
}
PipelineData::Value(value @ Value::List { .. }, _) => {
let span = value.span();
let vals = value
.into_list()
.expect("Value matched as list above, but is not a list");
insert_in_transaction(vals.into_iter(), span, table, signals)
insert_in_transaction(engine_state, vals.into_iter(), span, table, signals)
}
PipelineData::Value(val, _) => {
insert_in_transaction(std::iter::once(val), span, table, signals)
insert_in_transaction(engine_state, std::iter::once(val), span, table, signals)
}
_ => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "list".into(),
@ -216,6 +228,7 @@ fn action(
}
fn insert_in_transaction(
engine_state: &EngineState,
stream: impl Iterator<Item = Value>,
span: Span,
mut table: Table,
@ -272,7 +285,7 @@ fn insert_in_transaction(
inner: Vec::new(),
})?;
let result = insert_value(stream_value, &mut insert_statement);
let result = insert_value(engine_state, stream_value, span, &mut insert_statement);
insert_statement
.finalize()
@ -299,13 +312,15 @@ fn insert_in_transaction(
}
fn insert_value(
engine_state: &EngineState,
stream_value: Value,
call_span: Span,
insert_statement: &mut rusqlite::Statement<'_>,
) -> Result<(), ShellError> {
match stream_value {
// map each column value into its SQL representation
Value::Record { val, .. } => {
let sql_vals = values_to_sql(val.values().cloned())?;
let sql_vals = values_to_sql(engine_state, val.values().cloned(), call_span)?;
insert_statement
.execute(rusqlite::params_from_iter(sql_vals))
@ -345,6 +360,7 @@ fn nu_value_to_sqlite_type(val: &Value) -> Result<&'static str, ShellError> {
Type::Date => Ok("DATETIME"),
Type::Duration => Ok("BIGINT"),
Type::Filesize => Ok("INTEGER"),
Type::List(_) | Type::Record(_) | Type::Table(_) => Ok("JSONB"),
// [NOTE] On null values, we just assume TEXT. This could end up
// creating a table where the column type is wrong in the table schema.
@ -358,11 +374,8 @@ fn nu_value_to_sqlite_type(val: &Value) -> Result<&'static str, ShellError> {
| Type::Closure
| Type::Custom(_)
| Type::Error
| Type::List(_)
| Type::Range
| Type::Record(_)
| Type::Glob
| Type::Table(_) => Err(ShellError::OnlySupportsThisInputType {
| Type::Glob => Err(ShellError::OnlySupportsThisInputType {
exp_input_type: "sql".into(),
wrong_type: val.get_type().to_string(),
dst_span: Span::unknown(),
@ -388,17 +401,3 @@ fn get_columns_with_sqlite_types(
Ok(columns)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Run the command's documented examples (including the JSONB
    /// round-trip example) through the standard example test harness.
    #[test]
    fn test_examples() {
        use crate::test_examples;

        test_examples(IntoSqliteDb {})
    }
}

View File

@ -41,7 +41,7 @@ impl Command for QueryDb {
Example {
description: "Execute a SQL statement with parameters",
example: r#"stor create -t my_table -c { first: str, second: int }
stor open | query db "INSERT INTO my_table VALUES (?, ?)" -p [hello 123]"#,
stor open | query db "INSERT INTO my_table VALUES (?, ?)" -p [hello 123]"#,
result: None,
},
Example {
@ -54,6 +54,36 @@ stor open | query db "SELECT * FROM my_table WHERE second = :search_second" -p {
"second" => Value::test_int(123)
})])),
},
Example {
description: "Execute a SQL query, selecting a declared JSON(B) column that will automatically be parsed",
example: r#"stor create -t my_table -c {data: jsonb}
[{data: {name: Albert, age: 40}} {data: {name: Barnaby, age: 54}}] | stor insert -t my_table
stor open | query db "SELECT data FROM my_table WHERE data->>'age' < 45""#,
result: Some(Value::test_list(vec![Value::test_record(record! {
"data" => Value::test_record(
record! {
"name" => Value::test_string("Albert"),
"age" => Value::test_int(40),
}
)})])),
},
Example {
description: "Execute a SQL query selecting a sub-field of a JSON(B) column.
In this case, results must be parsed afterwards because SQLite does not
return declaration types when a JSON(B) column is not directly selected",
example: r#"stor create -t my_table -c {data: jsonb}
stor insert -t my_table -d {data: {foo: foo, bar: 12, baz: [0 1 2]}}
stor open | query db "SELECT data->'baz' AS baz FROM my_table" | update baz {from json}"#,
result: Some(Value::test_list(vec![Value::test_record(
record! { "baz" =>
Value::test_list(vec![
Value::test_int(0),
Value::test_int(1),
Value::test_int(2),
])
},
)])),
},
]
}
@ -73,7 +103,7 @@ stor open | query db "SELECT * FROM my_table WHERE second = :search_second" -p {
.get_flag(engine_state, stack, "params")?
.unwrap_or_else(|| Value::nothing(Span::unknown()));
let params = nu_value_to_params(params_value)?;
let params = nu_value_to_params(engine_state, params_value, call.head)?;
let db = SQLiteDatabase::try_from_pipeline(input, call.head)?;
db.query(&sql, params, call.head)

View File

@ -4,7 +4,7 @@ use super::definitions::{
};
use nu_protocol::{
CustomValue, PipelineData, Record, ShellError, Signals, Span, Spanned, Value,
shell_error::io::IoError,
engine::EngineState, shell_error::io::IoError,
};
use rusqlite::{
Connection, DatabaseName, Error as SqliteError, OpenFlags, Row, Statement, ToSql,
@ -431,35 +431,44 @@ fn run_sql_query(
}
// This is taken from to text local_into_string but tweaks it a bit so that certain formatting does not happen
pub fn value_to_sql(value: Value) -> Result<Box<dyn rusqlite::ToSql>, ShellError> {
Ok(match value {
Value::Bool { val, .. } => Box::new(val),
Value::Int { val, .. } => Box::new(val),
Value::Float { val, .. } => Box::new(val),
Value::Filesize { val, .. } => Box::new(val.get()),
Value::Duration { val, .. } => Box::new(val),
Value::Date { val, .. } => Box::new(val),
Value::String { val, .. } => Box::new(val),
Value::Binary { val, .. } => Box::new(val),
Value::Nothing { .. } => Box::new(rusqlite::types::Null),
pub fn value_to_sql(
engine_state: &EngineState,
value: Value,
call_span: Span,
) -> Result<Box<dyn rusqlite::ToSql>, ShellError> {
match value {
Value::Bool { val, .. } => Ok(Box::new(val)),
Value::Int { val, .. } => Ok(Box::new(val)),
Value::Float { val, .. } => Ok(Box::new(val)),
Value::Filesize { val, .. } => Ok(Box::new(val.get())),
Value::Duration { val, .. } => Ok(Box::new(val)),
Value::Date { val, .. } => Ok(Box::new(val)),
Value::String { val, .. } => Ok(Box::new(val)),
Value::Binary { val, .. } => Ok(Box::new(val)),
Value::Nothing { .. } => Ok(Box::new(rusqlite::types::Null)),
val => {
return Err(ShellError::OnlySupportsThisInputType {
exp_input_type:
"bool, int, float, filesize, duration, date, string, nothing, binary".into(),
wrong_type: val.get_type().to_string(),
dst_span: Span::unknown(),
src_span: val.span(),
});
let json_value = crate::value_to_json_value(engine_state, &val, call_span, false)?;
match nu_json::to_string_raw(&json_value) {
Ok(s) => Ok(Box::new(s)),
Err(err) => Err(ShellError::CantConvert {
to_type: "JSON".into(),
from_type: val.get_type().to_string(),
span: val.span(),
help: Some(err.to_string()),
}),
}
}
})
}
}
pub fn values_to_sql(
engine_state: &EngineState,
values: impl IntoIterator<Item = Value>,
call_span: Span,
) -> Result<Vec<Box<dyn rusqlite::ToSql>>, ShellError> {
values
.into_iter()
.map(value_to_sql)
.map(|v| value_to_sql(engine_state, v, call_span))
.collect::<Result<Vec<_>, _>>()
}
@ -474,13 +483,17 @@ impl Default for NuSqlParams {
}
}
pub fn nu_value_to_params(value: Value) -> Result<NuSqlParams, ShellError> {
pub fn nu_value_to_params(
engine_state: &EngineState,
value: Value,
call_span: Span,
) -> Result<NuSqlParams, ShellError> {
match value {
Value::Record { val, .. } => {
let mut params = Vec::with_capacity(val.len());
for (mut column, value) in val.into_owned().into_iter() {
let sql_type_erased = value_to_sql(value)?;
let sql_type_erased = value_to_sql(engine_state, value, call_span)?;
if !column.starts_with([':', '@', '$']) {
column.insert(0, ':');
@ -495,7 +508,7 @@ pub fn nu_value_to_params(value: Value) -> Result<NuSqlParams, ShellError> {
let mut params = Vec::with_capacity(vals.len());
for value in vals.into_iter() {
let sql_type_erased = value_to_sql(value)?;
let sql_type_erased = value_to_sql(engine_state, value, call_span)?;
params.push(sql_type_erased);
}
@ -557,17 +570,49 @@ fn read_single_table(
prepared_statement_to_nu_list(stmt, NuSqlParams::default(), call_span, signals)
}
/// Declared SQL type of a query column, parsed from its raw declaration
/// string (e.g. the `JSONB` in `CREATE TABLE t (data JSONB)`).
///
/// Only JSON(B) declarations are tracked for now; every other declared
/// type parses to `None` and receives no special decoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeclType {
    Json,
    Jsonb,
}

impl DeclType {
    /// Parse a raw declaration-type string, case-insensitively.
    ///
    /// Returns `None` for anything other than JSON/JSONB.
    pub fn from_str(s: &str) -> Option<Self> {
        // Compare case-insensitively without allocating an uppercased copy
        // of the string for every column of every query.
        if s.eq_ignore_ascii_case("JSON") {
            Some(DeclType::Json)
        } else if s.eq_ignore_ascii_case("JSONB") {
            Some(DeclType::Jsonb)
        } else {
            None // we only special-case JSON(B) columns for now
        }
    }
}
/// A column returned by an SQLite query, paired with its declared type
/// (populated only when the declaration is one we special-case, i.e. JSON/JSONB).
pub struct TypedColumn {
    pub name: String,
    pub decl_type: Option<DeclType>,
}

impl TypedColumn {
    /// Build a `TypedColumn` from rusqlite's column metadata, keeping the
    /// declared type only when it parses as a recognized `DeclType`.
    pub fn from_rusqlite_column(c: &rusqlite::Column) -> Self {
        let name = c.name().to_owned();
        let decl_type = match c.decl_type() {
            Some(raw) => DeclType::from_str(raw),
            None => None,
        };
        TypedColumn { name, decl_type }
    }
}
fn prepared_statement_to_nu_list(
mut stmt: Statement,
params: NuSqlParams,
call_span: Span,
signals: &Signals,
) -> Result<Value, SqliteOrShellError> {
let column_names = stmt
.column_names()
.into_iter()
.map(String::from)
.collect::<Vec<String>>();
let columns: Vec<TypedColumn> = stmt
.columns()
.iter()
.map(TypedColumn::from_rusqlite_column)
.collect();
// I'm very sorry for this repetition
// I tried scoping the match arms to the query_map alone, but lifetime and closure reference escapes
@ -577,11 +622,7 @@ fn prepared_statement_to_nu_list(
let refs: Vec<&dyn ToSql> = params.iter().map(|value| (&**value)).collect();
let row_results = stmt.query_map(refs.as_slice(), |row| {
Ok(convert_sqlite_row_to_nu_value(
row,
call_span,
&column_names,
))
Ok(convert_sqlite_row_to_nu_value(row, call_span, &columns))
})?;
// we collect all rows before returning them. Not ideal but it's hard/impossible to return a stream from a CustomValue
@ -603,11 +644,7 @@ fn prepared_statement_to_nu_list(
.collect();
let row_results = stmt.query_map(refs.as_slice(), |row| {
Ok(convert_sqlite_row_to_nu_value(
row,
call_span,
&column_names,
))
Ok(convert_sqlite_row_to_nu_value(row, call_span, &columns))
})?;
// we collect all rows before returning them. Not ideal but it's hard/impossible to return a stream from a CustomValue
@ -650,14 +687,14 @@ fn read_entire_sqlite_db(
Ok(Value::record(tables, call_span))
}
pub fn convert_sqlite_row_to_nu_value(row: &Row, span: Span, column_names: &[String]) -> Value {
let record = column_names
pub fn convert_sqlite_row_to_nu_value(row: &Row, span: Span, columns: &[TypedColumn]) -> Value {
let record = columns
.iter()
.enumerate()
.map(|(i, col)| {
(
col.clone(),
convert_sqlite_value_to_nu_value(row.get_ref_unwrap(i), span),
col.name.clone(),
convert_sqlite_value_to_nu_value(row.get_ref_unwrap(i), col.decl_type, span),
)
})
.collect();
@ -665,18 +702,25 @@ pub fn convert_sqlite_row_to_nu_value(row: &Row, span: Span, column_names: &[Str
Value::record(record, span)
}
pub fn convert_sqlite_value_to_nu_value(value: ValueRef, span: Span) -> Value {
pub fn convert_sqlite_value_to_nu_value(
value: ValueRef,
decl_type: Option<DeclType>,
span: Span,
) -> Value {
match value {
ValueRef::Null => Value::nothing(span),
ValueRef::Integer(i) => Value::int(i, span),
ValueRef::Real(f) => Value::float(f, span),
ValueRef::Text(buf) => {
let s = match std::str::from_utf8(buf) {
Ok(v) => v,
Err(_) => return Value::error(ShellError::NonUtf8 { span }, span),
};
Value::string(s.to_string(), span)
}
ValueRef::Text(buf) => match (std::str::from_utf8(buf), decl_type) {
(Ok(txt), Some(DeclType::Json | DeclType::Jsonb)) => {
match crate::convert_json_string_to_value(txt, span) {
Ok(val) => val,
Err(err) => Value::error(err, span),
}
}
(Ok(txt), _) => Value::string(txt.to_string(), span),
(Err(_), _) => Value::error(ShellError::NonUtf8 { span }, span),
},
ValueRef::Blob(u) => Value::binary(u.to_vec(), span),
}
}

View File

@ -186,7 +186,7 @@ fn convert_nujson_to_value(value: nu_json::Value, span: Span) -> Value {
}
}
fn convert_string_to_value(string_input: &str, span: Span) -> Result<Value, ShellError> {
pub(crate) fn convert_string_to_value(string_input: &str, span: Span) -> Result<Value, ShellError> {
match nu_json::from_str(string_input) {
Ok(value) => Ok(convert_nujson_to_value(value, span)),

View File

@ -27,3 +27,6 @@ pub use xlsx::FromXlsx;
pub use xml::FromXml;
pub use yaml::FromYaml;
pub use yaml::FromYml;
#[cfg(feature = "sqlite")]
pub(crate) use json::convert_string_to_value as convert_json_string_to_value;

View File

@ -37,11 +37,18 @@ impl Command for StorCreate {
}
fn examples(&self) -> Vec<Example> {
vec![Example {
description: "Create an in-memory sqlite database with specified table name, column names, and column data types",
example: "stor create --table-name nudb --columns {bool1: bool, int1: int, float1: float, str1: str, datetime1: datetime}",
result: None,
}]
vec![
Example {
description: "Create an in-memory sqlite database with specified table name, column names, and column data types",
example: "stor create --table-name nudb --columns {bool1: bool, int1: int, float1: float, str1: str, datetime1: datetime}",
result: None,
},
Example {
description: "Create an in-memory sqlite database with a json column",
example: "stor create --table-name files_with_md --columns {file: str, metadata: jsonb}",
result: None,
},
]
}
fn run(
@ -83,7 +90,7 @@ fn process(
Some(record) => {
let mut create_stmt = format!("CREATE TABLE {new_table_name} ( ");
for (column_name, column_datatype) in record {
match column_datatype.coerce_str()?.as_ref() {
match column_datatype.coerce_str()?.to_lowercase().as_ref() {
"int" => {
create_stmt.push_str(&format!("{column_name} INTEGER, "));
}
@ -102,10 +109,16 @@ fn process(
"{column_name} DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), "
));
}
"json" => {
create_stmt.push_str(&format!("{column_name} JSON, "));
}
"jsonb" => {
create_stmt.push_str(&format!("{column_name} JSONB, "));
}
_ => {
return Err(ShellError::UnsupportedInput {
msg: "unsupported column data type".into(),
msg: "Unsupported column data type. Please use: int, float, str, bool, datetime, json, jsonb".into(),
input: format!("{column_datatype:?}"),
msg_span: column_datatype.span(),
input_span: column_datatype.span(),

View File

@ -67,6 +67,11 @@ impl Command for StorInsert {
example: "ls | stor insert --table-name files",
result: None,
},
Example {
description: "Insert nu records as json data",
example: "ls -l | each {{file: $in.name, metadata: ($in | reject name)}} | stor insert --table-name files_with_md",
result: None,
},
]
}
@ -89,7 +94,7 @@ impl Command for StorInsert {
let records = handle(span, data_record, input)?;
for record in records {
process(table_name.clone(), span, &db, record)?;
process(engine_state, table_name.clone(), span, &db, record)?;
}
Ok(Value::custom(db, span).into_pipeline_data())
@ -151,6 +156,7 @@ fn handle(
}
fn process(
engine_state: &EngineState,
table_name: Option<String>,
span: Span,
db: &SQLiteDatabase,
@ -186,7 +192,7 @@ fn process(
// dbg!(&create_stmt);
// Get the params from the passed values
let params = values_to_sql(record.values().cloned())?;
let params = values_to_sql(engine_state, record.values().cloned(), span)?;
if let Ok(conn) = db.open_connection() {
conn.execute(&create_stmt, params_from_iter(params))
@ -253,7 +259,7 @@ mod test {
),
);
let result = process(table_name, span, &db, columns);
let result = process(&EngineState::new(), table_name, span, &db, columns);
assert!(result.is_ok());
}
@ -281,7 +287,7 @@ mod test {
Value::test_string("String With Spaces".to_string()),
);
let result = process(table_name, span, &db, columns);
let result = process(&EngineState::new(), table_name, span, &db, columns);
assert!(result.is_ok());
}
@ -309,7 +315,7 @@ mod test {
Value::test_string("ThisIsALongString".to_string()),
);
let result = process(table_name, span, &db, columns);
let result = process(&EngineState::new(), table_name, span, &db, columns);
// SQLite uses dynamic typing, making any length acceptable for a varchar column
assert!(result.is_ok());
}
@ -337,7 +343,7 @@ mod test {
Value::test_string("ThisIsTheWrongType".to_string()),
);
let result = process(table_name, span, &db, columns);
let result = process(&EngineState::new(), table_name, span, &db, columns);
// SQLite uses dynamic typing, making any type acceptable for a column
assert!(result.is_ok());
}
@ -365,7 +371,7 @@ mod test {
Value::test_string("ThisIsALongString".to_string()),
);
let result = process(table_name, span, &db, columns);
let result = process(&EngineState::new(), table_name, span, &db, columns);
assert!(result.is_err());
}
@ -385,8 +391,52 @@ mod test {
Value::test_string("ThisIsALongString".to_string()),
);
let result = process(table_name, span, &db, columns);
let result = process(&EngineState::new(), table_name, span, &db, columns);
assert!(result.is_err());
}
// Verify that nu values containing nested records/lists can be inserted
// into columns declared as JSON and JSONB via `process` (such values are
// serialized to JSON text rather than rejected).
#[test]
fn test_insert_json() {
// NOTE(review): MEMORY_DB is presumably the in-memory database path used
// by the other tests in this module — confirm against the module constant.
let db = Box::new(SQLiteDatabase::new(
std::path::Path::new(MEMORY_DB),
Signals::empty(),
));
// One column declared JSON and one declared JSONB, so both declaration
// types are exercised by a single insert.
let create_stmt = "CREATE TABLE test_insert_json (
json_field JSON,
jsonb_field JSONB
)";
let conn = db
.open_connection()
.expect("Test was unable to open connection.");
conn.execute(create_stmt, [])
.expect("Failed to create table as part of test.");
// Build a record that mixes scalar fields with a nested list, so the
// serialized JSON contains both primitives and a compound value.
let mut record = Record::new();
record.insert("x", Value::test_int(89));
record.insert("y", Value::test_int(12));
record.insert(
"z",
Value::test_list(vec![
Value::test_string("hello"),
Value::test_string("goodbye"),
]),
);
// Insert the same nested record into both the JSON and JSONB columns.
let mut row = Record::new();
row.insert("json_field", Value::test_record(record.clone()));
row.insert("jsonb_field", Value::test_record(record));
// The insert should succeed: compound values are stored as JSON text
// instead of producing a type error.
let result = process(
&EngineState::new(),
Some("test_insert_json".to_owned()),
Span::unknown(),
&db,
row,
);
assert!(result.is_ok());
}
}

View File

@ -92,7 +92,14 @@ impl Command for StorUpdate {
// Check if the record is being passed as input or using the update record parameter
let columns = handle(span, update_record, input)?;
process(table_name, span, &db, columns, where_clause_opt)?;
process(
engine_state,
table_name,
span,
&db,
columns,
where_clause_opt,
)?;
Ok(Value::custom(db, span).into_pipeline_data())
}
@ -150,6 +157,7 @@ fn handle(
}
fn process(
engine_state: &EngineState,
table_name: Option<String>,
span: Span,
db: &SQLiteDatabase,
@ -183,7 +191,7 @@ fn process(
// dbg!(&update_stmt);
// Get the params from the passed values
let params = values_to_sql(record.values().cloned())?;
let params = values_to_sql(engine_state, record.values().cloned(), span)?;
conn.execute(&update_stmt, params_from_iter(params))
.map_err(|err| ShellError::GenericError {