feat: add history rebuild (#1575)

* feat: add history rebuild

This adds a function that will

1. List all history from the store
2. Segment by create/delete
3. Insert all creates into the database
4. Delete all deleted entries from the database

This replaces the old history sync.

Presently it's incomplete. There is no incremental rebuild, it can only
do the entire thing at once.

This is run by `atuin store rebuild history`

* fix tests

* add incremental sync

* add auto sync
This commit is contained in:
Ellie Huxtable 2024-01-16 11:25:09 +00:00 committed by GitHub
parent c2439d1ed6
commit a2578c4521
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 286 additions and 106 deletions

View File

@ -18,7 +18,7 @@ use sqlx::{
};
use time::OffsetDateTime;
use crate::history::HistoryStats;
use crate::history::{HistoryId, HistoryStats};
use super::{
history::History,
@ -93,6 +93,7 @@ pub trait Database: Send + Sync + 'static {
async fn before(&self, timestamp: OffsetDateTime, count: i64) -> Result<Vec<History>>;
async fn delete(&self, h: History) -> Result<()>;
async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()>;
async fn deleted(&self) -> Result<Vec<History>>;
// Yes I know, it's a lot.
@ -172,6 +173,18 @@ impl Sqlite {
Ok(())
}
/// Delete a single history row by id, inside an existing transaction.
///
/// The caller owns the transaction and decides when to commit or roll
/// back, which lets `delete_rows` batch many deletions atomically.
async fn delete_row_raw(
    tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
    id: HistoryId,
) -> Result<()> {
    // Bind the id as a string; HistoryId wraps the textual id in `.0`.
    sqlx::query("delete from history where id = ?1")
        .bind(id.0.as_str())
        .execute(&mut **tx)
        .await?;

    Ok(())
}
fn query_history(row: SqliteRow) -> History {
let deleted_at: Option<i64> = row.get("deleted_at");
@ -567,6 +580,18 @@ impl Database for Sqlite {
Ok(())
}
/// Delete every history row named in `ids`.
///
/// All deletions run inside a single transaction, so the set is applied
/// atomically: either every row is removed or none are.
async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()> {
    let mut transaction = self.pool.begin().await?;

    for row_id in ids.iter().cloned() {
        Self::delete_row_raw(&mut transaction, row_id).await?;
    }

    transaction.commit().await?;

    Ok(())
}
async fn stats(&self, h: &History) -> Result<HistoryStats> {
// We select the previous in the session by time
let mut prev = SqlBuilder::select_from("history");

View File

@ -1,8 +1,11 @@
use eyre::{bail, eyre, Result};
use rmp::decode::Bytes;
use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store};
use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx};
use crate::{
database::Database,
record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store},
};
use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx};
use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
@ -58,14 +61,14 @@ impl HistoryRecord {
Ok(DecryptedData(output))
}
pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> {
pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> {
use rmp::decode;
fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
eyre!("{err:?}")
}
let mut bytes = Bytes::new(bytes);
let mut bytes = Bytes::new(&bytes.0);
let record_type = decode::read_u8(&mut bytes).map_err(error_report)?;
@ -147,10 +150,89 @@ impl HistoryStore {
self.push_record(record).await
}
pub async fn history(&self) -> Result<Vec<HistoryRecord>> {
// Atm this loads all history into memory
// Not ideal as that is potentially quite a lot, although history will be small.
let records = self.store.all_tagged(HISTORY_TAG).await?;
let mut ret = Vec::with_capacity(records.len());
for record in records.into_iter() {
let hist = match record.version.as_str() {
HISTORY_VERSION => {
let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)
}
version => bail!("unknown history version {version:?}"),
}?;
ret.push(hist);
}
Ok(ret)
}
/// Rebuild the query database from the full record store.
///
/// Reads every history record, splits it into creates and deletes, then
/// bulk-inserts the creates and removes the deleted ids.
///
/// NOTE(review): coupling to `Database` here is deliberate for now; the
/// structure needs more thought before decoupling. This will be easy to
/// change later.
pub async fn build(&self, database: &dyn Database) -> Result<()> {
    // TODO(ellie): page or iterate this
    let records = self.history().await?;

    // We can't flatten create/delete pairs before hitting sqlite yet: the
    // database may still contain history from the old sync, which never
    // actually deleted rows. Once the DB is maintained solely by the new
    // store, flattening here becomes safe.
    let mut creates = Vec::new();
    let mut deletes = Vec::new();

    for record in records {
        match record {
            HistoryRecord::Create(history) => creates.push(history),
            HistoryRecord::Delete(id) => deletes.push(id),
        }
    }

    database.save_bulk(&creates).await?;
    database.delete_rows(&deletes).await?;

    Ok(())
}
pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> {
for id in ids {
let record = self.store.get(*id).await?;
if record.tag != HISTORY_TAG {
continue;
}
let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?;
match record {
HistoryRecord::Create(h) => {
// TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time
database.save(&h).await?;
}
HistoryRecord::Delete(id) => {
database.delete_rows(&[id]).await?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use atuin_common::record::DecryptedData;
use time::macros::datetime;
use crate::history::{store::HistoryRecord, HISTORY_VERSION};
@ -187,13 +269,14 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
// check the snapshot too
let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
let deserialized =
HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
@ -208,12 +291,13 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
let deserialized =
HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
}

View File

@ -2,6 +2,7 @@ use async_trait::async_trait;
use eyre::Result;
use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIdx, RecordStatus};
/// A record store stores records
/// In more detail - we tend to need to process this into _another_ format to actually query it.
/// As is, the record store is intended as the source of truth for arbitrary data, which could
@ -44,8 +45,6 @@ pub trait Store {
async fn status(&self) -> Result<RecordStatus>;
/// Get every start record for a given tag, regardless of host.
/// Useful when actually operating on synchronized data, and will often have conflict
/// resolution applied.
/// Get all records for a given tag
async fn all_tagged(&self, tag: &str) -> Result<Vec<Record<EncryptedData>>>;
}

View File

@ -7,7 +7,7 @@ use thiserror::Error;
use super::store::Store;
use crate::{api_client::Client, settings::Settings};
use atuin_common::record::{Diff, HostId, RecordIdx, RecordStatus};
use atuin_common::record::{Diff, HostId, RecordId, RecordIdx, RecordStatus};
#[derive(Error, Debug)]
pub enum SyncError {
@ -198,11 +198,12 @@ async fn sync_download(
tag: String,
local: Option<RecordIdx>,
remote: RecordIdx,
) -> Result<i64, SyncError> {
) -> Result<Vec<RecordId>, SyncError> {
let local = local.unwrap_or(0);
let expected = remote - local;
let download_page_size = 100;
let mut progress = 0;
let mut ret = Vec::new();
println!(
"Downloading {} records from {}/{}",
@ -230,6 +231,8 @@ async fn sync_download(
expected
);
ret.extend(page.iter().map(|f| f.id));
progress += page.len() as u64;
if progress >= expected {
@ -237,14 +240,14 @@ async fn sync_download(
}
}
Ok(progress as i64)
Ok(ret)
}
pub async fn sync_remote(
operations: Vec<Operation>,
local_store: &impl Store,
settings: &Settings,
) -> Result<(i64, i64), SyncError> {
) -> Result<(i64, Vec<RecordId>), SyncError> {
let client = Client::new(
&settings.sync_address,
&settings.session_token,
@ -254,7 +257,7 @@ pub async fn sync_remote(
.expect("failed to create client");
let mut uploaded = 0;
let mut downloaded = 0;
let mut downloaded = Vec::new();
// this can totally run in parallel, but lets get it working first
for i in operations {
@ -271,9 +274,7 @@ pub async fn sync_remote(
tag,
local,
remote,
} => {
downloaded += sync_download(local_store, &client, host, tag, local, remote).await?
}
} => downloaded = sync_download(local_store, &client, host, tag, local, remote).await?,
Operation::Noop { .. } => continue,
}

View File

@ -16,9 +16,9 @@ mod config;
mod history;
mod import;
mod kv;
mod record;
mod search;
mod stats;
mod store;
#[derive(Subcommand, Debug)]
#[command(infer_subcommands = true)]
@ -48,7 +48,7 @@ pub enum Cmd {
Kv(kv::Cmd),
#[command(subcommand)]
Record(record::Cmd),
Store(store::Cmd),
/// Print example configuration
#[command()]
@ -83,23 +83,23 @@ impl Cmd {
let record_store_path = PathBuf::from(settings.record_store_path.as_str());
let db = Sqlite::new(db_path).await?;
let store = SqliteStore::new(record_store_path).await?;
let sqlite_store = SqliteStore::new(record_store_path).await?;
match self {
Self::History(history) => history.run(&settings, &db, store).await,
Self::History(history) => history.run(&settings, &db, sqlite_store).await,
Self::Import(import) => import.run(&db).await,
Self::Stats(stats) => stats.run(&db, &settings).await,
Self::Search(search) => search.run(db, &mut settings).await,
#[cfg(feature = "sync")]
Self::Sync(sync) => sync.run(settings, &db, &store).await,
Self::Sync(sync) => sync.run(settings, &db, sqlite_store).await,
#[cfg(feature = "sync")]
Self::Account(account) => account.run(settings).await,
Self::Kv(kv) => kv.run(&settings, &store).await,
Self::Kv(kv) => kv.run(&settings, &sqlite_store).await,
Self::Record(record) => record.run(&settings, &store).await,
Self::Store(store) => store.run(&settings, &db, sqlite_store).await,
Self::DefaultConfig => {
config::run();

View File

@ -317,14 +317,14 @@ impl Cmd {
if settings.sync.records {
let (diff, _) = record::sync::diff(settings, &store).await?;
let operations = record::sync::operations(diff, &store).await?;
let (uploaded, downloaded) =
let (_, downloaded) =
record::sync::sync_remote(operations, &store, settings).await?;
println!("{uploaded}/{downloaded} up/down to record store");
history_store.incremental_build(db, &downloaded).await?;
} else {
debug!("running periodic background sync");
sync::sync(settings, false, db).await?;
}
debug!("running periodic background sync");
sync::sync(settings, false, db).await?;
}
#[cfg(not(feature = "sync"))]
debug!("not compiled with sync support");

View File

@ -1,63 +0,0 @@
use clap::Subcommand;
use eyre::Result;
use atuin_client::{record::store::Store, settings::Settings};
use time::OffsetDateTime;
#[derive(Subcommand, Debug)]
#[command(infer_subcommands = true)]
pub enum Cmd {
    // Print the sync status of the record store, per host and per tag.
    // (Plain comment, not `///`: doc comments would alter clap's help text.)
    Status,
}
impl Cmd {
    /// Print the status of the record store.
    ///
    /// For every known host, and every tagged store on that host, prints the
    /// latest index plus the first/last record ids and their creation times.
    /// The current machine's entry is marked `<- CURRENT HOST`.
    pub async fn run(
        &self,
        _settings: &Settings,
        store: &(impl Store + Send + Sync),
    ) -> Result<()> {
        // Used only to flag which printed host is this machine.
        let host_id = Settings::host_id().expect("failed to get host_id");

        let status = store.status().await?;

        // TODO: should probs build some data structure and then pretty-print it or smth
        for (host, st) in &status.hosts {
            let host_string = if host == &host_id {
                format!("host: {} <- CURRENT HOST", host.0.as_hyphenated())
            } else {
                format!("host: {}", host.0.as_hyphenated())
            };

            println!("{host_string}");

            for (tag, idx) in st {
                println!("\tstore: {tag}");

                let first = store.first(*host, tag).await?;
                let last = store.last(*host, tag).await?;

                println!("\t\tidx: {idx}");

                if let Some(first) = first {
                    println!("\t\tfirst: {}", first.id.0.as_hyphenated());

                    // Record timestamps are unix-epoch nanoseconds.
                    let time =
                        OffsetDateTime::from_unix_timestamp_nanos(i128::from(first.timestamp))?;
                    println!("\t\t\tcreated: {time}");
                }

                if let Some(last) = last {
                    println!("\t\tlast: {}", last.id.0.as_hyphenated());

                    let time =
                        OffsetDateTime::from_unix_timestamp_nanos(i128::from(last.timestamp))?;
                    println!("\t\t\tcreated: {time}");
                }
            }

            println!();
        }

        Ok(())
    }
}

View File

@ -0,0 +1,123 @@
use clap::{Args, Subcommand};
use eyre::{bail, Result};
use atuin_client::{
database::Database,
encryption,
history::store::HistoryStore,
record::{sqlite_store::SqliteStore, store::Store},
settings::Settings,
};
use time::OffsetDateTime;
#[derive(Args, Debug)]
pub struct Rebuild {
    // Which derived store to rebuild, e.g. "history". Deliberately a free-form
    // string rather than an enum — see the comment in `Rebuild::run`.
    pub tag: String,
}
impl Rebuild {
    /// Dispatch a rebuild for the store named by `self.tag`.
    ///
    /// Fix: `store` was previously cloned when passed to `rebuild_history`,
    /// but it is not used again afterwards — move it instead of cloning
    /// (Clippy `redundant_clone`).
    pub async fn run(
        &self,
        settings: &Settings,
        store: SqliteStore,
        database: &dyn Database,
    ) -> Result<()> {
        // keep it as a string and not an enum atm
        // would be super cool to build this dynamically in the future
        // eg register handles for rebuilding various tags without having to make this part of the
        // binary big
        match self.tag.as_str() {
            "history" => {
                self.rebuild_history(settings, store, database).await?;
            }

            tag => bail!("unknown tag: {tag}"),
        }

        Ok(())
    }

    /// Rebuild the local history database from the record store.
    ///
    /// Loads the encryption key and host id, constructs a `HistoryStore`,
    /// and replays every stored create/delete into `database`.
    async fn rebuild_history(
        &self,
        settings: &Settings,
        store: SqliteStore,
        database: &dyn Database,
    ) -> Result<()> {
        let encryption_key: [u8; 32] = encryption::load_key(settings)?.into();

        let host_id = Settings::host_id().expect("failed to get host_id");
        let history_store = HistoryStore::new(store, host_id, encryption_key);

        history_store.build(database).await?;

        Ok(())
    }
}
#[derive(Subcommand, Debug)]
#[command(infer_subcommands = true)]
pub enum Cmd {
    // Print per-host, per-tag sync status of the record store.
    // (Plain comments, not `///`: doc comments would alter clap's help text.)
    Status,

    // Rebuild a derived database (e.g. the history sqlite db) from the store.
    Rebuild(Rebuild),
}
impl Cmd {
    /// Entry point for `atuin store ...` subcommands.
    ///
    /// `store` is moved into whichever subcommand runs; `database` is only
    /// needed by `rebuild`.
    pub async fn run(
        &self,
        settings: &Settings,
        database: &dyn Database,
        store: SqliteStore,
    ) -> Result<()> {
        match self {
            Self::Status => self.status(store).await,
            Self::Rebuild(rebuild) => rebuild.run(settings, store, database).await,
        }
    }

    /// Print the status of the record store.
    ///
    /// For every known host, and every tagged store on that host, prints the
    /// latest index plus the first/last record ids and their creation times.
    /// The current machine's entry is marked `<- CURRENT HOST`.
    pub async fn status(&self, store: SqliteStore) -> Result<()> {
        // Used only to flag which printed host is this machine.
        let host_id = Settings::host_id().expect("failed to get host_id");

        let status = store.status().await?;

        // TODO: should probs build some data structure and then pretty-print it or smth
        for (host, st) in &status.hosts {
            let host_string = if host == &host_id {
                format!("host: {} <- CURRENT HOST", host.0.as_hyphenated())
            } else {
                format!("host: {}", host.0.as_hyphenated())
            };

            println!("{host_string}");

            for (tag, idx) in st {
                println!("\tstore: {tag}");

                let first = store.first(*host, tag).await?;
                let last = store.last(*host, tag).await?;

                println!("\t\tidx: {idx}");

                if let Some(first) = first {
                    println!("\t\tfirst: {}", first.id.0.as_hyphenated());

                    // Record timestamps are unix-epoch nanoseconds.
                    let time =
                        OffsetDateTime::from_unix_timestamp_nanos(i128::from(first.timestamp))?;
                    println!("\t\t\tcreated: {time}");
                }

                if let Some(last) = last {
                    println!("\t\tlast: {}", last.id.0.as_hyphenated());

                    let time =
                        OffsetDateTime::from_unix_timestamp_nanos(i128::from(last.timestamp))?;
                    println!("\t\t\tcreated: {time}");
                }
            }

            println!();
        }

        Ok(())
    }
}

View File

@ -3,7 +3,9 @@ use eyre::{Result, WrapErr};
use atuin_client::{
database::Database,
record::{store::Store, sync},
encryption,
history::store::HistoryStore,
record::{sqlite_store::SqliteStore, sync},
settings::Settings,
};
@ -45,7 +47,7 @@ impl Cmd {
self,
settings: Settings,
db: &impl Database,
store: &(impl Store + Send + Sync),
store: SqliteStore,
) -> Result<()> {
match self {
Self::Sync { force } => run(&settings, force, db, store).await,
@ -75,18 +77,27 @@ async fn run(
settings: &Settings,
force: bool,
db: &impl Database,
store: &(impl Store + Send + Sync),
store: SqliteStore,
) -> Result<()> {
if settings.sync.records {
let (diff, _) = sync::diff(settings, store).await?;
let operations = sync::operations(diff, store).await?;
let (uploaded, downloaded) = sync::sync_remote(operations, store, settings).await?;
let (diff, _) = sync::diff(settings, &store).await?;
let operations = sync::operations(diff, &store).await?;
let (uploaded, downloaded) = sync::sync_remote(operations, &store, settings).await?;
println!("{uploaded}/{downloaded} up/down to record store");
let encryption_key: [u8; 32] = encryption::load_key(settings)
.context("could not load encryption key")?
.into();
let host_id = Settings::host_id().expect("failed to get host_id");
let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
history_store.incremental_build(db, &downloaded).await?;
println!("{uploaded}/{} up/down to record store", downloaded.len());
} else {
atuin_client::sync::sync(settings, force, db).await?;
}
atuin_client::sync::sync(settings, force, db).await?;
println!(
"Sync complete! {} items in history database, force: {}",
db.history_count(true).await?,