forked from extern/nushell
Make SQLite queries cancellable (#7351)
This change makes SQLite queries (`open foo.db`, `open foo.db | query db "select ..."`) cancellable using `ctrl+c`. Previously they were not cancellable, which made it unpleasant to accidentally open a very large database or run an unexpectedly slow query! UX-wise there's not too much to show: ![image](https://user-images.githubusercontent.com/26268125/205519205-e1f2ab58-c92d-4b96-9f80-eb123f678ec3.png) ## Notes I was hoping to make SQLite queries streamable as part of this work, but I ran into 2 problems: 1. `rusqlite` lifetimes are nightmarishly complex and they make it hard to create a `ListStream` iterator 2. The functions on Nu's `CustomValue` trait return `Value` not `PipelineData` and so `CustomValue` implementations can't stream data AFAICT.
This commit is contained in:
parent
9fb9b16b38
commit
57ff668d2e
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -110,9 +110,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.65"
|
||||
version = "1.0.66"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602"
|
||||
checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6"
|
||||
|
||||
[[package]]
|
||||
name = "array-init-cursor"
|
||||
|
@ -10,6 +10,10 @@ use std::{
|
||||
fs::File,
|
||||
io::Read,
|
||||
path::{Path, PathBuf},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
const SQLITE_MAGIC_BYTES: &[u8] = "SQLite format 3\0".as_bytes();
|
||||
@ -20,16 +24,25 @@ pub struct SQLiteDatabase {
|
||||
// 1) YAGNI, 2) it's not obvious how cloning a connection could work, 3) state
|
||||
// management gets tricky quick. Revisit this approach if we find a compelling use case.
|
||||
pub path: PathBuf,
|
||||
#[serde(skip)]
|
||||
// this understandably can't be serialized. think that's OK, I'm not aware of a
|
||||
// reason why a CustomValue would be serialized outside of a plugin
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
}
|
||||
|
||||
impl SQLiteDatabase {
|
||||
pub fn new(path: &Path) -> Self {
|
||||
pub fn new(path: &Path, ctrlc: Option<Arc<AtomicBool>>) -> Self {
|
||||
Self {
|
||||
path: PathBuf::from(path),
|
||||
ctrlc,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_from_path(path: &Path, span: Span) -> Result<Self, ShellError> {
|
||||
pub fn try_from_path(
|
||||
path: &Path,
|
||||
span: Span,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
) -> Result<Self, ShellError> {
|
||||
let mut file =
|
||||
File::open(path).map_err(|e| ShellError::ReadingFile(e.to_string(), span))?;
|
||||
|
||||
@ -38,7 +51,7 @@ impl SQLiteDatabase {
|
||||
.map_err(|e| ShellError::ReadingFile(e.to_string(), span))
|
||||
.and_then(|_| {
|
||||
if buf == SQLITE_MAGIC_BYTES {
|
||||
Ok(SQLiteDatabase::new(path))
|
||||
Ok(SQLiteDatabase::new(path, ctrlc))
|
||||
} else {
|
||||
Err(ShellError::ReadingFile("Not a SQLite file".into(), span))
|
||||
}
|
||||
@ -50,6 +63,7 @@ impl SQLiteDatabase {
|
||||
Value::CustomValue { val, span } => match val.as_any().downcast_ref::<Self>() {
|
||||
Some(db) => Ok(Self {
|
||||
path: db.path.clone(),
|
||||
ctrlc: db.ctrlc.clone(),
|
||||
}),
|
||||
None => Err(ShellError::CantConvert(
|
||||
"database".into(),
|
||||
@ -81,7 +95,8 @@ impl SQLiteDatabase {
|
||||
|
||||
pub fn query(&self, sql: &Spanned<String>, call_span: Span) -> Result<Value, ShellError> {
|
||||
let db = open_sqlite_db(&self.path, call_span)?;
|
||||
run_sql_query(db, sql).map_err(|e| {
|
||||
|
||||
let stream = run_sql_query(db, sql, self.ctrlc.clone()).map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Failed to query SQLite database".into(),
|
||||
e.to_string(),
|
||||
@ -89,7 +104,9 @@ impl SQLiteDatabase {
|
||||
None,
|
||||
Vec::new(),
|
||||
)
|
||||
})
|
||||
})?;
|
||||
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
pub fn open_connection(&self) -> Result<Connection, rusqlite::Error> {
|
||||
@ -268,6 +285,7 @@ impl CustomValue for SQLiteDatabase {
|
||||
fn clone_value(&self, span: Span) -> Value {
|
||||
let cloned = SQLiteDatabase {
|
||||
path: self.path.clone(),
|
||||
ctrlc: self.ctrlc.clone(),
|
||||
};
|
||||
|
||||
Value::CustomValue {
|
||||
@ -282,7 +300,7 @@ impl CustomValue for SQLiteDatabase {
|
||||
|
||||
fn to_base_value(&self, span: Span) -> Result<Value, ShellError> {
|
||||
let db = open_sqlite_db(&self.path, span)?;
|
||||
read_entire_sqlite_db(db, span).map_err(|e| {
|
||||
read_entire_sqlite_db(db, span, self.ctrlc.clone()).map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Failed to read from SQLite database".into(),
|
||||
e.to_string(),
|
||||
@ -305,7 +323,7 @@ impl CustomValue for SQLiteDatabase {
|
||||
fn follow_path_string(&self, _column_name: String, span: Span) -> Result<Value, ShellError> {
|
||||
let db = open_sqlite_db(&self.path, span)?;
|
||||
|
||||
read_single_table(db, _column_name, span).map_err(|e| {
|
||||
read_single_table(db, _column_name, span, self.ctrlc.clone()).map_err(|e| {
|
||||
ShellError::GenericError(
|
||||
"Failed to read from SQLite database".into(),
|
||||
e.to_string(),
|
||||
@ -339,47 +357,74 @@ pub fn open_sqlite_db(path: &Path, call_span: Span) -> Result<Connection, nu_pro
|
||||
})
|
||||
}
|
||||
|
||||
fn run_sql_query(conn: Connection, sql: &Spanned<String>) -> Result<Value, rusqlite::Error> {
|
||||
fn run_sql_query(
|
||||
conn: Connection,
|
||||
sql: &Spanned<String>,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
) -> Result<Value, rusqlite::Error> {
|
||||
let stmt = conn.prepare(&sql.item)?;
|
||||
prepared_statement_to_nu_list(stmt, sql.span)
|
||||
prepared_statement_to_nu_list(stmt, sql.span, ctrlc)
|
||||
}
|
||||
|
||||
fn read_single_table(
|
||||
conn: Connection,
|
||||
table_name: String,
|
||||
call_span: Span,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
) -> Result<Value, rusqlite::Error> {
|
||||
let stmt = conn.prepare(&format!("SELECT * FROM {}", table_name))?;
|
||||
prepared_statement_to_nu_list(stmt, call_span)
|
||||
prepared_statement_to_nu_list(stmt, call_span, ctrlc)
|
||||
}
|
||||
|
||||
fn prepared_statement_to_nu_list(
|
||||
mut stmt: rusqlite::Statement,
|
||||
call_span: Span,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
) -> Result<Value, rusqlite::Error> {
|
||||
let column_names = stmt
|
||||
.column_names()
|
||||
.iter()
|
||||
.map(|c| c.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
let results = stmt.query([])?;
|
||||
let nu_records = results
|
||||
.mapped(|row| {
|
||||
Result::Ok(convert_sqlite_row_to_nu_value(
|
||||
row,
|
||||
call_span,
|
||||
column_names.clone(),
|
||||
))
|
||||
})
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<Value>, rusqlite::Error>>()?;
|
||||
|
||||
let row_results = stmt.query_map([], |row| {
|
||||
Ok(convert_sqlite_row_to_nu_value(
|
||||
row,
|
||||
call_span,
|
||||
column_names.clone(),
|
||||
))
|
||||
})?;
|
||||
|
||||
// we collect all rows before returning them. Not ideal but it's hard/impossible to return a stream from a CustomValue
|
||||
let mut row_values = vec![];
|
||||
|
||||
for row_result in row_results {
|
||||
if let Some(ctrlc) = &ctrlc {
|
||||
if ctrlc.load(Ordering::SeqCst) {
|
||||
// return whatever we have so far, let the caller decide whether to use it
|
||||
return Ok(Value::List {
|
||||
vals: row_values,
|
||||
span: call_span,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(row_value) = row_result {
|
||||
row_values.push(row_value);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Value::List {
|
||||
vals: nu_records,
|
||||
vals: row_values,
|
||||
span: call_span,
|
||||
})
|
||||
}
|
||||
|
||||
fn read_entire_sqlite_db(conn: Connection, call_span: Span) -> Result<Value, rusqlite::Error> {
|
||||
fn read_entire_sqlite_db(
|
||||
conn: Connection,
|
||||
call_span: Span,
|
||||
ctrlc: Option<Arc<AtomicBool>>,
|
||||
) -> Result<Value, rusqlite::Error> {
|
||||
let mut table_names: Vec<String> = Vec::new();
|
||||
let mut tables: Vec<Value> = Vec::new();
|
||||
|
||||
@ -392,7 +437,7 @@ fn read_entire_sqlite_db(conn: Connection, call_span: Span) -> Result<Value, rus
|
||||
table_names.push(table_name.clone());
|
||||
|
||||
let table_stmt = conn.prepare(&format!("select * from [{}]", table_name))?;
|
||||
let rows = prepared_statement_to_nu_list(table_stmt, call_span)?;
|
||||
let rows = prepared_statement_to_nu_list(table_stmt, call_span, ctrlc.clone())?;
|
||||
tables.push(rows);
|
||||
}
|
||||
|
||||
@ -451,7 +496,7 @@ mod test {
|
||||
#[test]
|
||||
fn can_read_empty_db() {
|
||||
let db = open_connection_in_memory().unwrap();
|
||||
let converted_db = read_entire_sqlite_db(db, Span::test_data()).unwrap();
|
||||
let converted_db = read_entire_sqlite_db(db, Span::test_data(), None).unwrap();
|
||||
|
||||
let expected = Value::Record {
|
||||
cols: vec![],
|
||||
@ -475,7 +520,7 @@ mod test {
|
||||
[],
|
||||
)
|
||||
.unwrap();
|
||||
let converted_db = read_entire_sqlite_db(db, Span::test_data()).unwrap();
|
||||
let converted_db = read_entire_sqlite_db(db, Span::test_data(), None).unwrap();
|
||||
|
||||
let expected = Value::Record {
|
||||
cols: vec!["person".to_string()],
|
||||
@ -509,7 +554,7 @@ mod test {
|
||||
db.execute("INSERT INTO item (id, name) VALUES (456, 'foo bar')", [])
|
||||
.unwrap();
|
||||
|
||||
let converted_db = read_entire_sqlite_db(db, span).unwrap();
|
||||
let converted_db = read_entire_sqlite_db(db, span, None).unwrap();
|
||||
|
||||
let expected = Value::Record {
|
||||
cols: vec!["item".to_string()],
|
||||
|
@ -109,7 +109,7 @@ impl Command for Open {
|
||||
} else {
|
||||
#[cfg(feature = "sqlite")]
|
||||
if !raw {
|
||||
let res = SQLiteDatabase::try_from_path(path, arg_span)
|
||||
let res = SQLiteDatabase::try_from_path(path, arg_span, ctrlc.clone())
|
||||
.map(|db| db.into_value(call.head).into_pipeline_data());
|
||||
|
||||
if res.is_ok() {
|
||||
|
@ -278,7 +278,9 @@ fn handle_table_command(
|
||||
),
|
||||
PipelineData::Value(Value::Record { cols, vals, span }, ..) => {
|
||||
let result = match table_view {
|
||||
TableView::General => build_general_table2(cols, vals, ctrlc, config, term_width),
|
||||
TableView::General => {
|
||||
build_general_table2(cols, vals, ctrlc.clone(), config, term_width)
|
||||
}
|
||||
TableView::Expanded {
|
||||
limit,
|
||||
flatten,
|
||||
@ -286,14 +288,34 @@ fn handle_table_command(
|
||||
} => {
|
||||
let sep = flatten_separator.as_deref().unwrap_or(" ");
|
||||
build_expanded_table(
|
||||
cols, vals, span, ctrlc, config, term_width, limit, flatten, sep,
|
||||
cols,
|
||||
vals,
|
||||
span,
|
||||
ctrlc.clone(),
|
||||
config,
|
||||
term_width,
|
||||
limit,
|
||||
flatten,
|
||||
sep,
|
||||
)
|
||||
}
|
||||
TableView::Collapsed => build_collapsed_table(cols, vals, config, term_width),
|
||||
}?;
|
||||
|
||||
let result = result
|
||||
.unwrap_or_else(|| format!("Couldn't fit table into {} columns!", term_width));
|
||||
let ctrl_c_was_triggered = || match &ctrlc {
|
||||
Some(ctrlc) => ctrlc.load(Ordering::SeqCst),
|
||||
None => false,
|
||||
};
|
||||
|
||||
let result = result.unwrap_or_else(|| {
|
||||
if ctrl_c_was_triggered() {
|
||||
"".into()
|
||||
} else {
|
||||
// assume this failed because the table was too wide
|
||||
// TODO: more robust error classification
|
||||
format!("Couldn't fit table into {} columns!", term_width)
|
||||
}
|
||||
});
|
||||
|
||||
let val = Value::String {
|
||||
val: result,
|
||||
|
Loading…
Reference in New Issue
Block a user