From a4066d725515fa513c40a04d10def90aea6ab3f5 Mon Sep 17 00:00:00 2001
From: Ellie Huxtable
Date: Wed, 5 Jul 2023 09:00:30 +0100
Subject: [PATCH] Working upload sync

---
 Cargo.lock                              |  6 +-
 atuin-client/src/kv.rs                  |  4 +-
 atuin-client/src/record/sqlite_store.rs |  6 +-
 atuin-client/src/record/store.rs        |  4 +-
 atuin-client/src/record/sync.rs         | 91 +++++++++++++++++++++++--
 atuin-common/Cargo.toml                 |  6 --
 atuin-common/src/record.rs              |  4 +-
 atuin-server-postgres/src/lib.rs        |  4 +-
 atuin/src/command/client/sync.rs        | 14 ++--
 9 files changed, 104 insertions(+), 35 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 43f91ef4..d824f0e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -151,12 +151,8 @@ dependencies = [
  "memchr",
  "minspan",
  "parse_duration",
-<<<<<<< HEAD
- "rand 0.8.5",
-=======
  "pretty_assertions",
- "rand",
->>>>>>> f788279b (Add tests, all passing)
+ "rand 0.8.5",
  "regex",
  "reqwest",
  "rmp",
diff --git a/atuin-client/src/kv.rs b/atuin-client/src/kv.rs
index 22431833..102d2978 100644
--- a/atuin-client/src/kv.rs
+++ b/atuin-client/src/kv.rs
@@ -101,7 +101,7 @@ impl KvStore {
 
         let bytes = record.serialize()?;
 
-        let parent = store.last(host_id, KV_TAG).await?.map(|entry| entry.id);
+        let parent = store.tail(host_id, KV_TAG).await?.map(|entry| entry.id);
 
         let record = atuin_common::record::Record::builder()
             .host(host_id)
@@ -135,7 +135,7 @@ impl KvStore {
 
         // iterate records to find the value we want
         // start at the end, so we get the most recent version
-        let Some(mut record) = store.last(host_id, KV_TAG).await? else {
+        let Some(mut record) = store.tail(host_id, KV_TAG).await? else {
             return Ok(None);
         };
 
diff --git a/atuin-client/src/record/sqlite_store.rs b/atuin-client/src/record/sqlite_store.rs
index 9207129e..ebfd666e 100644
--- a/atuin-client/src/record/sqlite_store.rs
+++ b/atuin-client/src/record/sqlite_store.rs
@@ -158,7 +158,7 @@ impl Store for SqliteStore {
         }
     }
 
-    async fn first(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>> {
+    async fn head(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>> {
         let res = sqlx::query(
             "select * from records where host = ?1 and tag = ?2 and parent is null limit 1",
         )
@@ -171,7 +171,7 @@ impl Store for SqliteStore {
         Ok(res)
     }
 
-    async fn last(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>> {
+    async fn tail(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>> {
         let res = sqlx::query(
             "select * from records rp where tag=?1 and host=?2 and (select count(1) from records where parent=rp.id) = 0;",
         )
@@ -347,7 +347,7 @@ mod tests {
         db.push_batch(records.iter()).await.unwrap();
 
         let mut record = db
-            .first(tail.host, tail.tag.as_str())
+            .head(tail.host, tail.tag.as_str())
             .await
             .expect("in memory sqlite should not fail")
             .expect("entry exists");
diff --git a/atuin-client/src/record/store.rs b/atuin-client/src/record/store.rs
index a5280bbb..a4713877 100644
--- a/atuin-client/src/record/store.rs
+++ b/atuin-client/src/record/store.rs
@@ -28,9 +28,9 @@ pub trait Store {
     async fn next(&self, record: &Record<EncryptedData>) -> Result<Option<Record<EncryptedData>>>;
 
     /// Get the first record for a given host and tag
-    async fn first(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>>;
+    async fn head(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>>;
     /// Get the last record for a given host and tag
-    async fn last(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>>;
+    async fn tail(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>>;
 
     async fn tail_records(&self) -> Result<Vec<(Uuid, String, Uuid)>>;
 }
diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs
index 5b03b6de..a694fa93 100644
--- a/atuin-client/src/record/sync.rs
+++ b/atuin-client/src/record/sync.rs
@@ -14,7 +14,7 @@ pub enum Operation {
     Download { tail: Uuid, host: Uuid, tag: String },
 }
 
-pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<Diff> {
+pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<(Diff, RecordIndex)> {
     let client = Client::new(&settings.sync_address, &settings.session_token)?;
 
     // First, build our own index
@@ -25,7 +25,7 @@ pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<Diff>
 
     let diff = local_index.diff(&remote_index);
 
-    Ok(diff)
+    Ok((diff, remote_index))
 }
 
 // Take a diff, along with a local store, and resolve it into a set of operations.
@@ -49,7 +49,7 @@ pub async fn operations(diff: Diff, store: &impl Store) -> Result<Vec<Operation>> {
             // if local has the ID, then we should find the actual tail of this
             // store, so we know what we need to update the remote to.
             let tail = store
-                .last(host, tag.as_str())
+                .tail(host, tag.as_str())
                 .await?
                 .expect("failed to fetch last record, expected tag/host to exist");
 
@@ -82,6 +82,89 @@ pub async fn operations(diff: Diff, store: &impl Store) -> Result<Vec<Operation>> {
     Ok(operations)
 }
 
+async fn sync_upload(
+    store: &mut impl Store,
+    remote_index: &RecordIndex,
+    client: &Client<'_>,
+    op: (Uuid, String, Uuid), // just easier to reason about this way imo
+) -> Result<i64> {
+    let mut total = 0;
+
+    // so. we have an upload operation, with the tail representing the state
+    // we want to get the remote to
+    let current_tail = remote_index.get(op.0, op.1.clone());
+
+    println!(
+        "Syncing local {:?}/{}/{:?}, remote has {:?}",
+        op.0, op.1, op.2, current_tail
+    );
+
+    let start = if let Some(current_tail) = current_tail {
+        current_tail
+    } else {
+        store
+            .head(op.0, op.1.as_str())
+            .await
+            .expect("failed to fetch host/tag head")
+            .expect("host/tag not in current index")
+            .id
+    };
+
+    // we have the start point for sync. it is either the head of the store if
+    // the remote has no data for it, or the tail that the remote has
+    // we need to iterate from the remote tail, and keep going until
+    // remote tail = current local tail
+
+    let mut record = Some(store.get(start).await.unwrap());
+
+    // don't try and upload the head again
+    if let Some(r) = record {
+        record = store.next(&r).await?;
+    }
+
+    // We are currently uploading them one at a time. Yes, this sucks. We are
+    // also processing all records in serial. That also sucks.
+    // Once sync works, we can then make it super fast.
+    while let Some(r) = record {
+        client.post_records(&[r.clone()]).await?;
+
+        record = store.next(&r).await?;
+        total += 1;
+    }
+
+    Ok(total)
+}
+fn sync_download(tail: Uuid, host: Uuid, tag: String) -> Result<i64> {
+    Ok(0)
+}
+
+pub async fn sync_remote(
+    operations: Vec<Operation>,
+    remote_index: &RecordIndex,
+    local_store: &mut impl Store,
+    settings: &Settings,
+) -> Result<(i64, i64)> {
+    let client = Client::new(&settings.sync_address, &settings.session_token)?;
+
+    let mut uploaded = 0;
+    let mut downloaded = 0;
+
+    // this can totally run in parallel, but lets get it working first
+    for i in operations {
+        match i {
+            Operation::Upload { tail, host, tag } => {
+                uploaded +=
+                    sync_upload(local_store, remote_index, &client, (host, tag, tail)).await?
+            }
+            Operation::Download { tail, host, tag } => {
+                downloaded += sync_download(tail, host, tag)?
+            }
+        }
+    }
+
+    Ok((uploaded, downloaded))
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
@@ -208,7 +291,7 @@ mod tests {
         let local_known = test_record();
 
         let second_shared = test_record();
-        let second_shared_remote_ahead = second_shared.new_child(vec![1, 2, 3]);
+        let second_shared_remote_ahead = second_shared.new_child(vec![1, 2, 3]);
 
         let local_ahead = shared_record.new_child(vec![1, 2, 3]);
 
diff --git a/atuin-common/Cargo.toml b/atuin-common/Cargo.toml
index 0f7676c4..d36bd001 100644
--- a/atuin-common/Cargo.toml
+++ b/atuin-common/Cargo.toml
@@ -17,13 +17,7 @@ serde = { workspace = true }
 uuid = { workspace = true }
 rand = { workspace = true }
 typed-builder = { workspace = true }
-<<<<<<< HEAD
 eyre = { workspace = true }
 
-[dev-dependencies]
-pretty_assertions = "1.3.0"
-=======
-
 [dev-dependencies]
 pretty_assertions = { workspace = true }
->>>>>>> f788279b (Add tests, all passing)
diff --git a/atuin-common/src/record.rs b/atuin-common/src/record.rs
index 34103ae6..279ecaa7 100644
--- a/atuin-common/src/record.rs
+++ b/atuin-common/src/record.rs
@@ -5,7 +5,6 @@ use serde::{Deserialize, Serialize};
 use typed_builder::TypedBuilder;
 use uuid::Uuid;
 
-<<<<<<< HEAD
 #[derive(Clone, Debug, PartialEq)]
 pub struct DecryptedData(pub Vec<u8>);
 
@@ -14,9 +13,8 @@ pub struct EncryptedData {
     pub data: String,
     pub content_encryption_key: String,
 }
-=======
+
 pub type Diff = Vec<(Uuid, String, Uuid)>;
->>>>>>> f788279b (Add tests, all passing)
 
 /// A single record stored inside of our local database
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
diff --git a/atuin-server-postgres/src/lib.rs b/atuin-server-postgres/src/lib.rs
index 37cf06ab..f5eb2b06 100644
--- a/atuin-server-postgres/src/lib.rs
+++ b/atuin-server-postgres/src/lib.rs
@@ -350,7 +350,7 @@ impl Database for Postgres {
             .bind(id)
             .bind(i.id)
             .bind(i.host)
-            .bind(id)
+            .bind(i.parent)
             .bind(i.timestamp as i64) // throwing away some data, but i64 is still big in terms of time
             .bind(&i.version)
             .bind(&i.tag)
@@ -367,7 +367,7 @@ impl Database for Postgres {
     }
 
     async fn tail_records(&self, user: &User) -> DbResult<Vec<(Uuid, String, Uuid)>> {
-        const TAIL_RECORDS_SQL: &str = "select host, tag, id from records rp where (select count(1) from records where parent=rp.id and user_id = $1) = 0;";
+        const TAIL_RECORDS_SQL: &str = "select host, tag, client_id from records rp where (select count(1) from records where parent=rp.client_id and user_id = $1) = 0;";
 
         let res = sqlx::query_as(TAIL_RECORDS_SQL)
             .bind(user.id)
diff --git a/atuin/src/command/client/sync.rs b/atuin/src/command/client/sync.rs
index feec9be0..cdb29a01 100644
--- a/atuin/src/command/client/sync.rs
+++ b/atuin/src/command/client/sync.rs
@@ -78,14 +78,12 @@ async fn run(
     db: &mut impl Database,
     store: &mut impl Store,
 ) -> Result<()> {
-    let diff = sync::diff(settings, store).await?;
-    println!("{:?}", diff);
+    let (diff, remote_index) = sync::diff(settings, store).await?;
+    let operations = sync::operations(diff, store).await?;
+    let (uploaded, downloaded) =
+        sync::sync_remote(operations, &remote_index, store, &settings).await?;
+
+    println!("{}/{} up/down to record store", uploaded, downloaded);
 
-    atuin_client::sync::sync(settings, force, db).await?;
-    println!(
-        "Sync complete! {} items in database, force: {}",
-        db.history_count().await?,
-        force
-    );
     Ok(())
 }