feat: add IDX_CACHE_ROLLOUT (#2850)

Only really useful for Atuin cloud

Given a % chance, either use the idx cache or use the old aggregation
query

This is to enable us to test rollout the idx cache, without breaking all
queries in weird ways. Can monitor for a change in http codes/etc, and
easily roll back.
This commit is contained in:
Ellie Huxtable
2025-07-29 16:14:27 +02:00
committed by GitHub
parent 59d2047a46
commit 6c31530c7a
3 changed files with 25 additions and 30 deletions

View File

@ -22,3 +22,4 @@ async-trait = { workspace = true }
uuid = { workspace = true }
metrics = "0.21.1"
futures-util = "0.3"
rand.workspace = true

View File

@ -1,13 +1,14 @@
use std::collections::HashMap;
use std::ops::Range;
use rand::Rng;
use async_trait::async_trait;
use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus};
use atuin_common::utils::crypto_random_string;
use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User};
use atuin_server_database::{Database, DbError, DbResult, DbSettings};
use futures_util::TryStreamExt;
use metrics::counter;
use sqlx::Row;
use sqlx::postgres::PgPoolOptions;
@ -635,44 +636,36 @@ impl Database for Postgres {
const STATUS_SQL: &str =
"select host, tag, max(idx) from store where user_id = $1 group by host, tag";
// Use a transaction to ensure consistent reads from both tables
let mut tx = self.pool.begin().await.map_err(fix_error)?;
// If IDX_CACHE_ROLLOUT is set, then we
// 1. Read the value of the var, use it as a % chance of using the cache
// 2. If we use the cache, just read from the cache table
// 3. If we don't use the cache, read from the store table
// IDX_CACHE_ROLLOUT should be between 0 and 100.
let mut res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL)
.bind(user.id)
.fetch_all(&mut *tx)
.await
.map_err(fix_error)?;
res.sort();
let idx_cache_rollout = std::env::var("IDX_CACHE_ROLLOUT").unwrap_or("0".to_string());
let idx_cache_rollout = idx_cache_rollout.parse::<f64>().unwrap_or(0.0);
let use_idx_cache = rand::thread_rng().gen_bool(idx_cache_rollout / 100.0);
// We're temporarily increasing latency in order to improve confidence in the cache
// If it runs for a few days, and we confirm that cached values are equal to realtime, we
// can replace realtime with cached.
//
// But let's check so sync doesn't do Weird Things.
let mut cached_res: Vec<(Uuid, String, i64)> =
let mut res: Vec<(Uuid, String, i64)> = if use_idx_cache {
tracing::debug!("using idx cache for user {}", user.id);
sqlx::query_as("select host, tag, idx from store_idx_cache where user_id = $1")
.bind(user.id)
.fetch_all(&mut *tx)
.fetch_all(&self.pool)
.await
.map_err(fix_error)?;
cached_res.sort();
.map_err(fix_error)?
} else {
tracing::debug!("using aggregate query for user {}", user.id);
sqlx::query_as(STATUS_SQL)
.bind(user.id)
.fetch_all(&self.pool)
.await
.map_err(fix_error)?
};
// No need to commit a read-only transaction
res.sort();
let mut status = RecordStatus::new();
let equal = res == cached_res;
if equal {
counter!("atuin_store_idx_cache_consistent", 1);
} else {
// log the values if we have an inconsistent cache
tracing::debug!(user = user.username, cache_match = equal, res = ?res, cached = ?cached_res, "record store index request");
counter!("atuin_store_idx_cache_inconsistent", 1);
};
for i in res.iter() {
status.set_raw(HostId(i.0), i.1.clone(), i.2 as u64);
}