From 06a4bf0bcbcf3ec4ce367c9e764a080eef078f94 Mon Sep 17 00:00:00 2001 From: David Dworken Date: Sat, 17 Dec 2022 21:27:00 -0800 Subject: [PATCH] Add code to deep clean the DB to remove entries from people with 1 device that haven't been active in at least 90 days --- backend/server/server.go | 113 +++++++++++++++++++++++++--------- backend/server/server_test.go | 22 +++++++ 2 files changed, 105 insertions(+), 30 deletions(-) diff --git a/backend/server/server.go b/backend/server/server.go index e5ee37a..b9659de 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -549,11 +549,11 @@ func init() { func cron(ctx context.Context) error { err := updateReleaseVersion() if err != nil { - fmt.Println(err) + panic(err) } err = cleanDatabase(ctx) if err != nil { - fmt.Println(err) + panic(err) } return nil } @@ -802,39 +802,92 @@ func byteCountToString(b int) string { } func cleanDatabase(ctx context.Context) error { - checkGormResult(GLOBAL_DB.WithContext(ctx).Exec("DELETE FROM enc_history_entries WHERE read_count > 10")) - checkGormResult(GLOBAL_DB.WithContext(ctx).Exec("DELETE FROM deletion_requests WHERE read_count > 100")) - // TODO(optimization): Clean the database by deleting entries for users that haven't been used in X amount of time + r := GLOBAL_DB.WithContext(ctx).Exec("DELETE FROM enc_history_entries WHERE read_count > 10") + if r.Error != nil { + return r.Error + } + r = GLOBAL_DB.WithContext(ctx).Exec("DELETE FROM deletion_requests WHERE read_count > 100") + if r.Error != nil { + return r.Error + } return nil } +func deepCleanDatabase(ctx context.Context) { + err := GLOBAL_DB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + r := tx.Exec(` + CREATE TEMP TABLE temp_users_with_one_device AS ( + SELECT user_id + FROM devices + GROUP BY user_id + HAVING COUNT(DISTINCT device_id) > 1 + ) + `) + if r.Error != nil { + return r.Error + } + r = tx.Exec(` + CREATE TEMP TABLE temp_inactive_users AS ( + SELECT user_id + FROM usage_data + WHERE last_used <= (now() - INTERVAL '90 days') + ) + `) + if r.Error != nil { + return r.Error + } + r = tx.Exec(` + SELECT COUNT(*) FROM enc_history_entries WHERE + date <= (now() - INTERVAL '90 days') + AND user_id IN (SELECT * FROM temp_users_with_one_device) + AND user_id IN (SELECT * FROM temp_inactive_users) + `) + if r.Error != nil { + return r.Error + } + fmt.Printf("Ran deep clean and deleted %d rows\n", r.RowsAffected) + return nil + }) + if err != nil { + panic(fmt.Errorf("failed to deep clean DB: %v", err)) + } +} + +func configureObservability() func() { + err := profiler.Start( + profiler.WithService("hishtory-api"), + profiler.WithVersion(ReleaseVersion), + profiler.WithAPIKey(os.Getenv("DD_API_KEY")), + profiler.WithUDS("/var/run/datadog/apm.socket"), + profiler.WithProfileTypes( + profiler.CPUProfile, + profiler.HeapProfile, + ), + ) + if err != nil { + fmt.Printf("Failed to start DataDog profiler: %v\n", err) + } + tracer.Start( + tracer.WithRuntimeMetrics(), + tracer.WithService("hishtory-api"), + tracer.WithUDS("/var/run/datadog/apm.socket"), + ) + defer tracer.Stop() + ddStats, err := statsd.New("unix:///var/run/datadog/dsd.socket") + if err != nil { + fmt.Printf("Failed to start DataDog statsd: %v\n", err) + } + GLOBAL_STATSD = ddStats + return func() { + profiler.Stop() + tracer.Stop() + } +} + func main() { if isProductionEnvironment() { - err := profiler.Start( - profiler.WithService("hishtory-api"), - profiler.WithVersion(ReleaseVersion), - profiler.WithAPIKey(os.Getenv("DD_API_KEY")), - profiler.WithUDS("/var/run/datadog/apm.socket"), - profiler.WithProfileTypes( - profiler.CPUProfile, - profiler.HeapProfile, - ), - ) - if err != nil { - fmt.Printf("Failed to start DataDog profiler: %v\n", err) - } - defer profiler.Stop() - tracer.Start( - tracer.WithRuntimeMetrics(), - tracer.WithService("hishtory-api"), - tracer.WithUDS("/var/run/datadog/apm.socket"), - ) - defer tracer.Stop() - ddStats, err := statsd.New("unix:///var/run/datadog/dsd.socket") - if err != nil { - fmt.Printf("Failed to start DataDog statsd: %v\n", err) - } - GLOBAL_STATSD = ddStats + go configureObservability()() + go deepCleanDatabase(context.Background()) } mux := httptrace.NewServeMux() diff --git a/backend/server/server_test.go b/backend/server/server_test.go index a679612..c9a0128 100644 --- a/backend/server/server_test.go +++ b/backend/server/server_test.go @@ -545,6 +545,28 @@ func TestLimitRegistrations(t *testing.T) { t.Errorf("expected panic") } +func TestCleanDatabaseNoErrors(t *testing.T) { + // Init + InitDB() + + // Create a user and an entry + userId := data.UserId("dkey") + devId1 := uuid.Must(uuid.NewRandom()).String() + deviceReq := httptest.NewRequest(http.MethodGet, "/?device_id="+devId1+"&user_id="+userId, nil) + apiRegisterHandler(context.Background(), nil, deviceReq) + entry1 := testutils.MakeFakeHistoryEntry("ls ~/") + entry1.DeviceId = devId1 + encEntry, err := data.EncryptHistoryEntry("dkey", entry1) + testutils.Check(t, err) + reqBody, err := json.Marshal([]shared.EncHistoryEntry{encEntry}) + testutils.Check(t, err) + submitReq := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(reqBody)) + apiSubmitHandler(context.Background(), nil, submitReq) + + // Call cleanDatabase and just check that there are no panics + testutils.Check(t, cleanDatabase(context.TODO())) +} + func assertNoLeakedConnections(t *testing.T, db *gorm.DB) { sqlDB, err := db.DB() if err != nil {