Use the count cache (#312)

* Use the count cache

By default read from the count cache - if there is no value there, then
do a full COUNT. The cache will be filled when the user posts up some
more history

* clean up server db error handling

Co-authored-by: Conrad Ludgate <conrad.ludgate@truelayer.com>
This commit is contained in:
Ellie Huxtable 2022-04-21 08:03:39 +01:00 committed by GitHub
parent 6e11b8e0ed
commit ed4e07d2e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 54 deletions

View File

@ -1,8 +1,7 @@
use async_trait::async_trait; use async_trait::async_trait;
use std::collections::HashMap; use std::collections::HashMap;
use eyre::{eyre, Result}; use sqlx::{postgres::PgPoolOptions, Result};
use sqlx::postgres::PgPoolOptions;
use crate::settings::HISTORY_PAGE_SIZE; use crate::settings::HISTORY_PAGE_SIZE;
@ -25,6 +24,7 @@ pub trait Database {
async fn add_user(&self, user: &NewUser) -> Result<i64>; async fn add_user(&self, user: &NewUser) -> Result<i64>;
async fn count_history(&self, user: &User) -> Result<i64>; async fn count_history(&self, user: &User) -> Result<i64>;
async fn count_history_cached(&self, user: &User) -> Result<i64>;
async fn count_history_range( async fn count_history_range(
&self, &self,
@ -63,7 +63,7 @@ pub struct Postgres {
} }
impl Postgres { impl Postgres {
pub async fn new(uri: &str) -> Result<Self, sqlx::Error> { pub async fn new(uri: &str) -> Result<Self> {
let pool = PgPoolOptions::new() let pool = PgPoolOptions::new()
.max_connections(100) .max_connections(100)
.connect(uri) .connect(uri)
@ -78,52 +78,36 @@ impl Postgres {
#[async_trait] #[async_trait]
impl Database for Postgres { impl Database for Postgres {
async fn get_session(&self, token: &str) -> Result<Session> { async fn get_session(&self, token: &str) -> Result<Session> {
let res: Option<Session> =
sqlx::query_as::<_, Session>("select * from sessions where token = $1") sqlx::query_as::<_, Session>("select * from sessions where token = $1")
.bind(token) .bind(token)
.fetch_optional(&self.pool) .fetch_one(&self.pool)
.await?; .await
if let Some(s) = res {
Ok(s)
} else {
Err(eyre!("could not find session"))
}
} }
async fn get_user(&self, username: &str) -> Result<User> { async fn get_user(&self, username: &str) -> Result<User> {
let res: Option<User> =
sqlx::query_as::<_, User>("select * from users where username = $1") sqlx::query_as::<_, User>("select * from users where username = $1")
.bind(username) .bind(username)
.fetch_optional(&self.pool) .fetch_one(&self.pool)
.await?; .await
if let Some(u) = res {
Ok(u)
} else {
Err(eyre!("could not find user"))
}
} }
async fn get_session_user(&self, token: &str) -> Result<User> { async fn get_session_user(&self, token: &str) -> Result<User> {
let res: Option<User> = sqlx::query_as::<_, User>( sqlx::query_as::<_, User>(
"select * from users "select * from users
inner join sessions inner join sessions
on users.id = sessions.user_id on users.id = sessions.user_id
and sessions.token = $1", and sessions.token = $1",
) )
.bind(token) .bind(token)
.fetch_optional(&self.pool) .fetch_one(&self.pool)
.await?; .await
if let Some(u) = res {
Ok(u)
} else {
Err(eyre!("could not find user"))
}
} }
async fn count_history(&self, user: &User) -> Result<i64> { async fn count_history(&self, user: &User) -> Result<i64> {
// The cache is new, and the user might not yet have a cache value.
// They will have one as soon as they post up some new history, but handle that
// edge case.
let res: (i64,) = sqlx::query_as( let res: (i64,) = sqlx::query_as(
"select count(1) from history "select count(1) from history
where user_id = $1", where user_id = $1",
@ -135,6 +119,18 @@ impl Database for Postgres {
Ok(res.0) Ok(res.0)
} }
async fn count_history_cached(&self, user: &User) -> Result<i64> {
let res: (i64,) = sqlx::query_as(
"select total from total_history_count_user
where user_id = $1",
)
.bind(user.id)
.fetch_one(&self.pool)
.await?;
Ok(res.0)
}
async fn count_history_range( async fn count_history_range(
&self, &self,
user: &User, user: &User,
@ -300,17 +296,10 @@ impl Database for Postgres {
} }
async fn get_user_session(&self, u: &User) -> Result<Session> { async fn get_user_session(&self, u: &User) -> Result<Session> {
let res: Option<Session> =
sqlx::query_as::<_, Session>("select * from sessions where user_id = $1") sqlx::query_as::<_, Session>("select * from sessions where user_id = $1")
.bind(u.id) .bind(u.id)
.fetch_optional(&self.pool) .fetch_one(&self.pool)
.await?; .await
if let Some(s) = res {
Ok(s)
} else {
Err(eyre!("could not find session"))
}
} }
async fn oldest_history(&self, user: &User) -> Result<History> { async fn oldest_history(&self, user: &User) -> Result<History> {

View File

@ -13,10 +13,17 @@ pub async fn count(
user: User, user: User,
db: Extension<Postgres>, db: Extension<Postgres>,
) -> Result<Json<CountResponse>, ErrorResponseStatus<'static>> { ) -> Result<Json<CountResponse>, ErrorResponseStatus<'static>> {
match db.count_history(&user).await { match db.count_history_cached(&user).await {
// By default read out the cached value
Ok(count) => Ok(Json(CountResponse { count })),
// If that fails, fallback on a full COUNT. Cache is built on a POST
// only
Err(_) => match db.count_history(&user).await {
Ok(count) => Ok(Json(CountResponse { count })), Ok(count) => Ok(Json(CountResponse { count })),
Err(_) => Err(ErrorResponse::reply("failed to query history count") Err(_) => Err(ErrorResponse::reply("failed to query history count")
.with_status(StatusCode::INTERNAL_SERVER_ERROR)), .with_status(StatusCode::INTERNAL_SERVER_ERROR)),
},
} }
} }

View File

@ -32,10 +32,15 @@ pub async fn get(
) -> Result<Json<UserResponse>, ErrorResponseStatus<'static>> { ) -> Result<Json<UserResponse>, ErrorResponseStatus<'static>> {
let user = match db.get_user(username.as_ref()).await { let user = match db.get_user(username.as_ref()).await {
Ok(user) => user, Ok(user) => user,
Err(e) => { Err(sqlx::Error::RowNotFound) => {
debug!("user not found: {}", e); debug!("user not found: {}", username);
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND)); return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
} }
Err(err) => {
error!("database error: {}", err);
return Err(ErrorResponse::reply("database error")
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
}
}; };
Ok(Json(UserResponse { Ok(Json(UserResponse {
@ -96,20 +101,28 @@ pub async fn login(
) -> Result<Json<LoginResponse>, ErrorResponseStatus<'static>> { ) -> Result<Json<LoginResponse>, ErrorResponseStatus<'static>> {
let user = match db.get_user(login.username.borrow()).await { let user = match db.get_user(login.username.borrow()).await {
Ok(u) => u, Ok(u) => u,
Err(sqlx::Error::RowNotFound) => {
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
}
Err(e) => { Err(e) => {
error!("failed to get user {}: {}", login.username.clone(), e); error!("failed to get user {}: {}", login.username.clone(), e);
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND)); return Err(ErrorResponse::reply("database error")
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
} }
}; };
let session = match db.get_user_session(&user).await { let session = match db.get_user_session(&user).await {
Ok(u) => u, Ok(u) => u,
Err(e) => { Err(sqlx::Error::RowNotFound) => {
error!("failed to get session for {}: {}", login.username, e); debug!("user session not found for user id={}", user.id);
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND)); return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
} }
Err(err) => {
error!("database error for user {}: {}", login.username, err);
return Err(ErrorResponse::reply("database error")
.with_status(StatusCode::INTERNAL_SERVER_ERROR));
}
}; };
let verified = verify_str(user.password.as_str(), login.password.borrow()); let verified = verify_str(user.password.as_str(), login.password.borrow());