From 4bd191c2272c2d99ace2e8b9f21fa7ad775b140c Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Mon, 13 Mar 2023 21:40:11 +0000 Subject: [PATCH] wip --- Cargo.lock | 18 ++- atuin-client/Cargo.toml | 12 +- .../20220505083406_create-events.sql | 2 + .../20230310184942_alter-events.sql | 14 ++ atuin-client/src/database.rs | 13 +- atuin-client/src/event.rs | 30 ++-- atuin-client/src/lib.rs | 2 + atuin-client/src/sync.rs | 9 +- atuin-client/src/sync_event.rs | 153 ++++++++++++++++++ atuin-common/Cargo.toml | 4 +- atuin-common/src/api.rs | 4 +- .../20230310185609_event-checksum.sql | 1 + 12 files changed, 221 insertions(+), 41 deletions(-) create mode 100644 atuin-client/migrations/20230310184942_alter-events.sql create mode 100644 atuin-client/src/sync_event.rs create mode 100644 atuin-server/migrations/20230310185609_event-checksum.sql diff --git a/Cargo.lock b/Cargo.lock index c4f0c8c2..b3356aa8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -57,6 +57,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c" +dependencies = [ + "autocfg", +] + [[package]] name = "atty" version = "0.2.14" @@ -119,7 +128,6 @@ dependencies = [ "directories", "eyre", "fs-err", - "hex", "interim", "itertools", "lazy_static", @@ -134,7 +142,6 @@ dependencies = [ "serde", "serde_json", "serde_regex", - "sha2", "shellexpand", "sodiumoxide", "sql-builder", @@ -150,7 +157,9 @@ name = "atuin-common" version = "13.0.1" dependencies = [ "chrono", + "hex", "serde", + "sha2", "uuid", ] @@ -2540,10 +2549,11 @@ checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9" [[package]] name = "uuid" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" +checksum = 
"1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79" dependencies = [ + "atomic", "getrandom", ] diff --git a/atuin-client/Cargo.toml b/atuin-client/Cargo.toml index 1fc5baf1..fd551a7c 100644 --- a/atuin-client/Cargo.toml +++ b/atuin-client/Cargo.toml @@ -12,15 +12,7 @@ repository = "https://github.com/ellie/atuin" [features] default = ["sync"] -sync = [ - "urlencoding", - "sodiumoxide", - "reqwest", - "sha2", - "hex", - "rmp-serde", - "base64", -] +sync = ["urlencoding", "sodiumoxide", "reqwest", "rmp-serde", "base64"] [dependencies] atuin-common = { path = "../atuin-common", version = "13.0.1" } @@ -60,8 +52,6 @@ reqwest = { version = "0.11", features = [ "json", "rustls-tls-native-roots", ], default-features = false, optional = true } -hex = { version = "0.4", optional = true } -sha2 = { version = "0.10", optional = true } rmp-serde = { version = "1.1.1", optional = true } base64 = { version = "0.20.0", optional = true } tokio = { version = "1", features = ["full"] } diff --git a/atuin-client/migrations/20220505083406_create-events.sql b/atuin-client/migrations/20220505083406_create-events.sql index f6cafeba..1ed4ee9f 100644 --- a/atuin-client/migrations/20220505083406_create-events.sql +++ b/atuin-client/migrations/20220505083406_create-events.sql @@ -1,3 +1,5 @@ +-- CLIENT + create table if not exists events ( id text primary key, timestamp integer not null, diff --git a/atuin-client/migrations/20230310184942_alter-events.sql b/atuin-client/migrations/20230310184942_alter-events.sql new file mode 100644 index 00000000..5c495bb3 --- /dev/null +++ b/atuin-client/migrations/20230310184942_alter-events.sql @@ -0,0 +1,14 @@ +-- CLIENT + +drop table events; -- we will rewrite it anyway + +create table if not exists events ( + id text primary key, + timestamp integer not null, + hostname text not null, + event_type text not null, + + data blob not null, + checksum text not null, + previous text not null +); diff --git a/atuin-client/src/database.rs 
b/atuin-client/src/database.rs
index a729d10f..cdf5e3a9 100644
--- a/atuin-client/src/database.rs
+++ b/atuin-client/src/database.rs
@@ -130,19 +130,21 @@ impl Sqlite {
 
     async fn save_event(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, e: &Event) -> Result<()> {
         let event_type = match e.event_type {
-            EventType::Create => "create",
-            EventType::Delete => "delete",
+            EventType::CreateHistory => "create_history",
+            EventType::DeleteHistory => "delete_history",
         };
 
         sqlx::query(
-            "insert or ignore into events(id, timestamp, hostname, event_type, history_id)
-                values(?1, ?2, ?3, ?4, ?5)",
+            "insert or ignore into events(id, timestamp, hostname, event_type, data, checksum, previous)
+                values(?1, ?2, ?3, ?4, ?5, ?6, ?7)",
         )
         .bind(e.id.as_str())
         .bind(e.timestamp.timestamp_nanos())
         .bind(e.hostname.as_str())
         .bind(event_type)
-        .bind(e.history_id.as_str())
+        .bind(e.data.as_slice())
+        .bind(e.checksum.as_str())
+        .bind(e.previous.as_str())
         .execute(tx)
         .await?;
 
@@ -354,6 +356,7 @@ impl Database for Sqlite {
         // We can think of history as the merged log of events. There should never be more history than
         // events, and the only time this could happen is if someone is upgrading from an old Atuin version
        // from before we stored events.
+
        let history_count = self.history_count().await?;
        let event_count = self.event_count().await?;
 
diff --git a/atuin-client/src/event.rs b/atuin-client/src/event.rs
index d0cf5117..06ae50e0 100644
--- a/atuin-client/src/event.rs
+++ b/atuin-client/src/event.rs
@@ -3,7 +3,8 @@ use serde::{Deserialize, Serialize};
 
 use crate::history::History;
 use atuin_common::api::EventType;
-use atuin_common::utils::uuid_v4;
+use atuin_common::utils::{hash_bytes, hash_str, uuid_v4};
+use eyre::Result;
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow)]
 pub struct Event {
@@ -12,31 +13,40 @@ pub struct Event {
     pub hostname: String,
     pub event_type: EventType,
 
-    pub history_id: String,
+    pub data: Vec<u8>,
+    pub previous: String,
+    pub checksum: String,
 }
 
 impl Event {
-    pub fn new_create(history: &History) -> Event {
-        Event {
+    pub fn new_create_history(history: &History, previous: String) -> Result<Event> {
+        let data = rmp_serde::to_vec(history)?;
+        let checksum = hash_bytes(&data);
+
+        Ok(Event {
             id: uuid_v4(),
             timestamp: history.timestamp,
             hostname: history.hostname.clone(),
-            event_type: EventType::Create,
+            event_type: EventType::CreateHistory,
 
-            history_id: history.id.clone(),
-        }
+            data,
+            previous,
+            checksum,
+        })
     }
 
-    pub fn new_delete(history_id: &str) -> Event {
+    pub fn new_delete_history(history_id: &str, previous: String) -> Event {
         let hostname = format!("{}:{}", whoami::hostname(), whoami::username());
 
         Event {
             id: uuid_v4(),
             timestamp: chrono::Utc::now(),
             hostname,
-            event_type: EventType::Create,
+            event_type: EventType::DeleteHistory,
 
-            history_id: history_id.to_string(),
+            data: history_id.as_bytes().to_owned(),
+            checksum: hash_str(history_id),
+            previous,
         }
     }
 }
diff --git a/atuin-client/src/lib.rs b/atuin-client/src/lib.rs
index 37d1b0f0..7887226c 100644
--- a/atuin-client/src/lib.rs
+++ b/atuin-client/src/lib.rs
@@ -9,6 +9,8 @@ pub mod api_client;
 pub mod encryption;
 #[cfg(feature = "sync")]
 pub mod sync;
+#[cfg(feature = "sync")]
+pub mod sync_event;
 
 pub mod database;
 pub mod event;
diff --git a/atuin-client/src/sync.rs b/atuin-client/src/sync.rs
index 94ae24c4..67d31197 100644
--- a/atuin-client/src/sync.rs
+++ b/atuin-client/src/sync.rs
@@ -3,7 +3,7 @@ use std::convert::TryInto;
 use chrono::prelude::*;
 use eyre::Result;
 
-use atuin_common::api::AddHistoryRequest;
+use atuin_common::{api::AddHistoryRequest, utils::hash_str};
 
 use crate::{
     api_client,
@@ -12,13 +12,6 @@ use crate::{
     settings::{Settings, HISTORY_PAGE_SIZE},
 };
 
-pub fn hash_str(string: &str) -> String {
-    use sha2::{Digest, Sha256};
-    let mut hasher = Sha256::new();
-    hasher.update(string.as_bytes());
-    hex::encode(hasher.finalize())
-}
-
 // Currently sync is kinda naive, and basically just pages backwards through
 // history. This means newly added stuff shows up properly! We also just use
 // the total count in each database to indicate whether a sync is needed.
diff --git a/atuin-client/src/sync_event.rs b/atuin-client/src/sync_event.rs
new file mode 100644
index 00000000..67ba1508
--- /dev/null
+++ b/atuin-client/src/sync_event.rs
@@ -0,0 +1,153 @@
+use std::convert::TryInto;
+
+use chrono::prelude::*;
+use eyre::Result;
+
+use atuin_common::{api::AddHistoryRequest, utils::hash_str};
+
+use crate::{
+    api_client,
+    database::Database,
+    encryption::{encrypt, load_encoded_key, load_key},
+    settings::{Settings, HISTORY_PAGE_SIZE},
+};
+
+// Currently sync is kinda naive, and basically just pages backwards through
+// history. This means newly added stuff shows up properly! We also just use
+// the total count in each database to indicate whether a sync is needed.
+// I think this could be massively improved! If we had a way of easily
+// indicating count per time period (hour, day, week, year, etc) then we can
+// easily pinpoint where we are missing data and what needs downloading. Start
+// with year, then find the week, then the day, then the hour, then download it
+// all! The current naive approach will do for now.
+
+// Check if remote has things we don't, and if so, download them.
+// Returns (num downloaded, total local)
+async fn sync_download(
+    force: bool,
+    client: &api_client::Client<'_>,
+    db: &mut (impl Database + Send),
+) -> Result<(i64, i64)> {
+    debug!("starting sync download");
+
+    let remote_count = client.count().await?;
+
+    let initial_local = db.event_count().await?;
+    let mut local_count = initial_local;
+
+    let mut last_sync = if force {
+        Utc.timestamp_millis(0)
+    } else {
+        Settings::last_sync()?
+    };
+
+    let mut last_timestamp = Utc.timestamp_millis(0);
+
+    let host = if force { Some(String::from("")) } else { None };
+
+    while remote_count > local_count {
+        let page = client
+            .get_event(last_sync, last_timestamp, host.clone())
+            .await?;
+
+        db.save_bulk(&page).await?;
+
+        local_count = db.event_count().await?;
+
+        if page.len() < HISTORY_PAGE_SIZE.try_into().unwrap() {
+            break;
+        }
+
+        let page_last = page
+            .last()
+            .expect("could not get last element of page")
+            .timestamp;
+
+        // in the case of a small sync frequency, it's possible for history to
+        // be "lost" between syncs. In this case we need to rewind the sync
+        // timestamps
+        if page_last == last_timestamp {
+            last_timestamp = Utc.timestamp_millis(0);
+            last_sync -= chrono::Duration::hours(1);
+        } else {
+            last_timestamp = page_last;
+        }
+    }
+
+    Ok((local_count - initial_local, local_count))
+}
+
+// Check if we have things remote doesn't, and if so, upload them
+async fn sync_upload(
+    settings: &Settings,
+    _force: bool,
+    client: &api_client::Client<'_>,
+    db: &mut (impl Database + Send),
+) -> Result<()> {
+    debug!("starting sync upload");
+
+    let initial_remote_count = client.count().await?;
+    let mut remote_count = initial_remote_count;
+
+    let local_count = db.history_count().await?;
+
+    debug!("remote has {}, we have {}", remote_count, local_count);
+
+    let key = load_key(settings)?; // encryption key
+
+    // first just try the most recent set
+
+    let mut cursor = Utc::now();
+
+    while local_count > remote_count {
+        let last = db.before(cursor, HISTORY_PAGE_SIZE).await?;
+        let mut buffer = Vec::new();
+
+        if last.is_empty() {
+            break;
+        }
+
+        for i in last {
+            let data = encrypt(&i, &key)?;
+            let data = serde_json::to_string(&data)?;
+
+            let add_hist = AddHistoryRequest {
+                id: i.id,
+                timestamp: i.timestamp,
+                data,
+                hostname: hash_str(&i.hostname),
+            };
+
+            buffer.push(add_hist);
+        }
+
+        // anything left over outside of the 100 block size
+        client.post_history(&buffer).await?;
+        cursor = buffer.last().unwrap().timestamp;
+        remote_count = client.count().await?;
+
+        debug!("upload cursor: {:?}", cursor);
+    }
+
+    Ok(())
+}
+
+pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
+    db.merge_events().await?;
+
+    let client = api_client::Client::new(
+        &settings.sync_address,
+        &settings.session_token,
+        load_encoded_key(settings)?,
+    )?;
+
+    sync_upload(settings, force, &client, db).await?;
+
+    let download = sync_download(force, &client, db).await?;
+
+    debug!("sync downloaded {}", download.0);
+
+    Settings::save_sync_time()?;
+
+    Ok(())
+}
diff --git a/atuin-common/Cargo.toml b/atuin-common/Cargo.toml
index 7c407503..854783e7 100644
--- a/atuin-common/Cargo.toml
+++ b/atuin-common/Cargo.toml
@@ -13,4 +13,6 @@ repository = "https://github.com/ellie/atuin"
 [dependencies]
 chrono = { version = "0.4", features = ["serde"] }
 serde = { version = "1.0.145", features = ["derive"] }
-uuid = { version = "1.2", features = ["v4"] }
+uuid = { version = "1.3.0", features = ["v4", "v7"] }
+sha2 = { version = "0.10" }
+hex = { version = "0.4" }
diff --git a/atuin-common/src/api.rs b/atuin-common/src/api.rs
index 128db6b9..524a25aa 100644
--- a/atuin-common/src/api.rs
+++ b/atuin-common/src/api.rs
@@ -89,8 +89,8 @@ pub struct SyncEventResponse {
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub enum EventType {
-    Create,
-    Delete,
+    CreateHistory,
+    DeleteHistory,
 }
 
 impl std::fmt::Display for EventType {
diff --git a/atuin-server/migrations/20230310185609_event-checksum.sql b/atuin-server/migrations/20230310185609_event-checksum.sql
new file mode 100644
index 00000000..8ddc1d3f
--- /dev/null
+++ b/atuin-server/migrations/20230310185609_event-checksum.sql
@@ -0,0 +1 @@
+-- TODO: add checksum and previous columns to the server-side events table