mirror of
https://github.com/atuinsh/atuin.git
synced 2025-06-20 18:07:57 +02:00
fix: add incremental rebuild to daemon loop (#2010)
This commit is contained in:
parent
7067720057
commit
0da534d524
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -322,6 +322,8 @@ name = "atuin-daemon"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atuin-client",
|
"atuin-client",
|
||||||
|
"atuin-dotfiles",
|
||||||
|
"atuin-history",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
"eyre",
|
"eyre",
|
||||||
"prost",
|
"prost",
|
||||||
|
@ -120,7 +120,7 @@ pub trait Database: Send + Sync + 'static {
|
|||||||
|
|
||||||
// Intended for use on a developer machine and not a sync server.
|
// Intended for use on a developer machine and not a sync server.
|
||||||
// TODO: implement IntoIterator
|
// TODO: implement IntoIterator
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Sqlite {
|
pub struct Sqlite {
|
||||||
pub pool: SqlitePool,
|
pub pool: SqlitePool,
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@ use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, Record
|
|||||||
|
|
||||||
use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
|
use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct HistoryStore {
|
pub struct HistoryStore {
|
||||||
pub store: SqliteStore,
|
pub store: SqliteStore,
|
||||||
pub host_id: HostId,
|
pub host_id: HostId,
|
||||||
|
@ -13,6 +13,8 @@ readme.workspace = true
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
atuin-client = { path = "../atuin-client", version = "18.0.1" }
|
atuin-client = { path = "../atuin-client", version = "18.0.1" }
|
||||||
|
atuin-dotfiles = { path = "../atuin-dotfiles", version = "0.2.0" }
|
||||||
|
atuin-history = { path = "../atuin-history", version = "0.1.0" }
|
||||||
|
|
||||||
time = { workspace = true }
|
time = { workspace = true }
|
||||||
uuid = { workspace = true }
|
uuid = { workspace = true }
|
||||||
|
@ -166,7 +166,7 @@ pub async fn listen(
|
|||||||
let host_id = Settings::host_id().expect("failed to get host_id");
|
let host_id = Settings::host_id().expect("failed to get host_id");
|
||||||
let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
|
let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
|
||||||
|
|
||||||
let history = HistoryService::new(history_store, history_db);
|
let history = HistoryService::new(history_store.clone(), history_db.clone());
|
||||||
|
|
||||||
let socket = settings.daemon.socket_path.clone();
|
let socket = settings.daemon.socket_path.clone();
|
||||||
let uds = UnixListener::bind(socket.clone())?;
|
let uds = UnixListener::bind(socket.clone())?;
|
||||||
@ -175,7 +175,12 @@ pub async fn listen(
|
|||||||
tracing::info!("listening on unix socket {:?}", socket);
|
tracing::info!("listening on unix socket {:?}", socket);
|
||||||
|
|
||||||
// start services
|
// start services
|
||||||
tokio::spawn(sync::worker(settings.clone(), store));
|
tokio::spawn(sync::worker(
|
||||||
|
settings.clone(),
|
||||||
|
store,
|
||||||
|
history_store,
|
||||||
|
history_db,
|
||||||
|
));
|
||||||
|
|
||||||
Server::builder()
|
Server::builder()
|
||||||
.add_service(HistoryServer::new(history))
|
.add_service(HistoryServer::new(history))
|
||||||
|
@ -2,14 +2,32 @@ use eyre::Result;
|
|||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use tokio::time::{self, MissedTickBehavior};
|
use tokio::time::{self, MissedTickBehavior};
|
||||||
|
|
||||||
|
use atuin_client::database::Sqlite as HistoryDatabase;
|
||||||
use atuin_client::{
|
use atuin_client::{
|
||||||
|
encryption,
|
||||||
|
history::store::HistoryStore,
|
||||||
record::{sqlite_store::SqliteStore, sync},
|
record::{sqlite_store::SqliteStore, sync},
|
||||||
settings::Settings,
|
settings::Settings,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn worker(settings: Settings, store: SqliteStore) -> Result<()> {
|
use atuin_dotfiles::store::{var::VarStore, AliasStore};
|
||||||
|
|
||||||
|
pub async fn worker(
|
||||||
|
settings: Settings,
|
||||||
|
store: SqliteStore,
|
||||||
|
history_store: HistoryStore,
|
||||||
|
history_db: HistoryDatabase,
|
||||||
|
) -> Result<()> {
|
||||||
tracing::info!("booting sync worker");
|
tracing::info!("booting sync worker");
|
||||||
|
|
||||||
|
let encryption_key: [u8; 32] = encryption::load_key(&settings)?.into();
|
||||||
|
let host_id = Settings::host_id().expect("failed to get host_id");
|
||||||
|
let alias_store = AliasStore::new(store.clone(), host_id, encryption_key);
|
||||||
|
let var_store = VarStore::new(store.clone(), host_id, encryption_key);
|
||||||
|
|
||||||
|
// Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
|
||||||
|
let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
|
||||||
|
|
||||||
let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
|
let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
|
||||||
|
|
||||||
// IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
|
// IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
|
||||||
@ -24,13 +42,13 @@ pub async fn worker(settings: Settings, store: SqliteStore) -> Result<()> {
|
|||||||
|
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
tracing::error!("sync tick failed with {e}");
|
tracing::error!("sync tick failed with {e}");
|
||||||
|
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
|
|
||||||
let new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
|
let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
|
||||||
|
|
||||||
// Don't backoff by more than 30 mins
|
if new_interval > max_interval {
|
||||||
if new_interval > 60.0 * 30.0 {
|
new_interval = max_interval;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ticker = time::interval(time::Duration::from_secs(new_interval as u64));
|
ticker = time::interval(time::Duration::from_secs(new_interval as u64));
|
||||||
@ -46,6 +64,13 @@ pub async fn worker(settings: Settings, store: SqliteStore) -> Result<()> {
|
|||||||
"sync complete"
|
"sync complete"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
history_store
|
||||||
|
.incremental_build(&history_db, &downloaded)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
alias_store.build().await?;
|
||||||
|
var_store.build().await?;
|
||||||
|
|
||||||
// Reset backoff on success
|
// Reset backoff on success
|
||||||
if ticker.period().as_secs() != settings.daemon.sync_frequency {
|
if ticker.period().as_secs() != settings.daemon.sync_frequency {
|
||||||
ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
|
ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user