mirror of
https://github.com/atuinsh/atuin.git
synced 2024-11-22 16:23:54 +01:00
wip
This commit is contained in:
parent
59b29eb4aa
commit
4bd191c227
18
Cargo.lock
generated
18
Cargo.lock
generated
@ -57,6 +57,15 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atty"
|
||||
version = "0.2.14"
|
||||
@ -119,7 +128,6 @@ dependencies = [
|
||||
"directories",
|
||||
"eyre",
|
||||
"fs-err",
|
||||
"hex",
|
||||
"interim",
|
||||
"itertools",
|
||||
"lazy_static",
|
||||
@ -134,7 +142,6 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_regex",
|
||||
"sha2",
|
||||
"shellexpand",
|
||||
"sodiumoxide",
|
||||
"sql-builder",
|
||||
@ -150,7 +157,9 @@ name = "atuin-common"
|
||||
version = "13.0.1"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"hex",
|
||||
"serde",
|
||||
"sha2",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
@ -2540,10 +2549,11 @@ checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.2.1"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83"
|
||||
checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79"
|
||||
dependencies = [
|
||||
"atomic",
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
|
@ -12,15 +12,7 @@ repository = "https://github.com/ellie/atuin"
|
||||
|
||||
[features]
|
||||
default = ["sync"]
|
||||
sync = [
|
||||
"urlencoding",
|
||||
"sodiumoxide",
|
||||
"reqwest",
|
||||
"sha2",
|
||||
"hex",
|
||||
"rmp-serde",
|
||||
"base64",
|
||||
]
|
||||
sync = ["urlencoding", "sodiumoxide", "reqwest", "rmp-serde", "base64"]
|
||||
|
||||
[dependencies]
|
||||
atuin-common = { path = "../atuin-common", version = "13.0.1" }
|
||||
@ -60,8 +52,6 @@ reqwest = { version = "0.11", features = [
|
||||
"json",
|
||||
"rustls-tls-native-roots",
|
||||
], default-features = false, optional = true }
|
||||
hex = { version = "0.4", optional = true }
|
||||
sha2 = { version = "0.10", optional = true }
|
||||
rmp-serde = { version = "1.1.1", optional = true }
|
||||
base64 = { version = "0.20.0", optional = true }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
|
@ -1,3 +1,5 @@
|
||||
-- CLIENT
|
||||
|
||||
create table if not exists events (
|
||||
id text primary key,
|
||||
timestamp integer not null,
|
||||
|
14
atuin-client/migrations/20230310184942_alter-events.sql
Normal file
14
atuin-client/migrations/20230310184942_alter-events.sql
Normal file
@ -0,0 +1,14 @@
|
||||
-- CLIENT
|
||||
|
||||
drop table events; -- we will rewrite it anyway
|
||||
|
||||
create table if not exists events (
|
||||
id text primary key,
|
||||
timestamp integer not null,
|
||||
hostname text not null,
|
||||
event_type text not null,
|
||||
|
||||
data blob not null,
|
||||
checksum text not null,
|
||||
previous text not null
|
||||
);
|
@ -130,19 +130,21 @@ impl Sqlite {
|
||||
|
||||
async fn save_event(tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>, e: &Event) -> Result<()> {
|
||||
let event_type = match e.event_type {
|
||||
EventType::Create => "create",
|
||||
EventType::Delete => "delete",
|
||||
EventType::CreateHistory => "create_history",
|
||||
EventType::DeleteHistory => "delete_history",
|
||||
};
|
||||
|
||||
sqlx::query(
|
||||
"insert or ignore into events(id, timestamp, hostname, event_type, history_id)
|
||||
values(?1, ?2, ?3, ?4, ?5)",
|
||||
"insert or ignore into events(id, timestamp, hostname, event_type, data, checksum, previous)
|
||||
values(?1, ?2, ?3, ?4, ?5, ?6, ?7)",
|
||||
)
|
||||
.bind(e.id.as_str())
|
||||
.bind(e.timestamp.timestamp_nanos())
|
||||
.bind(e.hostname.as_str())
|
||||
.bind(event_type)
|
||||
.bind(e.history_id.as_str())
|
||||
.bind(e.data.as_str())
|
||||
.bind(e.checksum.as_str())
|
||||
.bind(e.previous.as_str())
|
||||
.execute(tx)
|
||||
.await?;
|
||||
|
||||
@ -354,6 +356,7 @@ impl Database for Sqlite {
|
||||
// We can think of history as the merged log of events. There should never be more history than
|
||||
// events, and the only time this could happen is if someone is upgrading from an old Atuin version
|
||||
// from before we stored events.
|
||||
|
||||
let history_count = self.history_count().await?;
|
||||
let event_count = self.event_count().await?;
|
||||
|
||||
|
@ -3,7 +3,8 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::history::History;
|
||||
use atuin_common::api::EventType;
|
||||
use atuin_common::utils::uuid_v4;
|
||||
use atuin_common::utils::{hash_bytes, hash_str, uuid_v4};
|
||||
use eyre::Result;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, sqlx::FromRow)]
|
||||
pub struct Event {
|
||||
@ -12,31 +13,40 @@ pub struct Event {
|
||||
pub hostname: String,
|
||||
pub event_type: EventType,
|
||||
|
||||
pub history_id: String,
|
||||
pub data: Vec<u8>,
|
||||
pub previous: String,
|
||||
pub checksum: String,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub fn new_create(history: &History) -> Event {
|
||||
Event {
|
||||
pub fn new_create_history(history: &History, previous: String) -> Result<Event> {
|
||||
let data = rmp_serde::to_vec(history)?;
|
||||
let checksum = hash_bytes(&data);
|
||||
|
||||
Ok(Event {
|
||||
id: uuid_v4(),
|
||||
timestamp: history.timestamp,
|
||||
hostname: history.hostname.clone(),
|
||||
event_type: EventType::Create,
|
||||
event_type: EventType::CreateHistory,
|
||||
|
||||
history_id: history.id.clone(),
|
||||
}
|
||||
data,
|
||||
previous,
|
||||
checksum,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_delete(history_id: &str) -> Event {
|
||||
pub fn new_delete_history(history_id: &str, previous: String) -> Event {
|
||||
let hostname = format!("{}:{}", whoami::hostname(), whoami::username());
|
||||
|
||||
Event {
|
||||
id: uuid_v4(),
|
||||
timestamp: chrono::Utc::now(),
|
||||
hostname,
|
||||
event_type: EventType::Create,
|
||||
event_type: EventType::DeleteHistory,
|
||||
|
||||
history_id: history_id.to_string(),
|
||||
data: history_id.as_bytes().to_owned(),
|
||||
checksum: hash_str(history_id),
|
||||
previous,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,8 @@ pub mod api_client;
|
||||
pub mod encryption;
|
||||
#[cfg(feature = "sync")]
|
||||
pub mod sync;
|
||||
#[cfg(feature = "sync")]
|
||||
pub mod sync_event;
|
||||
|
||||
pub mod database;
|
||||
pub mod event;
|
||||
|
@ -3,7 +3,7 @@ use std::convert::TryInto;
|
||||
use chrono::prelude::*;
|
||||
use eyre::Result;
|
||||
|
||||
use atuin_common::api::AddHistoryRequest;
|
||||
use atuin_common::{api::AddHistoryRequest, utils::hash_str};
|
||||
|
||||
use crate::{
|
||||
api_client,
|
||||
@ -12,13 +12,6 @@ use crate::{
|
||||
settings::{Settings, HISTORY_PAGE_SIZE},
|
||||
};
|
||||
|
||||
pub fn hash_str(string: &str) -> String {
|
||||
use sha2::{Digest, Sha256};
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(string.as_bytes());
|
||||
hex::encode(hasher.finalize())
|
||||
}
|
||||
|
||||
// Currently sync is kinda naive, and basically just pages backwards through
|
||||
// history. This means newly added stuff shows up properly! We also just use
|
||||
// the total count in each database to indicate whether a sync is needed.
|
||||
|
153
atuin-client/src/sync_event.rs
Normal file
153
atuin-client/src/sync_event.rs
Normal file
@ -0,0 +1,153 @@
|
||||
use std::convert::TryInto;
|
||||
|
||||
use chrono::prelude::*;
|
||||
use eyre::Result;
|
||||
|
||||
use atuin_common::api::AddHistoryRequest;
|
||||
|
||||
use crate::{
|
||||
api_client,
|
||||
database::Database,
|
||||
encryption::{encrypt, load_encoded_key, load_key},
|
||||
settings::{Settings, HISTORY_PAGE_SIZE},
|
||||
};
|
||||
|
||||
// Currently sync is kinda naive, and basically just pages backwards through
|
||||
// history. This means newly added stuff shows up properly! We also just use
|
||||
// the total count in each database to indicate whether a sync is needed.
|
||||
// I think this could be massively improved! If we had a way of easily
|
||||
// indicating count per time period (hour, day, week, year, etc) then we can
|
||||
// easily pinpoint where we are missing data and what needs downloading. Start
|
||||
// with year, then find the week, then the day, then the hour, then download it
|
||||
// all! The current naive approach will do for now.
|
||||
|
||||
// Check if remote has things we don't, and if so, download them.
|
||||
// Returns (num downloaded, total local)
|
||||
async fn sync_download(
|
||||
force: bool,
|
||||
client: &api_client::Client<'_>,
|
||||
db: &mut (impl Database + Send),
|
||||
) -> Result<(i64, i64)> {
|
||||
debug!("starting sync download");
|
||||
|
||||
let remote_count = client.count().await?;
|
||||
|
||||
let initial_local = db.event_count().await?;
|
||||
let mut local_count = initial_local;
|
||||
|
||||
let mut last_sync = if force {
|
||||
Utc.timestamp_millis(0)
|
||||
} else {
|
||||
Settings::last_sync()?
|
||||
};
|
||||
|
||||
let mut last_timestamp = Utc.timestamp_millis(0);
|
||||
|
||||
let host = if force { Some(String::from("")) } else { None };
|
||||
|
||||
while remote_count > local_count {
|
||||
let page = client
|
||||
.get_event(last_sync, last_timestamp, host.clone())
|
||||
.await?;
|
||||
|
||||
db.save_bulk(&page).await?;
|
||||
|
||||
local_count = db.history_count().await?;
|
||||
|
||||
if page.len() < HISTORY_PAGE_SIZE.try_into().unwrap() {
|
||||
break;
|
||||
}
|
||||
|
||||
let page_last = page
|
||||
.last()
|
||||
.expect("could not get last element of page")
|
||||
.timestamp;
|
||||
|
||||
// in the case of a small sync frequency, it's possible for history to
|
||||
// be "lost" between syncs. In this case we need to rewind the sync
|
||||
// timestamps
|
||||
if page_last == last_timestamp {
|
||||
last_timestamp = Utc.timestamp_millis(0);
|
||||
last_sync -= chrono::Duration::hours(1);
|
||||
} else {
|
||||
last_timestamp = page_last;
|
||||
}
|
||||
}
|
||||
|
||||
Ok((local_count - initial_local, local_count))
|
||||
}
|
||||
|
||||
// Check if we have things remote doesn't, and if so, upload them
|
||||
async fn sync_upload(
|
||||
settings: &Settings,
|
||||
_force: bool,
|
||||
client: &api_client::Client<'_>,
|
||||
db: &mut (impl Database + Send),
|
||||
) -> Result<()> {
|
||||
debug!("starting sync upload");
|
||||
|
||||
let initial_remote_count = client.count().await?;
|
||||
let mut remote_count = initial_remote_count;
|
||||
|
||||
let local_count = db.history_count().await?;
|
||||
|
||||
debug!("remote has {}, we have {}", remote_count, local_count);
|
||||
|
||||
let key = load_key(settings)?; // encryption key
|
||||
|
||||
// first just try the most recent set
|
||||
|
||||
let mut cursor = Utc::now();
|
||||
|
||||
while local_count > remote_count {
|
||||
let last = db.before(cursor, HISTORY_PAGE_SIZE).await?;
|
||||
let mut buffer = Vec::new();
|
||||
|
||||
if last.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for i in last {
|
||||
let data = encrypt(&i, &key)?;
|
||||
let data = serde_json::to_string(&data)?;
|
||||
|
||||
let add_hist = AddHistoryRequest {
|
||||
id: i.id,
|
||||
timestamp: i.timestamp,
|
||||
data,
|
||||
hostname: hash_str(&i.hostname),
|
||||
};
|
||||
|
||||
buffer.push(add_hist);
|
||||
}
|
||||
|
||||
// anything left over outside of the 100 block size
|
||||
client.post_history(&buffer).await?;
|
||||
cursor = buffer.last().unwrap().timestamp;
|
||||
remote_count = client.count().await?;
|
||||
|
||||
debug!("upload cursor: {:?}", cursor);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
|
||||
db.merge_events().await?;
|
||||
|
||||
let client = api_client::Client::new(
|
||||
&settings.sync_address,
|
||||
&settings.session_token,
|
||||
load_encoded_key(settings)?,
|
||||
)?;
|
||||
|
||||
sync_upload(settings, force, &client, db).await?;
|
||||
|
||||
let download = sync_download(force, &client, db).await?;
|
||||
|
||||
debug!("sync downloaded {}", download.0);
|
||||
|
||||
Settings::save_sync_time()?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -13,4 +13,6 @@ repository = "https://github.com/ellie/atuin"
|
||||
[dependencies]
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
serde = { version = "1.0.145", features = ["derive"] }
|
||||
uuid = { version = "1.2", features = ["v4"] }
|
||||
uuid = { version = "1.3.0", features = ["v7"] }
|
||||
sha2 = { version = "0.10" }
|
||||
hex = { version = "0.4" }
|
||||
|
@ -89,8 +89,8 @@ pub struct SyncEventResponse {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum EventType {
|
||||
Create,
|
||||
Delete,
|
||||
CreateHistory,
|
||||
DeleteHistory,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for EventType {
|
||||
|
@ -0,0 +1 @@
|
||||
-- Add migration script here
|
Loading…
Reference in New Issue
Block a user