mirror of
https://github.com/atuinsh/atuin.git
synced 2024-11-25 17:54:55 +01:00
Use the count cache (#312)
* Use the count cache By default read from the count cache - if there is no value there, then do a full COUNT. The cache will be filled when the user posts up some more history * clean up server db error handling Co-authored-by: Conrad Ludgate <conrad.ludgate@truelayer.com>
This commit is contained in:
parent
6e11b8e0ed
commit
ed4e07d2e6
@ -1,8 +1,7 @@
|
||||
use async_trait::async_trait;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use eyre::{eyre, Result};
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use sqlx::{postgres::PgPoolOptions, Result};
|
||||
|
||||
use crate::settings::HISTORY_PAGE_SIZE;
|
||||
|
||||
@ -25,6 +24,7 @@ pub trait Database {
|
||||
async fn add_user(&self, user: &NewUser) -> Result<i64>;
|
||||
|
||||
async fn count_history(&self, user: &User) -> Result<i64>;
|
||||
async fn count_history_cached(&self, user: &User) -> Result<i64>;
|
||||
|
||||
async fn count_history_range(
|
||||
&self,
|
||||
@ -63,7 +63,7 @@ pub struct Postgres {
|
||||
}
|
||||
|
||||
impl Postgres {
|
||||
pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
|
||||
pub async fn new(uri: &str) -> Result<Self> {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(100)
|
||||
.connect(uri)
|
||||
@ -78,52 +78,36 @@ impl Postgres {
|
||||
#[async_trait]
|
||||
impl Database for Postgres {
|
||||
async fn get_session(&self, token: &str) -> Result<Session> {
|
||||
let res: Option<Session> =
|
||||
sqlx::query_as::<_, Session>("select * from sessions where token = $1")
|
||||
.bind(token)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
if let Some(s) = res {
|
||||
Ok(s)
|
||||
} else {
|
||||
Err(eyre!("could not find session"))
|
||||
}
|
||||
sqlx::query_as::<_, Session>("select * from sessions where token = $1")
|
||||
.bind(token)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_user(&self, username: &str) -> Result<User> {
|
||||
let res: Option<User> =
|
||||
sqlx::query_as::<_, User>("select * from users where username = $1")
|
||||
.bind(username)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
if let Some(u) = res {
|
||||
Ok(u)
|
||||
} else {
|
||||
Err(eyre!("could not find user"))
|
||||
}
|
||||
sqlx::query_as::<_, User>("select * from users where username = $1")
|
||||
.bind(username)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_session_user(&self, token: &str) -> Result<User> {
|
||||
let res: Option<User> = sqlx::query_as::<_, User>(
|
||||
sqlx::query_as::<_, User>(
|
||||
"select * from users
|
||||
inner join sessions
|
||||
on users.id = sessions.user_id
|
||||
and sessions.token = $1",
|
||||
)
|
||||
.bind(token)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
if let Some(u) = res {
|
||||
Ok(u)
|
||||
} else {
|
||||
Err(eyre!("could not find user"))
|
||||
}
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn count_history(&self, user: &User) -> Result<i64> {
|
||||
// The cache is new, and the user might not yet have a cache value.
|
||||
// They will have one as soon as they post up some new history, but handle that
|
||||
// edge case.
|
||||
|
||||
let res: (i64,) = sqlx::query_as(
|
||||
"select count(1) from history
|
||||
where user_id = $1",
|
||||
@ -135,6 +119,18 @@ impl Database for Postgres {
|
||||
Ok(res.0)
|
||||
}
|
||||
|
||||
async fn count_history_cached(&self, user: &User) -> Result<i64> {
|
||||
let res: (i64,) = sqlx::query_as(
|
||||
"select total from total_history_count_user
|
||||
where user_id = $1",
|
||||
)
|
||||
.bind(user.id)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(res.0)
|
||||
}
|
||||
|
||||
async fn count_history_range(
|
||||
&self,
|
||||
user: &User,
|
||||
@ -300,17 +296,10 @@ impl Database for Postgres {
|
||||
}
|
||||
|
||||
async fn get_user_session(&self, u: &User) -> Result<Session> {
|
||||
let res: Option<Session> =
|
||||
sqlx::query_as::<_, Session>("select * from sessions where user_id = $1")
|
||||
.bind(u.id)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
|
||||
if let Some(s) = res {
|
||||
Ok(s)
|
||||
} else {
|
||||
Err(eyre!("could not find session"))
|
||||
}
|
||||
sqlx::query_as::<_, Session>("select * from sessions where user_id = $1")
|
||||
.bind(u.id)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn oldest_history(&self, user: &User) -> Result<History> {
|
||||
|
@ -13,10 +13,17 @@ pub async fn count(
|
||||
user: User,
|
||||
db: Extension<Postgres>,
|
||||
) -> Result<Json<CountResponse>, ErrorResponseStatus<'static>> {
|
||||
match db.count_history(&user).await {
|
||||
match db.count_history_cached(&user).await {
|
||||
// By default read out the cached value
|
||||
Ok(count) => Ok(Json(CountResponse { count })),
|
||||
Err(_) => Err(ErrorResponse::reply("failed to query history count")
|
||||
.with_status(StatusCode::INTERNAL_SERVER_ERROR)),
|
||||
|
||||
// If that fails, fallback on a full COUNT. Cache is built on a POST
|
||||
// only
|
||||
Err(_) => match db.count_history(&user).await {
|
||||
Ok(count) => Ok(Json(CountResponse { count })),
|
||||
Err(_) => Err(ErrorResponse::reply("failed to query history count")
|
||||
.with_status(StatusCode::INTERNAL_SERVER_ERROR)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -32,10 +32,15 @@ pub async fn get(
|
||||
) -> Result<Json<UserResponse>, ErrorResponseStatus<'static>> {
|
||||
let user = match db.get_user(username.as_ref()).await {
|
||||
Ok(user) => user,
|
||||
Err(e) => {
|
||||
debug!("user not found: {}", e);
|
||||
Err(sqlx::Error::RowNotFound) => {
|
||||
debug!("user not found: {}", username);
|
||||
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
||||
}
|
||||
Err(err) => {
|
||||
error!("database error: {}", err);
|
||||
return Err(ErrorResponse::reply("database error")
|
||||
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Json(UserResponse {
|
||||
@ -96,20 +101,28 @@ pub async fn login(
|
||||
) -> Result<Json<LoginResponse>, ErrorResponseStatus<'static>> {
|
||||
let user = match db.get_user(login.username.borrow()).await {
|
||||
Ok(u) => u,
|
||||
Err(sqlx::Error::RowNotFound) => {
|
||||
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
||||
}
|
||||
Err(e) => {
|
||||
error!("failed to get user {}: {}", login.username.clone(), e);
|
||||
|
||||
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
||||
return Err(ErrorResponse::reply("database error")
|
||||
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
};
|
||||
|
||||
let session = match db.get_user_session(&user).await {
|
||||
Ok(u) => u,
|
||||
Err(e) => {
|
||||
error!("failed to get session for {}: {}", login.username, e);
|
||||
|
||||
Err(sqlx::Error::RowNotFound) => {
|
||||
debug!("user session not found for user id={}", user.id);
|
||||
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
|
||||
}
|
||||
Err(err) => {
|
||||
error!("database error for user {}: {}", login.username, err);
|
||||
return Err(ErrorResponse::reply("database error")
|
||||
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
};
|
||||
|
||||
let verified = verify_str(user.password.as_str(), login.password.borrow());
|
||||
|
Loading…
Reference in New Issue
Block a user