diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs index a694fa93..17714a02 100644 --- a/atuin-client/src/record/sync.rs +++ b/atuin-client/src/record/sync.rs @@ -110,6 +110,8 @@ async fn sync_upload( .id }; + debug!("starting push to remote from: {}", start); + // we have the start point for sync. it is either the head of the store if // the remote has no data for it, or the tail that the remote has // we need to iterate from the remote tail, and keep going until @@ -117,11 +119,6 @@ async fn sync_upload( let mut record = Some(store.get(start).await.unwrap()); - // don't try and upload the head again - if let Some(r) = record { - record = store.next(&r).await?; - } - // We are currently uploading them one at a time. Yes, this sucks. We are // also processing all records in serial. That also sucks. // Once sync works, we can then make it super fast. @@ -134,7 +131,15 @@ async fn sync_upload( Ok(total) } -fn sync_download(tail: Uuid, host: Uuid, tag: String) -> Result { + +async fn sync_download( + store: &mut impl Store, + remote_index: &RecordIndex, + client: &Client<'_>, + op: (Uuid, String, Uuid), +) -> Result { + let mut total = 0; + Ok(0) } @@ -157,7 +162,8 @@ pub async fn sync_remote( sync_upload(local_store, remote_index, &client, (host, tag, tail)).await? } Operation::Download { tail, host, tag } => { - downloaded += sync_download(tail, host, tag)? + downloaded += + sync_download(local_store, remote_index, &client, (host, tag, tail)).await? } } } diff --git a/atuin-server-database/src/lib.rs b/atuin-server-database/src/lib.rs index 6c72e2c9..9e19c673 100644 --- a/atuin-server-database/src/lib.rs +++ b/atuin-server-database/src/lib.rs @@ -57,6 +57,14 @@ pub trait Database: Sized + Clone + Send + Sync + 'static { async fn deleted_history(&self, user: &User) -> DbResult>; async fn add_records(&self, user: &User, record: &[Record]) -> DbResult<()>; + async fn next_records( + &self, + user: &User, + host: Uuid, + tag: String, + start: Option, + count: u64, + ) -> DbResult>; // Return the tail record ID for each store, so (HostID, Tag, TailRecordID) async fn tail_records(&self, user: &User) -> DbResult>; diff --git a/atuin-server-postgres/migrations/20230623070418_records.sql b/atuin-server-postgres/migrations/20230623070418_records.sql index f6d3d1e6..d741e78a 100644 --- a/atuin-server-postgres/migrations/20230623070418_records.sql +++ b/atuin-server-postgres/migrations/20230623070418_records.sql @@ -3,7 +3,7 @@ create table records ( id uuid primary key, -- remember to use uuidv7 for happy indices <3 client_id uuid not null, -- I am too uncomfortable with the idea of a client-generated primary key host uuid not null, -- a unique identifier for the host - parent uuid not null, -- the ID of the parent record, bearing in mind this is a linked list + parent uuid default null, -- the ID of the parent record, bearing in mind this is a linked list timestamp bigint not null, -- not a timestamp type, as those do not have nanosecond precision version text not null, tag text not null, -- what is this? history, kv, whatever. Remember clients get a log per tag per host diff --git a/atuin-server-postgres/src/lib.rs b/atuin-server-postgres/src/lib.rs index f5eb2b06..ca9a7a70 100644 --- a/atuin-server-postgres/src/lib.rs +++ b/atuin-server-postgres/src/lib.rs @@ -12,7 +12,7 @@ use sqlx::Row; use sqlx::types::Uuid; use tracing::instrument; -use wrappers::{DbHistory, DbSession, DbUser}; +use wrappers::{DbHistory, DbRecord, DbSession, DbUser}; mod wrappers; @@ -334,6 +334,7 @@ impl Database for Postgres { .map(|DbHistory(h)| h) } + #[instrument(skip_all)] async fn add_records(&self, user: &User, records: &[Record]) -> DbResult<()> { let mut tx = self.pool.begin().await.map_err(fix_error)?; @@ -366,6 +367,57 @@ impl Database for Postgres { Ok(()) } + #[instrument(skip_all)] + async fn next_records( + &self, + user: &User, + host: Uuid, + tag: String, + start: Option, + count: u64, + ) -> DbResult> { + tracing::debug!("{:?} - {:?} - {:?}", host, tag, start); + let mut ret = Vec::with_capacity(count as usize); + let mut parent = start; + + // yeah let's do something better + for _ in 0..count { + // a very much not ideal query. but it's simple at least? + // we are basically using postgres as a kv store here, so... maybe consider using an actual + // kv store? + let record: Result = sqlx::query_as( + "select client_id, host, parent, timestamp, version, tag, data from records + where user_id = $1 + and tag = $2 + and host = $3 + and parent is not distinct from $4", + ) + .bind(user.id) + .bind(tag.clone()) + .bind(host) + .bind(parent) + .fetch_one(&self.pool) + .await + .map_err(fix_error); + + match record { + Ok(record) => { + let record: Record = record.into(); + ret.push(record.clone()); + + parent = Some(record.id); + } + Err(DbError::NotFound) => { + tracing::debug!("hit tail of store: {:?}/{}", host, tag); + return Ok(ret); + } + Err(e) => return Err(e), + } + } + + Ok(ret) + } + async fn tail_records(&self, user: &User) -> DbResult> { const TAIL_RECORDS_SQL: &str = "select host, tag, client_id from records rp where (select count(1) from records where parent=rp.client_id and user_id = $1) = 0;"; diff --git a/atuin-server-postgres/src/wrappers.rs b/atuin-server-postgres/src/wrappers.rs index cb3d5a96..c4e6da97 100644 --- a/atuin-server-postgres/src/wrappers.rs +++ b/atuin-server-postgres/src/wrappers.rs @@ -1,10 +1,12 @@ use ::sqlx::{FromRow, Result}; +use atuin_common::record::Record; use atuin_server_database::models::{History, Session, User}; use sqlx::{postgres::PgRow, Row}; pub struct DbUser(pub User); pub struct DbSession(pub Session); pub struct DbHistory(pub History); +pub struct DbRecord(pub Record); impl<'a> FromRow<'a, PgRow> for DbUser { fn from_row(row: &'a PgRow) -> Result { @@ -40,3 +42,25 @@ impl<'a> ::sqlx::FromRow<'a, PgRow> for DbHistory { })) } } + +impl<'a> ::sqlx::FromRow<'a, PgRow> for DbRecord { + fn from_row(row: &'a PgRow) -> ::sqlx::Result { + let timestamp: i64 = row.try_get("timestamp")?; + + Ok(Self(Record { + id: row.try_get("client_id")?, + host: row.try_get("host")?, + parent: row.try_get("parent")?, + timestamp: timestamp as u64, + version: row.try_get("version")?, + tag: row.try_get("tag")?, + data: row.try_get("data")?, + })) + } +} + +impl Into for DbRecord { + fn into(self) -> Record { + Record { ..self.0 } + } +} diff --git a/atuin-server/src/handlers/record.rs b/atuin-server/src/handlers/record.rs index e7a6f566..0d3f277e 100644 --- a/atuin-server/src/handlers/record.rs +++ b/atuin-server/src/handlers/record.rs @@ -1,6 +1,8 @@ -use axum::{extract::State, Json}; +use axum::{extract::Query, extract::State, Json}; use http::StatusCode; +use serde::Deserialize; use tracing::{error, instrument}; +use uuid::Uuid; use super::{ErrorResponse, ErrorResponseStatus, RespExt}; use crate::router::{AppState, UserAuth}; @@ -68,3 +70,36 @@ pub async fn index( Ok(Json(record_index)) } + +#[derive(Deserialize)] +pub struct NextParams { + host: Uuid, + tag: String, + start: Option, + count: u64, +} + +#[instrument(skip_all, fields(user.id = user.id))] +pub async fn next( + params: Query, + UserAuth(user): UserAuth, + state: State>, +) -> Result>, ErrorResponseStatus<'static>> { + let State(AppState { database, settings }) = state; + let params = params.0; + + let records = match database + .next_records(&user, params.host, params.tag, params.start, params.count) + .await + { + Ok(records) => records, + Err(e) => { + error!("failed to get record index: {}", e); + + return Err(ErrorResponse::reply("failed to calculate record index") + .with_status(StatusCode::INTERNAL_SERVER_ERROR)); + } + }; + + Ok(Json(records)) +} diff --git a/atuin-server/src/router.rs b/atuin-server/src/router.rs index 355cf060..7dc8a246 100644 --- a/atuin-server/src/router.rs +++ b/atuin-server/src/router.rs @@ -73,6 +73,7 @@ pub fn router(database: DB, settings: Settings) -> R .route("/history", delete(handlers::history::delete)) .route("/record", post(handlers::record::post)) .route("/record", get(handlers::record::index)) + .route("/record/next", get(handlers::record::next)) .route("/user/:username", get(handlers::user::get)) .route("/account", delete(handlers::user::delete)) .route("/register", post(handlers::user::register))