This commit is contained in:
Conrad Ludgate 2021-11-21 19:40:50 +00:00
parent 6e8ec8689d
commit 04c3f6cdc6
No known key found for this signature in database
GPG Key ID: 3DD1A1DB3CB4BF63
10 changed files with 94 additions and 124 deletions

2
Cargo.lock generated
View File

@ -105,6 +105,7 @@ dependencies = [
"directories",
"eyre",
"fork",
"futures",
"humantime 2.1.0",
"indicatif",
"itertools",
@ -134,6 +135,7 @@ dependencies = [
"directories",
"eyre",
"fern",
"futures",
"humantime 2.1.0",
"indicatif",
"itertools",

View File

@ -56,6 +56,7 @@ base64 = "0.13.0"
humantime = "2.1.0"
tabwriter = "1.2.1"
crossbeam-channel = "0.5.1"
futures = "0.3"
[profile.release]
lto = "fat"

View File

@ -41,6 +41,7 @@ itertools = "0.10.1"
shellexpand = "2"
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "uuid", "chrono", "sqlite" ] }
minspan = "0.1.1"
futures = "0.3"
[dev-dependencies]
tokio-test = "*"

View File

@ -1,11 +1,11 @@
use std::path::Path;
use std::str::FromStr;
use async_trait::async_trait;
use chrono::prelude::*;
use chrono::Utc;
use eyre::Result;
use futures::Stream;
use itertools::Itertools;
use sqlx::sqlite::{
@ -17,36 +17,6 @@ use super::history::History;
use super::ordering;
use super::settings::SearchMode;
#[async_trait]
pub trait Database {
async fn save(&mut self, h: &History) -> Result<()>;
async fn save_bulk(&mut self, h: &[History]) -> Result<()>;
async fn load(&self, id: &str) -> Result<History>;
async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>>;
async fn range(
&self,
from: chrono::DateTime<Utc>,
to: chrono::DateTime<Utc>,
) -> Result<Vec<History>>;
async fn update(&self, h: &History) -> Result<()>;
async fn history_count(&self) -> Result<i64>;
async fn first(&self) -> Result<History>;
async fn last(&self) -> Result<History>;
async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>>;
async fn search(
&self,
limit: Option<i64>,
search_mode: SearchMode,
query: &str,
) -> Result<Vec<History>>;
async fn query_history(&self, query: &str) -> Result<Vec<History>>;
}
// Intended for use on a developer machine and not a sync server.
// TODO: implement IntoIterator
pub struct Sqlite {
@ -103,7 +73,7 @@ impl Sqlite {
Ok(())
}
fn query_history(row: SqliteRow) -> History {
fn query_history_row(row: SqliteRow) -> History {
History {
id: row.get("id"),
timestamp: Utc.timestamp_nanos(row.get("timestamp")),
@ -117,9 +87,8 @@ impl Sqlite {
}
}
#[async_trait]
impl Database for Sqlite {
async fn save(&mut self, h: &History) -> Result<()> {
impl Sqlite {
pub async fn save(&mut self, h: &History) -> Result<()> {
debug!("saving history to sqlite");
let mut tx = self.pool.begin().await?;
@ -129,7 +98,7 @@ impl Database for Sqlite {
Ok(())
}
async fn save_bulk(&mut self, h: &[History]) -> Result<()> {
pub async fn save_bulk(&mut self, h: &[History]) -> Result<()> {
debug!("saving history to sqlite");
let mut tx = self.pool.begin().await?;
@ -143,19 +112,19 @@ impl Database for Sqlite {
Ok(())
}
async fn load(&self, id: &str) -> Result<History> {
pub async fn load(&self, id: &str) -> Result<History> {
debug!("loading history item {}", id);
let res = sqlx::query("select * from history where id = ?1")
.bind(id)
.map(Self::query_history)
.map(Self::query_history_row)
.fetch_one(&self.pool)
.await?;
Ok(res)
}
async fn update(&self, h: &History) -> Result<()> {
pub async fn update(&self, h: &History) -> Result<()> {
debug!("updating sqlite history");
sqlx::query(
@ -178,14 +147,14 @@ impl Database for Sqlite {
}
// make a unique list, that only shows the *newest* version of things
async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>> {
pub fn list(&self, max: Option<usize>, unique: bool) -> String {
debug!("listing history");
// very likely vulnerable to SQL injection
// however, this is client side, and only used by the client, on their
// own data. They can just open the db file...
// otherwise building the query is awkward
let query = format!(
format!(
"select * from history h
{}
order by timestamp desc
@ -205,17 +174,14 @@ impl Database for Sqlite {
} else {
"".to_string()
}
);
)
let res = sqlx::query(query.as_str())
.map(Self::query_history)
.fetch_all(&self.pool)
.await?;
Ok(res)
// sqlx::query(query.as_str())
// .map(Self::query_history_row)
// .fetch(&self.pool)
}
async fn range(
pub async fn range(
&self,
from: chrono::DateTime<Utc>,
to: chrono::DateTime<Utc>,
@ -227,48 +193,41 @@ impl Database for Sqlite {
)
.bind(from)
.bind(to)
.map(Self::query_history)
.map(Self::query_history_row)
.fetch_all(&self.pool)
.await?;
Ok(res)
}
async fn first(&self) -> Result<History> {
pub async fn first(&self) -> Result<History> {
let res =
sqlx::query("select * from history where duration >= 0 order by timestamp asc limit 1")
.map(Self::query_history)
.map(Self::query_history_row)
.fetch_one(&self.pool)
.await?;
Ok(res)
}
async fn last(&self) -> Result<History> {
let res = sqlx::query(
"select * from history where duration >= 0 order by timestamp desc limit 1",
)
.map(Self::query_history)
.fetch_one(&self.pool)
.await?;
Ok(res)
pub fn last(&self) -> &'static str {
"select * from history where duration >= 0 order by timestamp desc limit 1"
}
async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>> {
pub async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>> {
let res = sqlx::query(
"select * from history where timestamp < ?1 order by timestamp desc limit ?2",
)
.bind(timestamp.timestamp_nanos())
.bind(count)
.map(Self::query_history)
.map(Self::query_history_row)
.fetch_all(&self.pool)
.await?;
Ok(res)
}
async fn history_count(&self) -> Result<i64> {
pub async fn history_count(&self) -> Result<i64> {
let res: (i64,) = sqlx::query_as("select count(1) from history")
.fetch_one(&self.pool)
.await?;
@ -276,7 +235,7 @@ impl Database for Sqlite {
Ok(res.0)
}
async fn search(
pub async fn search(
&self,
limit: Option<i64>,
search_mode: SearchMode,
@ -306,20 +265,17 @@ impl Database for Sqlite {
.as_str(),
)
.bind(query)
.map(Self::query_history)
.map(Self::query_history_row)
.fetch_all(&self.pool)
.await?;
Ok(ordering::reorder_fuzzy(search_mode, orig_query, res))
}
async fn query_history(&self, query: &str) -> Result<Vec<History>> {
let res = sqlx::query(query)
.map(Self::query_history)
.fetch_all(&self.pool)
.await?;
Ok(res)
pub fn query_history<'q: 'e, 'e>(&'e self, query: &'q str) -> impl Stream<Item = Result<History, sqlx::Error>> +'e {
sqlx::query(query)
.map(Self::query_history_row)
.fetch(&self.pool)
}
}
@ -327,7 +283,7 @@ impl Database for Sqlite {
mod test {
use super::*;
async fn new_history_item(db: &mut impl Database, cmd: &str) -> Result<()> {
async fn new_history_item(db: &mut Sqlite, cmd: &str) -> Result<()> {
let history = History::new(
chrono::Utc::now(),
cmd.to_string(),

View File

@ -6,7 +6,7 @@ use eyre::Result;
use atuin_common::{api::AddHistoryRequest, utils::hash_str};
use crate::api_client;
use crate::database::Database;
use crate::database::Sqlite;
use crate::encryption::{encrypt, load_encoded_key, load_key};
use crate::settings::{Settings, HISTORY_PAGE_SIZE};
@ -24,7 +24,7 @@ use crate::settings::{Settings, HISTORY_PAGE_SIZE};
async fn sync_download(
force: bool,
client: &api_client::Client<'_>,
db: &mut (impl Database + Send),
db: &mut Sqlite,
) -> Result<(i64, i64)> {
debug!("starting sync download");
@ -80,7 +80,7 @@ async fn sync_upload(
settings: &Settings,
_force: bool,
client: &api_client::Client<'_>,
db: &mut (impl Database + Send),
db: &mut Sqlite,
) -> Result<()> {
debug!("starting sync upload");
@ -130,7 +130,7 @@ async fn sync_upload(
Ok(())
}
pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
pub async fn sync(settings: &Settings, force: bool, db: &mut Sqlite) -> Result<()> {
let client = api_client::Client::new(
&settings.sync_address,
&settings.session_token,

View File

@ -3,10 +3,11 @@ use std::io::Write;
use std::time::Duration;
use eyre::Result;
use futures::stream::{Stream, TryStreamExt};
use structopt::StructOpt;
use tabwriter::TabWriter;
use atuin_client::database::Database;
use atuin_client::database::Sqlite;
use atuin_client::history::History;
use atuin_client::settings::Settings;
use atuin_client::sync;
@ -61,10 +62,10 @@ pub enum Cmd {
}
#[allow(clippy::cast_sign_loss)]
pub fn print_list(h: &[History], human: bool, cmd_only: bool) {
pub async fn print_list<E>(h: impl Stream<Item = Result<History, E>>, human: bool, cmd_only: bool) -> Result<(), E> {
let mut writer = TabWriter::new(std::io::stdout()).padding(2);
let lines = h.iter().map(|h| {
let lines = h.map_ok(|h| {
if human {
let duration = humantime::format_duration(Duration::from_nanos(std::cmp::max(
h.duration, 0,
@ -91,20 +92,26 @@ pub fn print_list(h: &[History], human: bool, cmd_only: bool) {
}
});
for i in lines.rev() {
writer
.write_all(i.as_bytes())
.expect("failed to write to tab writer");
}
let fut = lines
.try_for_each(|i| {
writer
.write_all(i.as_bytes())
.expect("failed to write to tab writer");
futures::future::ready(Ok(()))
});
fut.await?;
writer.flush().expect("failed to flush tab writer");
Ok(())
}
impl Cmd {
pub async fn run(
&self,
settings: &Settings,
db: &mut (impl Database + Send + Sync),
db: &mut Sqlite,
) -> Result<()> {
match self {
Self::Start { command: words } => {
@ -171,33 +178,34 @@ impl Cmd {
None
};
let history = match (session, cwd) {
(None, None) => db.list(None, false).await?,
let query = match (session, cwd) {
(None, None) => db.list(None, false),
(None, Some(cwd)) => {
let query = format!("select * from history where cwd = {};", cwd);
db.query_history(&query).await?
format!("select * from history where cwd = {};", cwd)
}
(Some(session), None) => {
let query = format!("select * from history where session = {};", session);
db.query_history(&query).await?
format!("select * from history where session = {};", session)
}
(Some(session), Some(cwd)) => {
let query = format!(
format!(
"select * from history where cwd = {} and session = {};",
cwd, session
);
db.query_history(&query).await?
)
}
};
print_list(&history, *human, *cmd_only);
let history = db.query_history(&query);
print_list(history, *human, *cmd_only).await?;
Ok(())
}
Self::Last { human, cmd_only } => {
let last = db.last().await?;
print_list(&[last], *human, *cmd_only);
let last = db.last();
let history = db.query_history(last);
print_list(history, *human, *cmd_only).await?;
Ok(())
}

View File

@ -4,7 +4,7 @@ use eyre::{eyre, Result};
use structopt::StructOpt;
use atuin_client::import::{bash::Bash, zsh::Zsh};
use atuin_client::{database::Database, import::Importer};
use atuin_client::{database::Sqlite, import::Importer};
use atuin_client::{history::History, import::resh::Resh};
use indicatif::ProgressBar;
@ -38,7 +38,7 @@ pub enum Cmd {
const BATCH_SIZE: usize = 100;
impl Cmd {
pub async fn run(&self, db: &mut (impl Database + Send + Sync)) -> Result<()> {
pub async fn run(&self, db: &mut Sqlite) -> Result<()> {
println!(" Atuin ");
println!("======================");
println!(" \u{1f30d} ");
@ -53,22 +53,22 @@ impl Cmd {
if shell.ends_with("/zsh") {
println!("Detected ZSH");
import::<Zsh<_>, _>(db, BATCH_SIZE).await
import::<Zsh<_>>(db, BATCH_SIZE).await
} else {
println!("cannot import {} history", shell);
Ok(())
}
}
Self::Zsh => import::<Zsh<_>, _>(db, BATCH_SIZE).await,
Self::Bash => import::<Bash<_>, _>(db, BATCH_SIZE).await,
Self::Resh => import::<Resh, _>(db, BATCH_SIZE).await,
Self::Zsh => import::<Zsh<_>>(db, BATCH_SIZE).await,
Self::Bash => import::<Bash<_>>(db, BATCH_SIZE).await,
Self::Resh => import::<Resh>(db, BATCH_SIZE).await,
}
}
}
async fn import<I: Importer + Send, DB: Database + Send + Sync>(
db: &mut DB,
async fn import<I: Importer + Send>(
db: &mut Sqlite,
buf_size: usize,
) -> Result<()>
where

View File

@ -14,7 +14,7 @@ use tui::{
};
use unicode_width::UnicodeWidthStr;
use atuin_client::database::Database;
use atuin_client::database::Sqlite;
use atuin_client::history::History;
use atuin_client::settings::{SearchMode, Settings};
@ -151,13 +151,9 @@ impl State {
}
}
async fn query_results(
app: &mut State,
search_mode: SearchMode,
db: &mut (impl Database + Send + Sync),
) -> Result<()> {
async fn query_results(app: &mut State, search_mode: SearchMode, db: &mut Sqlite) -> Result<()> {
let results = match app.input.as_str() {
"" => db.list(Some(200), true).await?,
// "" => db.list(Some(200), true).await?,
i => db.search(Some(200), search_mode, i).await?,
};
@ -175,7 +171,7 @@ async fn query_results(
async fn key_handler(
input: Key,
search_mode: SearchMode,
db: &mut (impl Database + Send + Sync),
db: &mut Sqlite,
app: &mut State,
) -> Option<String> {
match input {
@ -314,7 +310,7 @@ fn draw<T: Backend>(f: &mut Frame<'_, T>, history_count: i64, app: &mut State) {
async fn select_history(
query: &[String],
search_mode: SearchMode,
db: &mut (impl Database + Send + Sync),
db: &mut Sqlite,
) -> Result<String> {
let stdout = stdout().into_raw_mode()?;
let stdout = MouseTerminal::from(stdout);
@ -361,7 +357,7 @@ pub async fn run(
after: Option<String>,
cmd_only: bool,
query: &[String],
db: &mut (impl Database + Send + Sync),
db: &mut Sqlite,
) -> Result<()> {
let dir = if let Some(cwd) = cwd {
if cwd == "." {
@ -443,7 +439,13 @@ pub async fn run(
.map(std::borrow::ToOwned::to_owned)
.collect();
super::history::print_list(&results, human, cmd_only);
super::history::print_list(
futures::stream::iter(results.into_iter().map(Result::<_, ()>::Ok)),
human,
cmd_only,
)
.await
.unwrap();
}
Ok(())

View File

@ -8,7 +8,7 @@ use cli_table::{format::Justify, print_stdout, Cell, Style, Table};
use eyre::{eyre, Result};
use structopt::StructOpt;
use atuin_client::database::Database;
use atuin_client::database::Sqlite;
use atuin_client::history::History;
use atuin_client::settings::Settings;
@ -73,7 +73,7 @@ fn compute_stats(history: &[History]) -> Result<()> {
impl Cmd {
pub async fn run(
&self,
db: &mut (impl Database + Send + Sync),
db: &mut Sqlite,
settings: &Settings,
) -> Result<()> {
match self {
@ -95,9 +95,9 @@ impl Cmd {
}
Self::All => {
let history = db.list(None, false).await?;
// let history = db.list(None, false).await?;
compute_stats(&history)?;
// compute_stats(&history)?;
Ok(())
}

View File

@ -1,13 +1,13 @@
use eyre::Result;
use atuin_client::database::Database;
use atuin_client::database::Sqlite;
use atuin_client::settings::Settings;
use atuin_client::sync;
pub async fn run(
settings: &Settings,
force: bool,
db: &mut (impl Database + Send + Sync),
db: &mut Sqlite,
) -> Result<()> {
sync::sync(settings, force, db).await?;
println!(