diff --git a/storage/store/database/database.go b/storage/store/database/database.go index c5343fe4..de1b0989 100644 --- a/storage/store/database/database.go +++ b/storage/store/database/database.go @@ -18,6 +18,12 @@ import ( const ( arraySeparator = "|~|" + + uptimeCleanUpThreshold = 10 * 24 * time.Hour // Maximum uptime age before triggering a clean up + eventsCleanUpThreshold = core.MaximumNumberOfEvents * 2 // Maximum number of events before triggering a clean up + resultsCleanUpThreshold = core.MaximumNumberOfResults * 2 // Maximum number of results before triggering a clean up + + uptimeRetention = 7 * 24 * time.Hour ) var ( @@ -228,16 +234,14 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", core.EventStart, service.Group, service.Name, err.Error()) } event := generateEventBasedOnResult(result) - err = s.insertEvent(tx, serviceID, event) - if err != nil { + if err = s.insertEvent(tx, serviceID, event); err != nil { // Silently fail log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) } } else { // Get the success value of the previous result var lastResultSuccess bool - lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID) - if err != nil { + if lastResultSuccess, err = s.getLastServiceResultSuccessValue(tx, serviceID); err != nil { log.Printf("[database][Insert] Failed to retrieve outcome of previous result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) } else { // If we managed to retrieve the outcome of the previous result, we'll compare it with the new result. 
@@ -246,8 +250,7 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { // an event to mark the change in state if lastResultSuccess != result.Success { event := generateEventBasedOnResult(result) - err = s.insertEvent(tx, serviceID, event) - if err != nil { + if err = s.insertEvent(tx, serviceID, event); err != nil { // Silently fail log.Printf("[database][Insert] Failed to insert event=%s for group=%s; service=%s: %s", event.Type, service.Group, service.Name, err.Error()) } @@ -256,29 +259,43 @@ func (s *Store) Insert(service *core.Service, result *core.Result) { // Clean up old events if there's more than twice the maximum number of events // This lets us both keep the table clean without impacting performance too much // (since we're only deleting MaximumNumberOfEvents at a time instead of 1) - if numberOfEvents > core.MaximumNumberOfEvents*2 { - err = s.deleteOldServiceEvents(tx, serviceID) - if err != nil { + if numberOfEvents > eventsCleanUpThreshold { + if err = s.deleteOldServiceEvents(tx, serviceID); err != nil { log.Printf("[database][Insert] Failed to delete old events for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) } } } // Second, we need to insert the result. 
- err = s.insertResult(tx, serviceID, result) - if err != nil { + if err = s.insertResult(tx, serviceID, result); err != nil { log.Printf("[database][Insert] Failed to insert result for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) - _ = tx.Rollback() + _ = tx.Rollback() // If we can't insert the result, we'll rollback now since there's no point continuing return } // Clean up old results numberOfResults, err := s.getNumberOfResultsByServiceID(tx, serviceID) if err != nil { log.Printf("[database][Insert] Failed to retrieve total number of results for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } else { + if numberOfResults > resultsCleanUpThreshold { + if err = s.deleteOldServiceResults(tx, serviceID); err != nil { + log.Printf("[database][Insert] Failed to delete old results for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } + } } - if numberOfResults > core.MaximumNumberOfResults*2 { - err = s.deleteOldServiceResults(tx, serviceID) - if err != nil { - log.Printf("[database][Insert] Failed to delete old results for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + // Finally, we need to insert the uptime data. 
+ // Because the uptime data significantly outlives the results, we can't rely on the results for determining the uptime + if err = s.updateServiceUptime(tx, serviceID, result); err != nil { + log.Printf("[database][Insert] Failed to update uptime for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } + // Clean up old uptime entries + ageOfOldestUptimeEntry, err := s.getAgeOfOldestServiceUptimeEntry(tx, serviceID) + if err != nil { + log.Printf("[database][Insert] Failed to retrieve oldest service uptime entry for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } else { + if ageOfOldestUptimeEntry > uptimeCleanUpThreshold { + if err = s.deleteOldUptimeEntries(tx, serviceID, time.Now().Add(-(uptimeRetention + time.Hour))); err != nil { + log.Printf("[database][Insert] Failed to delete old uptime entries for group=%s; service=%s: %s", service.Group, service.Name, err.Error()) + } } } //log.Printf("[database][Insert] Successfully inserted result in duration=%dns", time.Since(start).Nanoseconds()) @@ -381,6 +398,33 @@ func (s *Store) insertConditionResults(tx *sql.Tx, serviceResultID int64, condit return nil } +func (s *Store) updateServiceUptime(tx *sql.Tx, serviceID int64, result *core.Result) error { + unixTimestampFlooredAtHour := result.Timestamp.Truncate(time.Hour).Unix() + var successfulExecutions int + if result.Success { + successfulExecutions = 1 + } + _, err := tx.Exec( + ` + INSERT INTO service_uptime (service_id, hour_unix_timestamp, total_executions, successful_executions, total_response_time) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT(service_id, hour_unix_timestamp) DO UPDATE SET + total_executions = excluded.total_executions + total_executions, + successful_executions = excluded.successful_executions + successful_executions, + total_response_time = excluded.total_response_time + total_response_time + `, + serviceID, + unixTimestampFlooredAtHour, + 1, + successfulExecutions, + 
result.Duration.Milliseconds(), + ) + if err != nil { + return err + } + return nil +} + func (s *Store) getAllServiceKeys(tx *sql.Tx) (keys []string, err error) { rows, err := tx.Query("SELECT service_key FROM service") if err != nil { @@ -404,7 +448,7 @@ func (s *Store) getServiceStatusByKey(tx *sql.Tx, key string, eventsPage, events Name: serviceName, Group: serviceGroup, Key: key, - Uptime: nil, + Uptime: &core.Uptime{}, } if eventsPageSize > 0 { if serviceStatus.Events, err = s.getEventsByServiceID(tx, serviceID, eventsPage, eventsPageSize); err != nil { @@ -417,7 +461,10 @@ func (s *Store) getServiceStatusByKey(tx *sql.Tx, key string, eventsPage, events } } if includeUptime { - // TODO + now := time.Now() + serviceStatus.Uptime.LastHour, _, _ = s.getServiceUptime(tx, serviceID, now.Add(-time.Hour), now) + serviceStatus.Uptime.LastTwentyFourHours, _, _ = s.getServiceUptime(tx, serviceID, now.Add(-24*time.Hour), now) + serviceStatus.Uptime.LastSevenDays, _, _ = s.getServiceUptime(tx, serviceID, now.Add(-7*24*time.Hour), now) } return serviceStatus, nil } @@ -512,6 +559,35 @@ func (s *Store) getResultsByServiceID(tx *sql.Tx, serviceID int64, page, pageSiz return } +func (s *Store) getServiceUptime(tx *sql.Tx, serviceID int64, from, to time.Time) (uptime float64, avgResponseTime time.Duration, err error) { + rows, err := tx.Query( + ` + SELECT SUM(total_executions), SUM(successful_executions), SUM(total_response_time) + FROM service_uptime + WHERE service_id = $1 + AND hour_unix_timestamp >= $2 + AND hour_unix_timestamp <= $3 + `, + serviceID, + from.Unix(), + to.Unix(), + ) + if err != nil { + return 0, 0, err + } + var totalExecutions, totalSuccessfulExecutions, totalResponseTime int + for rows.Next() { + _ = rows.Scan(&totalExecutions, &totalSuccessfulExecutions, &totalResponseTime) + break + } + _ = rows.Close() + if totalExecutions > 0 { + uptime = float64(totalSuccessfulExecutions) / float64(totalExecutions) + avgResponseTime = 
time.Duration(float64(totalResponseTime)/float64(totalExecutions)) * time.Millisecond + } + return +} + func (s *Store) getServiceID(tx *sql.Tx, service *core.Service) (int64, error) { rows, err := tx.Query("SELECT service_id FROM service WHERE service_key = $1", service.Key()) if err != nil { @@ -569,6 +645,34 @@ func (s *Store) getNumberOfResultsByServiceID(tx *sql.Tx, serviceID int64) (int6 return numberOfResults, nil } +func (s *Store) getAgeOfOldestServiceUptimeEntry(tx *sql.Tx, serviceID int64) (time.Duration, error) { + rows, err := tx.Query( + ` + SELECT hour_unix_timestamp + FROM service_uptime + WHERE service_id = $1 + ORDER BY hour_unix_timestamp + LIMIT 1 + `, + serviceID, + ) + if err != nil { + return 0, err + } + var oldestServiceUptimeUnixTimestamp int64 + var found bool + for rows.Next() { + _ = rows.Scan(&oldestServiceUptimeUnixTimestamp) + found = true + break + } + _ = rows.Close() + if !found { + return 0, errNoRowsReturned + } + return time.Since(time.Unix(oldestServiceUptimeUnixTimestamp, 0)), nil +} + func (s *Store) getLastServiceResultSuccessValue(tx *sql.Tx, serviceID int64) (bool, error) { rows, err := tx.Query("SELECT success FROM service_result WHERE service_id = $1 ORDER BY service_result_id DESC LIMIT 1", serviceID) if err != nil { @@ -637,3 +741,13 @@ func (s *Store) deleteOldServiceResults(tx *sql.Tx, serviceID int64) error { //log.Printf("deleted %d rows from service_result", rowsAffected) return nil } + +func (s *Store) deleteOldUptimeEntries(tx *sql.Tx, serviceID int64, maxAge time.Time) error { + _, err := tx.Exec("DELETE FROM service_uptime WHERE service_id = $1 AND hour_unix_timestamp < $2", serviceID, maxAge.Unix()) + //if err != nil { + // return err + //} + //rowsAffected, _ := result.RowsAffected() + //log.Printf("deleted %d rows from service_uptime", rowsAffected) + return err +} diff --git a/storage/store/database/database_test.go b/storage/store/database/database_test.go index 7945e7d1..fe4f2a3b 100644 --- 
a/storage/store/database/database_test.go +++ b/storage/store/database/database_test.go @@ -145,3 +145,153 @@ func TestStore_Insert(t *testing.T) { } } } + +func TestStore_GetServiceStatus(t *testing.T) { + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_GetServiceStatus.db") + defer store.db.Close() + store.Insert(&testService, &testSuccessfulResult) + store.Insert(&testService, &testUnsuccessfulResult) + + serviceStatus := store.GetServiceStatus(testService.Group, testService.Name) + if serviceStatus == nil { + t.Fatalf("serviceStatus shouldn't have been nil") + } + if serviceStatus.Uptime == nil { + t.Fatalf("serviceStatus.Uptime shouldn't have been nil") + } + if serviceStatus.Uptime.LastHour != 0.5 { + t.Errorf("serviceStatus.Uptime.LastHour should've been 0.5") + } + if serviceStatus.Uptime.LastTwentyFourHours != 0.5 { + t.Errorf("serviceStatus.Uptime.LastTwentyFourHours should've been 0.5") + } + if serviceStatus.Uptime.LastSevenDays != 0.5 { + t.Errorf("serviceStatus.Uptime.LastSevenDays should've been 0.5") + } +} + +func TestStore_GetServiceStatusForMissingStatusReturnsNil(t *testing.T) { + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_GetServiceStatusForMissingStatusReturnsNil.db") + defer store.db.Close() + store.Insert(&testService, &testSuccessfulResult) + + serviceStatus := store.GetServiceStatus("nonexistantgroup", "nonexistantname") + if serviceStatus != nil { + t.Errorf("Returned service status for group '%s' and name '%s' not nil after inserting the service into the store", "nonexistantgroup", "nonexistantname") + } + serviceStatus = store.GetServiceStatus(testService.Group, "nonexistantname") + if serviceStatus != nil { + t.Errorf("Returned service status for group '%s' and name '%s' not nil after inserting the service into the store", testService.Group, "nonexistantname") + } + serviceStatus = store.GetServiceStatus("nonexistantgroup", testService.Name) + if serviceStatus != nil { + t.Errorf("Returned service status for group '%s' 
and name '%s' not nil after inserting the service into the store", "nonexistantgroup", testService.Name) + } +} + +func TestStore_GetServiceStatusByKey(t *testing.T) { + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_GetServiceStatusByKey.db") + defer store.db.Close() + store.Insert(&testService, &testSuccessfulResult) + store.Insert(&testService, &testUnsuccessfulResult) + + serviceStatus := store.GetServiceStatusByKey(testService.Key()) + if serviceStatus == nil { + t.Fatalf("serviceStatus shouldn't have been nil") + } + if serviceStatus.Uptime == nil { + t.Fatalf("serviceStatus.Uptime shouldn't have been nil") + } + if serviceStatus.Uptime.LastHour != 0.5 { + t.Errorf("serviceStatus.Uptime.LastHour should've been 0.5") + } + if serviceStatus.Uptime.LastTwentyFourHours != 0.5 { + t.Errorf("serviceStatus.Uptime.LastTwentyFourHours should've been 0.5") + } + if serviceStatus.Uptime.LastSevenDays != 0.5 { + t.Errorf("serviceStatus.Uptime.LastSevenDays should've been 0.5") + } +} + +func TestStore_GetAllServiceStatusesWithResultPagination(t *testing.T) { + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_GetAllServiceStatusesWithResultPagination.db") + defer store.db.Close() + firstResult := &testSuccessfulResult + secondResult := &testUnsuccessfulResult + store.Insert(&testService, firstResult) + store.Insert(&testService, secondResult) + // Can't be bothered dealing with timezone issues on the worker that runs the automated tests + firstResult.Timestamp = time.Time{} + secondResult.Timestamp = time.Time{} + serviceStatuses := store.GetAllServiceStatusesWithResultPagination(1, 20) + if len(serviceStatuses) != 1 { + t.Fatal("expected 1 service status") + } + actual, exists := serviceStatuses[testService.Key()] + if !exists { + t.Fatal("expected service status to exist") + } + if len(actual.Results) != 2 { + t.Error("expected 2 results, got", len(actual.Results)) + } + if len(actual.Events) != 0 { + t.Error("expected 0 events, got", len(actual.Events)) + 
} +} + +func TestStore_InsertCleansUpOldUptimeEntriesProperly(t *testing.T) { + store, _ := NewStore("sqlite", t.TempDir()+"/TestStore_InsertCleansUpOldUptimeEntriesProperly.db") + defer store.db.Close() + now := time.Now().UTC() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + + store.Insert(&testService, &core.Result{Timestamp: now.Add(-5 * time.Hour), Success: true}) + + tx, _ := store.db.Begin() + oldest, _ := store.getAgeOfOldestServiceUptimeEntry(tx, 1) + _ = tx.Commit() + if oldest.Truncate(time.Hour) != 5*time.Hour { + t.Errorf("oldest service uptime entry should've been ~5 hours old, was %s", oldest) + } + + // The oldest cache entry should remain at ~5 hours old, because this entry is more recent + store.Insert(&testService, &core.Result{Timestamp: now.Add(-3 * time.Hour), Success: true}) + + tx, _ = store.db.Begin() + oldest, _ = store.getAgeOfOldestServiceUptimeEntry(tx, 1) + _ = tx.Commit() + if oldest.Truncate(time.Hour) != 5*time.Hour { + t.Errorf("oldest service uptime entry should've been ~5 hours old, was %s", oldest) + } + + // The oldest cache entry should now become at ~8 hours old, because this entry is older + store.Insert(&testService, &core.Result{Timestamp: now.Add(-8 * time.Hour), Success: true}) + + tx, _ = store.db.Begin() + oldest, _ = store.getAgeOfOldestServiceUptimeEntry(tx, 1) + _ = tx.Commit() + if oldest.Truncate(time.Hour) != 8*time.Hour { + t.Errorf("oldest service uptime entry should've been ~8 hours old, was %s", oldest) + } + + // Since this is one hour before reaching the clean up threshold, the oldest entry should now be this one + store.Insert(&testService, &core.Result{Timestamp: now.Add(-(uptimeCleanUpThreshold - time.Hour)), Success: true}) + + tx, _ = store.db.Begin() + oldest, _ = store.getAgeOfOldestServiceUptimeEntry(tx, 1) + _ = tx.Commit() + if oldest.Truncate(time.Hour) != uptimeCleanUpThreshold-time.Hour { + t.Errorf("oldest service uptime entry should've 
been ~%s hours old, was %s", uptimeCleanUpThreshold-time.Hour, oldest) + } + + // Since this entry is after the uptimeCleanUpThreshold, both this entry as well as the previous + // one should be deleted since they both surpass uptimeRetention + store.Insert(&testService, &core.Result{Timestamp: now.Add(-(uptimeCleanUpThreshold + time.Hour)), Success: true}) + + tx, _ = store.db.Begin() + oldest, _ = store.getAgeOfOldestServiceUptimeEntry(tx, 1) + _ = tx.Commit() + if oldest.Truncate(time.Hour) != 8*time.Hour { + t.Errorf("oldest service uptime entry should've been ~8 hours old, was %s", oldest) + } +}