mirror of
https://github.com/atuinsh/atuin.git
synced 2024-10-07 10:43:06 +02:00
feat: monitor idx cache consistency before switching (#2229)
This commit is contained in:
parent
255a7bccb6
commit
c3723aaf27
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -447,6 +447,7 @@ dependencies = [
|
|||||||
"atuin-server-database",
|
"atuin-server-database",
|
||||||
"eyre",
|
"eyre",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
"metrics",
|
||||||
"serde",
|
"serde",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"time",
|
"time",
|
||||||
|
@ -20,5 +20,6 @@ serde = { workspace = true }
|
|||||||
sqlx = { workspace = true }
|
sqlx = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
|
metrics = "0.21.1"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
url = "2.5.2"
|
url = "2.5.2"
|
||||||
|
@ -8,6 +8,7 @@ use atuin_common::utils::crypto_random_string;
|
|||||||
use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User};
|
use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User};
|
||||||
use atuin_server_database::{Database, DbError, DbResult};
|
use atuin_server_database::{Database, DbError, DbResult};
|
||||||
use futures_util::TryStreamExt;
|
use futures_util::TryStreamExt;
|
||||||
|
use metrics::counter;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::postgres::PgPoolOptions;
|
use sqlx::postgres::PgPoolOptions;
|
||||||
use sqlx::Row;
|
use sqlx::Row;
|
||||||
@ -643,16 +644,41 @@ impl Database for Postgres {
|
|||||||
const STATUS_SQL: &str =
|
const STATUS_SQL: &str =
|
||||||
"select host, tag, max(idx) from store where user_id = $1 group by host, tag";
|
"select host, tag, max(idx) from store where user_id = $1 group by host, tag";
|
||||||
|
|
||||||
let res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL)
|
let mut res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL)
|
||||||
.bind(user.id)
|
.bind(user.id)
|
||||||
.fetch_all(&self.pool)
|
.fetch_all(&self.pool)
|
||||||
.await
|
.await
|
||||||
.map_err(fix_error)?;
|
.map_err(fix_error)?;
|
||||||
|
res.sort();
|
||||||
|
|
||||||
|
// We're temporarily increasing latency in order to improve confidence in the cache
|
||||||
|
// If it runs for a few days, and we confirm that cached values are equal to realtime, we
|
||||||
|
// can replace realtime with cached.
|
||||||
|
//
|
||||||
|
// But let's check so sync doesn't do Weird Things.
|
||||||
|
|
||||||
|
let mut cached_res: Vec<(Uuid, String, i64)> =
|
||||||
|
sqlx::query_as("select host, tag, idx from store_idx_cache where user_id = $1")
|
||||||
|
.bind(user.id)
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await
|
||||||
|
.map_err(fix_error)?;
|
||||||
|
cached_res.sort();
|
||||||
|
|
||||||
let mut status = RecordStatus::new();
|
let mut status = RecordStatus::new();
|
||||||
|
|
||||||
for i in res {
|
let equal = res == cached_res;
|
||||||
status.set_raw(HostId(i.0), i.1, i.2 as u64);
|
|
||||||
|
if equal {
|
||||||
|
counter!("atuin_store_idx_cache_consistent", 1);
|
||||||
|
} else {
|
||||||
|
// log the values if we have an inconsistent cache
|
||||||
|
tracing::debug!(user = user.username, cache_match = equal, res = ?res, cached = ?cached_res, "record store index request");
|
||||||
|
counter!("atuin_store_idx_cache_inconsistent", 1);
|
||||||
|
};
|
||||||
|
|
||||||
|
for i in res.iter() {
|
||||||
|
status.set_raw(HostId(i.0), i.1.clone(), i.2 as u64);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(status)
|
Ok(status)
|
||||||
|
Loading…
Reference in New Issue
Block a user