Mirror of https://github.com/atuinsh/atuin.git
Working upload sync
parent 73ad36bc15
commit a4066d7255
Cargo.lock (generated): 6 changed lines
@@ -151,12 +151,8 @@ dependencies = [
  "memchr",
  "minspan",
  "parse_duration",
-<<<<<<< HEAD
- "rand 0.8.5",
-=======
  "pretty_assertions",
- "rand",
->>>>>>> f788279b (Add tests, all passing)
+ "rand 0.8.5",
  "regex",
  "reqwest",
  "rmp",
@@ -101,7 +101,7 @@ impl KvStore {

         let bytes = record.serialize()?;

-        let parent = store.last(host_id, KV_TAG).await?.map(|entry| entry.id);
+        let parent = store.tail(host_id, KV_TAG).await?.map(|entry| entry.id);

         let record = atuin_common::record::Record::builder()
             .host(host_id)
@@ -135,7 +135,7 @@ impl KvStore {

         // iterate records to find the value we want
         // start at the end, so we get the most recent version
-        let Some(mut record) = store.last(host_id, KV_TAG).await? else {
+        let Some(mut record) = store.tail(host_id, KV_TAG).await? else {
             return Ok(None);
         };

@@ -158,7 +158,7 @@ impl Store for SqliteStore {
         }
     }

-    async fn first(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>> {
+    async fn head(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>> {
         let res = sqlx::query(
             "select * from records where host = ?1 and tag = ?2 and parent is null limit 1",
         )
@@ -171,7 +171,7 @@ impl Store for SqliteStore {
         Ok(res)
     }

-    async fn last(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>> {
+    async fn tail(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>> {
         let res = sqlx::query(
             "select * from records rp where tag=?1 and host=?2 and (select count(1) from records where parent=rp.id) = 0;",
         )
@@ -347,7 +347,7 @@ mod tests {
         db.push_batch(records.iter()).await.unwrap();

         let mut record = db
-            .first(tail.host, tail.tag.as_str())
+            .head(tail.host, tail.tag.as_str())
             .await
             .expect("in memory sqlite should not fail")
             .expect("entry exists");
@@ -28,9 +28,9 @@ pub trait Store {
     async fn next(&self, record: &Record<EncryptedData>) -> Result<Option<Record<EncryptedData>>>;

     /// Get the first record for a given host and tag
-    async fn first(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>>;
+    async fn head(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>>;
     /// Get the last record for a given host and tag
-    async fn last(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>>;
+    async fn tail(&self, host: Uuid, tag: &str) -> Result<Option<Record<EncryptedData>>>;

     async fn tail_records(&self) -> Result<Vec<(Uuid, String, Uuid)>>;
 }
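Note on the trait change above: first/last become head/tail, matching the linked-list shape of a host/tag record chain, where each record points at its parent and next() steps to the child. Below is a minimal sketch of walking one chain with the renamed methods; chain_length is not part of this commit, and the Store, Record, EncryptedData, and Result types are assumed to be the ones shown in this diff.

// Sketch only: count the records in one host/tag chain by starting at the
// head and following next() until the tail.
async fn chain_length(store: &impl Store, host: Uuid, tag: &str) -> Result<usize> {
    let mut count = 0;
    let mut cursor = store.head(host, tag).await?; // first record for this host/tag

    while let Some(record) = cursor {
        count += 1;
        cursor = store.next(&record).await?; // child of `record`, if any
    }

    Ok(count)
}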
@@ -14,7 +14,7 @@ pub enum Operation {
     Download { tail: Uuid, host: Uuid, tag: String },
 }

-pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<Diff> {
+pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<(Diff, RecordIndex)> {
     let client = Client::new(&settings.sync_address, &settings.session_token)?;

     // First, build our own index
@@ -25,7 +25,7 @@ pub async fn diff(settings: &Settings, store: &mut impl Store) -> Result<Diff> {

     let diff = local_index.diff(&remote_index);

-    Ok(diff)
+    Ok((diff, remote_index))
 }

 // Take a diff, along with a local store, and resolve it into a set of operations.
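diff() now hands the remote index back alongside the diff: sync_upload (added later in this commit) needs it to find the tail the remote already has for each host/tag, so the caller threads it through to sync_remote. An illustrative caller, mirroring the run() change at the end of this commit; the Settings, Store, and sync functions are the ones in this diff, and record_sync itself is hypothetical.

// Illustrative only: wire the new pieces together the way run() does below.
async fn record_sync(settings: &Settings, store: &mut impl Store) -> Result<()> {
    let (diff, remote_index) = diff(settings, store).await?;
    let operations = operations(diff, store).await?;

    let (uploaded, downloaded) =
        sync_remote(operations, &remote_index, store, settings).await?;

    println!("{uploaded} uploaded, {downloaded} downloaded");
    Ok(())
}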
@@ -49,7 +49,7 @@ pub async fn operations(diff: Diff, store: &impl Store) -> Result<Vec<Operation>
             // if local has the ID, then we should find the actual tail of this
             // store, so we know what we need to update the remote to.
             let tail = store
-                .last(host, tag.as_str())
+                .tail(host, tag.as_str())
                 .await?
                 .expect("failed to fetch last record, expected tag/host to exist");

@@ -82,6 +82,89 @@ pub async fn operations(diff: Diff, store: &impl Store) -> Result<Vec<Operation>
     Ok(operations)
 }

+async fn sync_upload(
+    store: &mut impl Store,
+    remote_index: &RecordIndex,
+    client: &Client<'_>,
+    op: (Uuid, String, Uuid), // just easier to reason about this way imo
+) -> Result<i64> {
+    let mut total = 0;
+
+    // so. we have an upload operation, with the tail representing the state
+    // we want to get the remote to
+    let current_tail = remote_index.get(op.0, op.1.clone());
+
+    println!(
+        "Syncing local {:?}/{}/{:?}, remote has {:?}",
+        op.0, op.1, op.2, current_tail
+    );
+
+    let start = if let Some(current_tail) = current_tail {
+        current_tail
+    } else {
+        store
+            .head(op.0, op.1.as_str())
+            .await
+            .expect("failed to fetch host/tag head")
+            .expect("host/tag not in current index")
+            .id
+    };
+
+    // we have the start point for sync. it is either the head of the store if
+    // the remote has no data for it, or the tail that the remote has
+    // we need to iterate from the remote tail, and keep going until
+    // remote tail = current local tail
+
+    let mut record = Some(store.get(start).await.unwrap());
+
+    // don't try and upload the head again
+    if let Some(r) = record {
+        record = store.next(&r).await?;
+    }
+
+    // We are currently uploading them one at a time. Yes, this sucks. We are
+    // also processing all records in serial. That also sucks.
+    // Once sync works, we can then make it super fast.
+    while let Some(r) = record {
+        client.post_records(&[r.clone()]).await?;
+
+        record = store.next(&r).await?;
+        total += 1;
+    }
+
+    Ok(total)
+}
+fn sync_download(tail: Uuid, host: Uuid, tag: String) -> Result<i64> {
+    Ok(0)
+}
+
+pub async fn sync_remote(
+    operations: Vec<Operation>,
+    remote_index: &RecordIndex,
+    local_store: &mut impl Store,
+    settings: &Settings,
+) -> Result<(i64, i64)> {
+    let client = Client::new(&settings.sync_address, &settings.session_token)?;
+
+    let mut uploaded = 0;
+    let mut downloaded = 0;
+
+    // this can totally run in parallel, but lets get it working first
+    for i in operations {
+        match i {
+            Operation::Upload { tail, host, tag } => {
+                uploaded +=
+                    sync_upload(local_store, remote_index, &client, (host, tag, tail)).await?
+            }
+            Operation::Download { tail, host, tag } => {
+                downloaded += sync_download(tail, host, tag)?
+            }
+        }
+    }
+
+    Ok((uploaded, downloaded))
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
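The comments in sync_upload above flag that records are posted one per request and walked strictly in series. A hypothetical batched variant, not part of this commit, keeps the same head-to-tail walk but buffers records and posts them in chunks; post_records already accepts a slice, and sync_upload_batched plus BATCH_SIZE are assumed names.

// Hypothetical sketch, not in this commit: same walk as sync_upload, but
// buffering records and posting them to the server in chunks.
async fn sync_upload_batched(
    store: &impl Store,
    client: &Client<'_>,
    start: Record<EncryptedData>, // first record the remote does not have
) -> Result<i64> {
    const BATCH_SIZE: usize = 100; // assumed; tune to record size and server limits

    let mut total = 0;
    let mut buffer = Vec::with_capacity(BATCH_SIZE);
    let mut record = Some(start);

    while let Some(r) = record {
        record = store.next(&r).await?;
        buffer.push(r);

        if buffer.len() == BATCH_SIZE {
            client.post_records(&buffer).await?;
            total += buffer.len() as i64;
            buffer.clear();
        }
    }

    if !buffer.is_empty() {
        client.post_records(&buffer).await?;
        total += buffer.len() as i64;
    }

    Ok(total)
}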
@@ -208,7 +291,7 @@ mod tests {
         let local_known = test_record();

         let second_shared = test_record();
-        let second_shared_remote_ahead = second_shared.new_child(vec![1, 2, 3]);
+        let second_shared_remote_aheparti;

         let local_ahead = shared_record.new_child(vec![1, 2, 3]);

@@ -17,13 +17,7 @@ serde = { workspace = true }
 uuid = { workspace = true }
 rand = { workspace = true }
 typed-builder = { workspace = true }
-<<<<<<< HEAD
 eyre = { workspace = true }

-[dev-dependencies]
-pretty_assertions = "1.3.0"
-=======
-
 [dev-dependencies]
 pretty_assertions = { workspace = true }
->>>>>>> f788279b (Add tests, all passing)
@@ -5,7 +5,6 @@ use serde::{Deserialize, Serialize};
 use typed_builder::TypedBuilder;
 use uuid::Uuid;

-<<<<<<< HEAD
 #[derive(Clone, Debug, PartialEq)]
 pub struct DecryptedData(pub Vec<u8>);

@@ -14,9 +13,8 @@ pub struct EncryptedData {
     pub data: String,
     pub content_encryption_key: String,
 }
-=======
+
 pub type Diff = Vec<(Uuid, String, Uuid)>;
->>>>>>> f788279b (Add tests, all passing)

 /// A single record stored inside of our local database
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
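The Diff alias kept above pairs with how the sync code consumes it: each entry is (host id, tag, tail record id), one per host/tag chain whose tip differs between local and remote. For illustration only, since the RecordIndex::diff implementation itself is not part of this diff, a one-directional diff over two tail maps might look like the sketch below; diff_tails and the HashMap layout are assumptions.

use std::collections::HashMap;
use uuid::Uuid;

// Illustration only: one direction of an index diff. Chains that only the
// other side knows about are omitted for brevity.
fn diff_tails(
    ours: &HashMap<(Uuid, String), Uuid>,   // (host, tag) -> tail id
    theirs: &HashMap<(Uuid, String), Uuid>,
) -> Vec<(Uuid, String, Uuid)> {
    let mut diff = Vec::new();

    for ((host, tag), tail) in ours {
        if theirs.get(&(*host, tag.clone())) != Some(tail) {
            diff.push((*host, tag.clone(), *tail));
        }
    }

    diff
}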
@@ -350,7 +350,7 @@ impl Database for Postgres {
             .bind(id)
             .bind(i.id)
             .bind(i.host)
-            .bind(id)
+            .bind(i.parent)
             .bind(i.timestamp as i64) // throwing away some data, but i64 is still big in terms of time
             .bind(&i.version)
             .bind(&i.tag)
@@ -367,7 +367,7 @@ impl Database for Postgres {
     }

     async fn tail_records(&self, user: &User) -> DbResult<Vec<(Uuid, String, Uuid)>> {
-        const TAIL_RECORDS_SQL: &str = "select host, tag, id from records rp where (select count(1) from records where parent=rp.id and user_id = $1) = 0;";
+        const TAIL_RECORDS_SQL: &str = "select host, tag, client_id from records rp where (select count(1) from records where parent=rp.client_id and user_id = $1) = 0;";

         let res = sqlx::query_as(TAIL_RECORDS_SQL)
             .bind(user.id)
@@ -78,14 +78,12 @@ async fn run(
     db: &mut impl Database,
     store: &mut impl Store,
 ) -> Result<()> {
-    let diff = sync::diff(settings, store).await?;
-    println!("{:?}", diff);
+    let (diff, remote_index) = sync::diff(settings, store).await?;
+    let operations = sync::operations(diff, store).await?;
+    let (uploaded, downloaded) =
+        sync::sync_remote(operations, &remote_index, store, &settings).await?;

-    atuin_client::sync::sync(settings, force, db).await?;
-    println!(
-        "Sync complete! {} items in database, force: {}",
-        db.history_count().await?,
-        force
-    );
+    println!("{}/{} up/down to record store", uploaded, downloaded);
+
     Ok(())
 }