mirror of
https://github.com/atuinsh/atuin.git
synced 2025-08-08 23:05:24 +02:00
chore: move crates into crates/ dir (#1958)
I'd like to tidy up the root a little, and it's nice to have all the rust crates in one place
This commit is contained in:
23
crates/atuin-server-postgres/Cargo.toml
Normal file
23
crates/atuin-server-postgres/Cargo.toml
Normal file
@ -0,0 +1,23 @@
|
||||
[package]
|
||||
name = "atuin-server-postgres"
|
||||
edition = "2021"
|
||||
description = "server postgres database library for atuin"
|
||||
|
||||
version = { workspace = true }
|
||||
authors = { workspace = true }
|
||||
license = { workspace = true }
|
||||
homepage = { workspace = true }
|
||||
repository = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
atuin-common = { path = "../atuin-common", version = "18.2.0" }
|
||||
atuin-server-database = { path = "../atuin-server-database", version = "18.2.0" }
|
||||
|
||||
eyre = { workspace = true }
|
||||
tracing = "0.1"
|
||||
time = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
sqlx = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
futures-util = "0.3"
|
5
crates/atuin-server-postgres/build.rs
Normal file
5
crates/atuin-server-postgres/build.rs
Normal file
@ -0,0 +1,5 @@
|
||||
// generated by `sqlx migrate build-script`
|
||||
fn main() {
|
||||
// trigger recompilation when a new migration is added
|
||||
println!("cargo:rerun-if-changed=migrations");
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
create table history (
|
||||
id bigserial primary key,
|
||||
client_id text not null unique, -- the client-generated ID
|
||||
user_id bigserial not null, -- allow multiple users
|
||||
hostname text not null, -- a unique identifier from the client (can be hashed, random, whatever)
|
||||
timestamp timestamp not null, -- one of the few non-encrypted metadatas
|
||||
|
||||
data varchar(8192) not null, -- store the actual history data, encrypted. I don't wanna know!
|
||||
|
||||
created_at timestamp not null default current_timestamp
|
||||
);
|
@ -0,0 +1,10 @@
|
||||
create table users (
|
||||
id bigserial primary key, -- also store our own ID
|
||||
username varchar(32) not null unique, -- being able to contact users is useful
|
||||
email varchar(128) not null unique, -- being able to contact users is useful
|
||||
password varchar(128) not null unique
|
||||
);
|
||||
|
||||
-- the prior index is case sensitive :(
|
||||
CREATE UNIQUE INDEX email_unique_idx on users (LOWER(email));
|
||||
CREATE UNIQUE INDEX username_unique_idx on users (LOWER(username));
|
@ -0,0 +1,6 @@
|
||||
-- Add migration script here
|
||||
create table sessions (
|
||||
id bigserial primary key,
|
||||
user_id bigserial,
|
||||
token varchar(128) unique not null
|
||||
);
|
@ -0,0 +1,51 @@
|
||||
-- Prior to this, the count endpoint was super naive and just ran COUNT(1).
|
||||
-- This is slow asf. Now that we have an amount of actual traffic,
|
||||
-- stop doing that!
|
||||
-- This basically maintains a count, so we can read ONE row, instead of ALL the
|
||||
-- rows. Much better.
|
||||
-- Future optimisation could use some sort of cache so we don't even need to hit
|
||||
-- postgres at all.
|
||||
|
||||
create table total_history_count_user(
|
||||
id bigserial primary key,
|
||||
user_id bigserial,
|
||||
total integer -- try and avoid using keywords - hence total, not count
|
||||
);
|
||||
|
||||
create or replace function user_history_count()
|
||||
returns trigger as
|
||||
$func$
|
||||
begin
|
||||
if (TG_OP='INSERT') then
|
||||
update total_history_count_user set total = total + 1 where user_id = new.user_id;
|
||||
|
||||
if not found then
|
||||
insert into total_history_count_user(user_id, total)
|
||||
values (
|
||||
new.user_id,
|
||||
(select count(1) from history where user_id = new.user_id)
|
||||
);
|
||||
end if;
|
||||
|
||||
elsif (TG_OP='DELETE') then
|
||||
update total_history_count_user set total = total - 1 where user_id = new.user_id;
|
||||
|
||||
if not found then
|
||||
insert into total_history_count_user(user_id, total)
|
||||
values (
|
||||
new.user_id,
|
||||
(select count(1) from history where user_id = new.user_id)
|
||||
);
|
||||
end if;
|
||||
end if;
|
||||
|
||||
return NEW; -- this is actually ignored for an after trigger, but oh well
|
||||
end;
|
||||
$func$
|
||||
language plpgsql volatile -- pldfplplpflh
|
||||
cost 100; -- default value
|
||||
|
||||
create trigger tg_user_history_count
|
||||
after insert or delete on history
|
||||
for each row
|
||||
execute procedure user_history_count();
|
@ -0,0 +1,35 @@
|
||||
-- the old version of this function used NEW in the delete part when it should
|
||||
-- use OLD
|
||||
|
||||
create or replace function user_history_count()
|
||||
returns trigger as
|
||||
$func$
|
||||
begin
|
||||
if (TG_OP='INSERT') then
|
||||
update total_history_count_user set total = total + 1 where user_id = new.user_id;
|
||||
|
||||
if not found then
|
||||
insert into total_history_count_user(user_id, total)
|
||||
values (
|
||||
new.user_id,
|
||||
(select count(1) from history where user_id = new.user_id)
|
||||
);
|
||||
end if;
|
||||
|
||||
elsif (TG_OP='DELETE') then
|
||||
update total_history_count_user set total = total - 1 where user_id = old.user_id;
|
||||
|
||||
if not found then
|
||||
insert into total_history_count_user(user_id, total)
|
||||
values (
|
||||
old.user_id,
|
||||
(select count(1) from history where user_id = old.user_id)
|
||||
);
|
||||
end if;
|
||||
end if;
|
||||
|
||||
return NEW; -- this is actually ignored for an after trigger, but oh well
|
||||
end;
|
||||
$func$
|
||||
language plpgsql volatile -- pldfplplpflh
|
||||
cost 100; -- default value
|
@ -0,0 +1,3 @@
|
||||
-- Make it 4x larger. Most commands are less than this, but as it's base64
|
||||
-- SOME are more than 8192. Should be enough for now.
|
||||
ALTER TABLE history ALTER COLUMN data TYPE varchar(32768);
|
@ -0,0 +1 @@
|
||||
alter table users add column created_at timestamp not null default now();
|
@ -0,0 +1,14 @@
|
||||
create type event_type as enum ('create', 'delete');
|
||||
|
||||
create table events (
|
||||
id bigserial primary key,
|
||||
client_id text not null unique, -- the client-generated ID
|
||||
user_id bigserial not null, -- allow multiple users
|
||||
hostname text not null, -- a unique identifier from the client (can be hashed, random, whatever)
|
||||
timestamp timestamp not null, -- one of the few non-encrypted metadatas
|
||||
|
||||
event_type event_type,
|
||||
data text not null, -- store the actual history data, encrypted. I don't wanna know!
|
||||
|
||||
created_at timestamp not null default current_timestamp
|
||||
);
|
@ -0,0 +1,2 @@
|
||||
-- Add migration script here
|
||||
alter table history alter column data type text;
|
@ -0,0 +1,2 @@
|
||||
-- Add migration script here
|
||||
drop table events;
|
@ -0,0 +1,5 @@
|
||||
-- Add migration script here
|
||||
alter table history add column if not exists deleted_at timestamp;
|
||||
|
||||
-- queries will all be selecting the ids of history for a user, that has been deleted
|
||||
create index if not exists history_deleted_index on history(client_id, user_id, deleted_at);
|
@ -0,0 +1,30 @@
|
||||
-- We do not need to run the trigger on deletes, as the only time we are deleting history is when the user
|
||||
-- has already been deleted
|
||||
-- This actually slows down deleting all the history a good bit!
|
||||
|
||||
create or replace function user_history_count()
|
||||
returns trigger as
|
||||
$func$
|
||||
begin
|
||||
if (TG_OP='INSERT') then
|
||||
update total_history_count_user set total = total + 1 where user_id = new.user_id;
|
||||
|
||||
if not found then
|
||||
insert into total_history_count_user(user_id, total)
|
||||
values (
|
||||
new.user_id,
|
||||
(select count(1) from history where user_id = new.user_id)
|
||||
);
|
||||
end if;
|
||||
end if;
|
||||
|
||||
return NEW; -- this is actually ignored for an after trigger, but oh well
|
||||
end;
|
||||
$func$
|
||||
language plpgsql volatile -- pldfplplpflh
|
||||
cost 100; -- default value
|
||||
|
||||
create or replace trigger tg_user_history_count
|
||||
after insert on history
|
||||
for each row
|
||||
execute procedure user_history_count();
|
@ -0,0 +1,15 @@
|
||||
-- Add migration script here
|
||||
create table records (
|
||||
id uuid primary key, -- remember to use uuidv7 for happy indices <3
|
||||
client_id uuid not null, -- I am too uncomfortable with the idea of a client-generated primary key
|
||||
host uuid not null, -- a unique identifier for the host
|
||||
parent uuid default null, -- the ID of the parent record, bearing in mind this is a linked list
|
||||
timestamp bigint not null, -- not a timestamp type, as those do not have nanosecond precision
|
||||
version text not null,
|
||||
tag text not null, -- what is this? history, kv, whatever. Remember clients get a log per tag per host
|
||||
data text not null, -- store the actual history data, encrypted. I don't wanna know!
|
||||
cek text not null,
|
||||
|
||||
user_id bigint not null, -- allow multiple users
|
||||
created_at timestamp not null default current_timestamp
|
||||
);
|
@ -0,0 +1,15 @@
|
||||
-- Add migration script here
|
||||
create table store (
|
||||
id uuid primary key, -- remember to use uuidv7 for happy indices <3
|
||||
client_id uuid not null, -- I am too uncomfortable with the idea of a client-generated primary key, even though it's fine mathematically
|
||||
host uuid not null, -- a unique identifier for the host
|
||||
idx bigint not null, -- the index of the record in this store, identified by (host, tag)
|
||||
timestamp bigint not null, -- not a timestamp type, as those do not have nanosecond precision
|
||||
version text not null,
|
||||
tag text not null, -- what is this? history, kv, whatever. Remember clients get a log per tag per host
|
||||
data text not null, -- store the actual history data, encrypted. I don't wanna know!
|
||||
cek text not null,
|
||||
|
||||
user_id bigint not null, -- allow multiple users
|
||||
created_at timestamp not null default current_timestamp
|
||||
);
|
@ -0,0 +1,2 @@
|
||||
-- Add migration script here
|
||||
create unique index record_uniq ON store(user_id, host, tag, idx);
|
@ -0,0 +1,4 @@
|
||||
-- Add migration script here
|
||||
alter table history alter column user_id drop default;
|
||||
alter table sessions alter column user_id drop default;
|
||||
alter table total_history_count_user alter column user_id drop default;
|
538
crates/atuin-server-postgres/src/lib.rs
Normal file
538
crates/atuin-server-postgres/src/lib.rs
Normal file
@ -0,0 +1,538 @@
|
||||
use std::ops::Range;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus};
|
||||
use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User};
|
||||
use atuin_server_database::{Database, DbError, DbResult};
|
||||
use futures_util::TryStreamExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use sqlx::Row;
|
||||
|
||||
use time::{OffsetDateTime, PrimitiveDateTime, UtcOffset};
|
||||
use tracing::instrument;
|
||||
use uuid::Uuid;
|
||||
use wrappers::{DbHistory, DbRecord, DbSession, DbUser};
|
||||
|
||||
mod wrappers;
|
||||
|
||||
const MIN_PG_VERSION: u32 = 14;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Postgres {
|
||||
pool: sqlx::Pool<sqlx::postgres::Postgres>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct PostgresSettings {
|
||||
pub db_uri: String,
|
||||
}
|
||||
|
||||
fn fix_error(error: sqlx::Error) -> DbError {
|
||||
match error {
|
||||
sqlx::Error::RowNotFound => DbError::NotFound,
|
||||
error => DbError::Other(error.into()),
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Database for Postgres {
|
||||
type Settings = PostgresSettings;
|
||||
async fn new(settings: &PostgresSettings) -> DbResult<Self> {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(100)
|
||||
.connect(settings.db_uri.as_str())
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
// Call server_version_num to get the DB server's major version number
|
||||
// The call returns None for servers older than 8.x.
|
||||
let pg_major_version: u32 = pool
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(fix_error)?
|
||||
.server_version_num()
|
||||
.ok_or(DbError::Other(eyre::Report::msg(
|
||||
"could not get PostgreSQL version",
|
||||
)))?
|
||||
/ 10000;
|
||||
|
||||
if pg_major_version < MIN_PG_VERSION {
|
||||
return Err(DbError::Other(eyre::Report::msg(format!(
|
||||
"unsupported PostgreSQL version {}, minimum required is {}",
|
||||
pg_major_version, MIN_PG_VERSION
|
||||
))));
|
||||
}
|
||||
|
||||
sqlx::migrate!("./migrations")
|
||||
.run(&pool)
|
||||
.await
|
||||
.map_err(|error| DbError::Other(error.into()))?;
|
||||
|
||||
Ok(Self { pool })
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn get_session(&self, token: &str) -> DbResult<Session> {
|
||||
sqlx::query_as("select id, user_id, token from sessions where token = $1")
|
||||
.bind(token)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)
|
||||
.map(|DbSession(session)| session)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn get_user(&self, username: &str) -> DbResult<User> {
|
||||
sqlx::query_as("select id, username, email, password from users where username = $1")
|
||||
.bind(username)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)
|
||||
.map(|DbUser(user)| user)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn get_session_user(&self, token: &str) -> DbResult<User> {
|
||||
sqlx::query_as(
|
||||
"select users.id, users.username, users.email, users.password from users
|
||||
inner join sessions
|
||||
on users.id = sessions.user_id
|
||||
and sessions.token = $1",
|
||||
)
|
||||
.bind(token)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)
|
||||
.map(|DbUser(user)| user)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn count_history(&self, user: &User) -> DbResult<i64> {
|
||||
// The cache is new, and the user might not yet have a cache value.
|
||||
// They will have one as soon as they post up some new history, but handle that
|
||||
// edge case.
|
||||
|
||||
let res: (i64,) = sqlx::query_as(
|
||||
"select count(1) from history
|
||||
where user_id = $1",
|
||||
)
|
||||
.bind(user.id)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(res.0)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn total_history(&self) -> DbResult<i64> {
|
||||
// The cache is new, and the user might not yet have a cache value.
|
||||
// They will have one as soon as they post up some new history, but handle that
|
||||
// edge case.
|
||||
|
||||
let res: (i64,) = sqlx::query_as("select sum(total) from total_history_count_user")
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?
|
||||
.unwrap_or((0,));
|
||||
|
||||
Ok(res.0)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn count_history_cached(&self, user: &User) -> DbResult<i64> {
|
||||
let res: (i32,) = sqlx::query_as(
|
||||
"select total from total_history_count_user
|
||||
where user_id = $1",
|
||||
)
|
||||
.bind(user.id)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(res.0 as i64)
|
||||
}
|
||||
|
||||
async fn delete_store(&self, user: &User) -> DbResult<()> {
|
||||
sqlx::query(
|
||||
"delete from store
|
||||
where user_id = $1",
|
||||
)
|
||||
.bind(user.id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_history(&self, user: &User, id: String) -> DbResult<()> {
|
||||
sqlx::query(
|
||||
"update history
|
||||
set deleted_at = $3
|
||||
where user_id = $1
|
||||
and client_id = $2
|
||||
and deleted_at is null", // don't just keep setting it
|
||||
)
|
||||
.bind(user.id)
|
||||
.bind(id)
|
||||
.bind(OffsetDateTime::now_utc())
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn deleted_history(&self, user: &User) -> DbResult<Vec<String>> {
|
||||
// The cache is new, and the user might not yet have a cache value.
|
||||
// They will have one as soon as they post up some new history, but handle that
|
||||
// edge case.
|
||||
|
||||
let res = sqlx::query(
|
||||
"select client_id from history
|
||||
where user_id = $1
|
||||
and deleted_at is not null",
|
||||
)
|
||||
.bind(user.id)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
let res = res
|
||||
.iter()
|
||||
.map(|row| row.get::<String, _>("client_id"))
|
||||
.collect();
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn count_history_range(
|
||||
&self,
|
||||
user: &User,
|
||||
range: Range<OffsetDateTime>,
|
||||
) -> DbResult<i64> {
|
||||
let res: (i64,) = sqlx::query_as(
|
||||
"select count(1) from history
|
||||
where user_id = $1
|
||||
and timestamp >= $2::date
|
||||
and timestamp < $3::date",
|
||||
)
|
||||
.bind(user.id)
|
||||
.bind(into_utc(range.start))
|
||||
.bind(into_utc(range.end))
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(res.0)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn list_history(
|
||||
&self,
|
||||
user: &User,
|
||||
created_after: OffsetDateTime,
|
||||
since: OffsetDateTime,
|
||||
host: &str,
|
||||
page_size: i64,
|
||||
) -> DbResult<Vec<History>> {
|
||||
let res = sqlx::query_as(
|
||||
"select id, client_id, user_id, hostname, timestamp, data, created_at from history
|
||||
where user_id = $1
|
||||
and hostname != $2
|
||||
and created_at >= $3
|
||||
and timestamp >= $4
|
||||
order by timestamp asc
|
||||
limit $5",
|
||||
)
|
||||
.bind(user.id)
|
||||
.bind(host)
|
||||
.bind(into_utc(created_after))
|
||||
.bind(into_utc(since))
|
||||
.bind(page_size)
|
||||
.fetch(&self.pool)
|
||||
.map_ok(|DbHistory(h)| h)
|
||||
.try_collect()
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn add_history(&self, history: &[NewHistory]) -> DbResult<()> {
|
||||
let mut tx = self.pool.begin().await.map_err(fix_error)?;
|
||||
|
||||
for i in history {
|
||||
let client_id: &str = &i.client_id;
|
||||
let hostname: &str = &i.hostname;
|
||||
let data: &str = &i.data;
|
||||
|
||||
sqlx::query(
|
||||
"insert into history
|
||||
(client_id, user_id, hostname, timestamp, data)
|
||||
values ($1, $2, $3, $4, $5)
|
||||
on conflict do nothing
|
||||
",
|
||||
)
|
||||
.bind(client_id)
|
||||
.bind(i.user_id)
|
||||
.bind(hostname)
|
||||
.bind(i.timestamp)
|
||||
.bind(data)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
}
|
||||
|
||||
tx.commit().await.map_err(fix_error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn delete_user(&self, u: &User) -> DbResult<()> {
|
||||
sqlx::query("delete from sessions where user_id = $1")
|
||||
.bind(u.id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
sqlx::query("delete from users where id = $1")
|
||||
.bind(u.id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
sqlx::query("delete from history where user_id = $1")
|
||||
.bind(u.id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
sqlx::query("delete from total_history_count_user where user_id = $1")
|
||||
.bind(u.id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn update_user_password(&self, user: &User) -> DbResult<()> {
|
||||
sqlx::query(
|
||||
"update users
|
||||
set password = $1
|
||||
where id = $2",
|
||||
)
|
||||
.bind(&user.password)
|
||||
.bind(user.id)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn add_user(&self, user: &NewUser) -> DbResult<i64> {
|
||||
let email: &str = &user.email;
|
||||
let username: &str = &user.username;
|
||||
let password: &str = &user.password;
|
||||
|
||||
let res: (i64,) = sqlx::query_as(
|
||||
"insert into users
|
||||
(username, email, password)
|
||||
values($1, $2, $3)
|
||||
returning id",
|
||||
)
|
||||
.bind(username)
|
||||
.bind(email)
|
||||
.bind(password)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(res.0)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn add_session(&self, session: &NewSession) -> DbResult<()> {
|
||||
let token: &str = &session.token;
|
||||
|
||||
sqlx::query(
|
||||
"insert into sessions
|
||||
(user_id, token)
|
||||
values($1, $2)",
|
||||
)
|
||||
.bind(session.user_id)
|
||||
.bind(token)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn get_user_session(&self, u: &User) -> DbResult<Session> {
|
||||
sqlx::query_as("select id, user_id, token from sessions where user_id = $1")
|
||||
.bind(u.id)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)
|
||||
.map(|DbSession(session)| session)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn oldest_history(&self, user: &User) -> DbResult<History> {
|
||||
sqlx::query_as(
|
||||
"select id, client_id, user_id, hostname, timestamp, data, created_at from history
|
||||
where user_id = $1
|
||||
order by timestamp asc
|
||||
limit 1",
|
||||
)
|
||||
.bind(user.id)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)
|
||||
.map(|DbHistory(h)| h)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn add_records(&self, user: &User, records: &[Record<EncryptedData>]) -> DbResult<()> {
|
||||
let mut tx = self.pool.begin().await.map_err(fix_error)?;
|
||||
|
||||
for i in records {
|
||||
let id = atuin_common::utils::uuid_v7();
|
||||
|
||||
sqlx::query(
|
||||
"insert into store
|
||||
(id, client_id, host, idx, timestamp, version, tag, data, cek, user_id)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
on conflict do nothing
|
||||
",
|
||||
)
|
||||
.bind(id)
|
||||
.bind(i.id)
|
||||
.bind(i.host.id)
|
||||
.bind(i.idx as i64)
|
||||
.bind(i.timestamp as i64) // throwing away some data, but i64 is still big in terms of time
|
||||
.bind(&i.version)
|
||||
.bind(&i.tag)
|
||||
.bind(&i.data.data)
|
||||
.bind(&i.data.content_encryption_key)
|
||||
.bind(user.id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
}
|
||||
|
||||
tx.commit().await.map_err(fix_error)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn next_records(
|
||||
&self,
|
||||
user: &User,
|
||||
host: HostId,
|
||||
tag: String,
|
||||
start: Option<RecordIdx>,
|
||||
count: u64,
|
||||
) -> DbResult<Vec<Record<EncryptedData>>> {
|
||||
tracing::debug!("{:?} - {:?} - {:?}", host, tag, start);
|
||||
let start = start.unwrap_or(0);
|
||||
|
||||
let records: Result<Vec<DbRecord>, DbError> = sqlx::query_as(
|
||||
"select client_id, host, idx, timestamp, version, tag, data, cek from store
|
||||
where user_id = $1
|
||||
and tag = $2
|
||||
and host = $3
|
||||
and idx >= $4
|
||||
order by idx asc
|
||||
limit $5",
|
||||
)
|
||||
.bind(user.id)
|
||||
.bind(tag.clone())
|
||||
.bind(host)
|
||||
.bind(start as i64)
|
||||
.bind(count as i64)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error);
|
||||
|
||||
let ret = match records {
|
||||
Ok(records) => {
|
||||
let records: Vec<Record<EncryptedData>> = records
|
||||
.into_iter()
|
||||
.map(|f| {
|
||||
let record: Record<EncryptedData> = f.into();
|
||||
record
|
||||
})
|
||||
.collect();
|
||||
|
||||
records
|
||||
}
|
||||
Err(DbError::NotFound) => {
|
||||
tracing::debug!("no records found in store: {:?}/{}", host, tag);
|
||||
return Ok(vec![]);
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn status(&self, user: &User) -> DbResult<RecordStatus> {
|
||||
const STATUS_SQL: &str =
|
||||
"select host, tag, max(idx) from store where user_id = $1 group by host, tag";
|
||||
|
||||
let res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL)
|
||||
.bind(user.id)
|
||||
.fetch_all(&self.pool)
|
||||
.await
|
||||
.map_err(fix_error)?;
|
||||
|
||||
let mut status = RecordStatus::new();
|
||||
|
||||
for i in res {
|
||||
status.set_raw(HostId(i.0), i.1, i.2 as u64);
|
||||
}
|
||||
|
||||
Ok(status)
|
||||
}
|
||||
}
|
||||
|
||||
fn into_utc(x: OffsetDateTime) -> PrimitiveDateTime {
|
||||
let x = x.to_offset(UtcOffset::UTC);
|
||||
PrimitiveDateTime::new(x.date(), x.time())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use time::macros::datetime;
|
||||
|
||||
use crate::into_utc;
|
||||
|
||||
#[test]
|
||||
fn utc() {
|
||||
let dt = datetime!(2023-09-26 15:11:02 +05:30);
|
||||
assert_eq!(into_utc(dt), datetime!(2023-09-26 09:41:02));
|
||||
assert_eq!(into_utc(dt).assume_utc(), dt);
|
||||
|
||||
let dt = datetime!(2023-09-26 15:11:02 -07:00);
|
||||
assert_eq!(into_utc(dt), datetime!(2023-09-26 22:11:02));
|
||||
assert_eq!(into_utc(dt).assume_utc(), dt);
|
||||
|
||||
let dt = datetime!(2023-09-26 15:11:02 +00:00);
|
||||
assert_eq!(into_utc(dt), datetime!(2023-09-26 15:11:02));
|
||||
assert_eq!(into_utc(dt).assume_utc(), dt);
|
||||
}
|
||||
}
|
77
crates/atuin-server-postgres/src/wrappers.rs
Normal file
77
crates/atuin-server-postgres/src/wrappers.rs
Normal file
@ -0,0 +1,77 @@
|
||||
use ::sqlx::{FromRow, Result};
|
||||
use atuin_common::record::{EncryptedData, Host, Record};
|
||||
use atuin_server_database::models::{History, Session, User};
|
||||
use sqlx::{postgres::PgRow, Row};
|
||||
use time::PrimitiveDateTime;
|
||||
|
||||
pub struct DbUser(pub User);
|
||||
pub struct DbSession(pub Session);
|
||||
pub struct DbHistory(pub History);
|
||||
pub struct DbRecord(pub Record<EncryptedData>);
|
||||
|
||||
impl<'a> FromRow<'a, PgRow> for DbUser {
|
||||
fn from_row(row: &'a PgRow) -> Result<Self> {
|
||||
Ok(Self(User {
|
||||
id: row.try_get("id")?,
|
||||
username: row.try_get("username")?,
|
||||
email: row.try_get("email")?,
|
||||
password: row.try_get("password")?,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ::sqlx::FromRow<'a, PgRow> for DbSession {
|
||||
fn from_row(row: &'a PgRow) -> ::sqlx::Result<Self> {
|
||||
Ok(Self(Session {
|
||||
id: row.try_get("id")?,
|
||||
user_id: row.try_get("user_id")?,
|
||||
token: row.try_get("token")?,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ::sqlx::FromRow<'a, PgRow> for DbHistory {
|
||||
fn from_row(row: &'a PgRow) -> ::sqlx::Result<Self> {
|
||||
Ok(Self(History {
|
||||
id: row.try_get("id")?,
|
||||
client_id: row.try_get("client_id")?,
|
||||
user_id: row.try_get("user_id")?,
|
||||
hostname: row.try_get("hostname")?,
|
||||
timestamp: row
|
||||
.try_get::<PrimitiveDateTime, _>("timestamp")?
|
||||
.assume_utc(),
|
||||
data: row.try_get("data")?,
|
||||
created_at: row
|
||||
.try_get::<PrimitiveDateTime, _>("created_at")?
|
||||
.assume_utc(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ::sqlx::FromRow<'a, PgRow> for DbRecord {
|
||||
fn from_row(row: &'a PgRow) -> ::sqlx::Result<Self> {
|
||||
let timestamp: i64 = row.try_get("timestamp")?;
|
||||
let idx: i64 = row.try_get("idx")?;
|
||||
|
||||
let data = EncryptedData {
|
||||
data: row.try_get("data")?,
|
||||
content_encryption_key: row.try_get("cek")?,
|
||||
};
|
||||
|
||||
Ok(Self(Record {
|
||||
id: row.try_get("client_id")?,
|
||||
host: Host::new(row.try_get("host")?),
|
||||
idx: idx as u64,
|
||||
timestamp: timestamp as u64,
|
||||
version: row.try_get("version")?,
|
||||
tag: row.try_get("tag")?,
|
||||
data,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DbRecord> for Record<EncryptedData> {
|
||||
fn from(other: DbRecord) -> Record<EncryptedData> {
|
||||
Record { ..other.0 }
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user