2021-09-11 00:00:04 +02:00
package sql
2021-07-12 06:56:30 +02:00
import (
"database/sql"
"errors"
2021-07-16 04:07:30 +02:00
"fmt"
2021-07-12 06:56:30 +02:00
"log"
2022-06-14 01:15:30 +02:00
"strconv"
2021-07-12 10:25:25 +02:00
"strings"
"time"
2021-07-12 06:56:30 +02:00
2024-05-16 03:29:45 +02:00
"github.com/TwiN/gatus/v5/alerting/alert"
2024-05-10 04:56:16 +02:00
"github.com/TwiN/gatus/v5/config/endpoint"
2022-12-06 07:41:09 +01:00
"github.com/TwiN/gatus/v5/storage/store/common"
"github.com/TwiN/gatus/v5/storage/store/common/paging"
2022-08-12 02:47:29 +02:00
"github.com/TwiN/gocache/v2"
2021-09-11 00:00:04 +02:00
_ "github.com/lib/pq"
2021-07-12 06:56:30 +02:00
_ "modernc.org/sqlite"
)
2021-07-14 04:59:43 +02:00
//////////////////////////////////////////////////////////////////////////////////////////////////
// Note that only exported functions in this file may create, commit, or rollback a transaction //
//////////////////////////////////////////////////////////////////////////////////////////////////
2021-07-12 10:25:25 +02:00
const (
	// arraySeparator is the separator used to separate multiple strings in a single column.
	// It's a dirty hack, but it's only used for persisting errors, and since this data will likely only ever be used
	// for aesthetic purposes, I deemed it wasn't worth the performance impact of yet another one-to-many table.
	arraySeparator = "|~|"

	eventsCleanUpThreshold  = common.MaximumNumberOfEvents + 10  // Maximum number of events before triggering a cleanup
	resultsCleanUpThreshold = common.MaximumNumberOfResults + 10 // Maximum number of results before triggering a cleanup

	uptimeTotalEntriesMergeThreshold = 100                 // Maximum number of uptime entries before triggering a merge
	uptimeAgeCleanUpThreshold        = 32 * 24 * time.Hour // Maximum uptime age before triggering a cleanup
	uptimeRetention                  = 30 * 24 * time.Hour // Minimum duration that must be kept to operate as intended
	uptimeHourlyBuffer               = 48 * time.Hour      // Number of hours to buffer from now when determining which hourly uptime entries can be merged into daily uptime entries

	// cacheTTL is how long an entry stays in the write-through cache before expiring
	cacheTTL = 10 * time.Minute
)
2021-07-12 06:56:30 +02:00
var (
	// ErrPathNotSpecified is the error returned when the path parameter passed in NewStore is blank
	ErrPathNotSpecified = errors.New("path cannot be empty")

	// ErrDatabaseDriverNotSpecified is the error returned when the driver parameter passed in NewStore is blank
	ErrDatabaseDriverNotSpecified = errors.New("database driver cannot be empty")

	// errNoRowsReturned is returned by internal queries that expect exactly one row but got none
	errNoRowsReturned = errors.New("expected a row to be returned, but none was")
)
// Store that leverages a database
type Store struct {
	// driver is the database driver ("sqlite" or "postgres") and path is the connection string/file path
	driver, path string

	// db is the underlying database handle used for all queries and transactions
	db *sql.DB

	// writeThroughCache is a cache used to drastically decrease read latency by pre-emptively
	// caching writes as they happen. If nil, writes are not cached.
	writeThroughCache *gocache.Cache
}
2021-11-05 02:33:13 +01:00
// NewStore initializes the database and creates the schema if it doesn't already exist in the path specified
2022-08-12 02:47:29 +02:00
func NewStore ( driver , path string , caching bool ) ( * Store , error ) {
2021-07-12 06:56:30 +02:00
if len ( driver ) == 0 {
return nil , ErrDatabaseDriverNotSpecified
}
if len ( path ) == 0 {
2021-11-05 02:33:13 +01:00
return nil , ErrPathNotSpecified
2021-07-12 06:56:30 +02:00
}
2021-11-05 02:33:13 +01:00
store := & Store { driver : driver , path : path }
2021-07-12 06:56:30 +02:00
var err error
if store . db , err = sql . Open ( driver , path ) ; err != nil {
return nil , err
}
2021-09-12 04:42:56 +02:00
if err := store . db . Ping ( ) ; err != nil {
return nil , err
}
2021-07-13 04:53:14 +02:00
if driver == "sqlite" {
_ , _ = store . db . Exec ( "PRAGMA foreign_keys=ON" )
_ , _ = store . db . Exec ( "PRAGMA journal_mode=WAL" )
2021-07-17 02:17:02 +02:00
_ , _ = store . db . Exec ( "PRAGMA synchronous=NORMAL" )
2021-07-13 04:53:14 +02:00
// Prevents driver from running into "database is locked" errors
// This is because we're using WAL to improve performance
store . db . SetMaxOpenConns ( 1 )
}
2021-07-12 06:56:30 +02:00
if err = store . createSchema ( ) ; err != nil {
_ = store . db . Close ( )
return nil , err
}
2022-08-12 02:47:29 +02:00
if caching {
store . writeThroughCache = gocache . NewCache ( ) . WithMaxSize ( 10000 )
}
2021-07-12 06:56:30 +02:00
return store , nil
}
// createSchema creates the schema required to perform all database operations.
func ( s * Store ) createSchema ( ) error {
2021-09-11 00:00:04 +02:00
if s . driver == "sqlite" {
return s . createSQLiteSchema ( )
2021-07-12 06:56:30 +02:00
}
2021-09-11 00:00:04 +02:00
return s . createPostgresSchema ( )
2021-07-12 06:56:30 +02:00
}
2024-05-10 04:56:16 +02:00
// GetAllEndpointStatuses returns all monitored endpoint.Status
// with a subset of endpoint.Result defined by the page and pageSize parameters
func ( s * Store ) GetAllEndpointStatuses ( params * paging . EndpointStatusParams ) ( [ ] * endpoint . Status , error ) {
2021-07-14 04:59:43 +02:00
tx , err := s . db . Begin ( )
if err != nil {
2021-09-11 00:00:04 +02:00
return nil , err
2021-07-14 04:59:43 +02:00
}
2021-10-23 22:47:12 +02:00
keys , err := s . getAllEndpointKeys ( tx )
2021-07-14 04:59:43 +02:00
if err != nil {
_ = tx . Rollback ( )
2021-09-11 00:00:04 +02:00
return nil , err
2021-07-14 04:59:43 +02:00
}
2024-05-10 04:56:16 +02:00
endpointStatuses := make ( [ ] * endpoint . Status , 0 , len ( keys ) )
2021-07-14 04:59:43 +02:00
for _ , key := range keys {
2021-10-23 22:47:12 +02:00
endpointStatus , err := s . getEndpointStatusByKey ( tx , key , params )
2021-07-14 04:59:43 +02:00
if err != nil {
continue
}
2021-10-23 22:47:12 +02:00
endpointStatuses = append ( endpointStatuses , endpointStatus )
2021-07-14 04:17:27 +02:00
}
2021-07-14 04:59:43 +02:00
if err = tx . Commit ( ) ; err != nil {
_ = tx . Rollback ( )
}
2021-10-23 22:47:12 +02:00
return endpointStatuses , err
2021-07-12 06:56:30 +02:00
}
2021-10-23 22:47:12 +02:00
// GetEndpointStatus returns the endpoint status for a given endpoint name in the given group
2024-05-10 04:56:16 +02:00
func ( s * Store ) GetEndpointStatus ( groupName , endpointName string , params * paging . EndpointStatusParams ) ( * endpoint . Status , error ) {
return s . GetEndpointStatusByKey ( endpoint . ConvertGroupAndEndpointNameToKey ( groupName , endpointName ) , params )
2021-07-12 06:56:30 +02:00
}
2021-10-23 22:47:12 +02:00
// GetEndpointStatusByKey returns the endpoint status for a given key
2024-05-10 04:56:16 +02:00
func ( s * Store ) GetEndpointStatusByKey ( key string , params * paging . EndpointStatusParams ) ( * endpoint . Status , error ) {
2021-07-14 04:59:43 +02:00
tx , err := s . db . Begin ( )
if err != nil {
2021-09-11 00:00:04 +02:00
return nil , err
2021-07-14 04:59:43 +02:00
}
2021-10-23 22:47:12 +02:00
endpointStatus , err := s . getEndpointStatusByKey ( tx , key , params )
2021-07-14 04:59:43 +02:00
if err != nil {
_ = tx . Rollback ( )
2021-09-11 00:00:04 +02:00
return nil , err
2021-07-14 04:59:43 +02:00
}
if err = tx . Commit ( ) ; err != nil {
_ = tx . Rollback ( )
}
2021-10-23 22:47:12 +02:00
return endpointStatus , err
2021-07-12 06:56:30 +02:00
}
2021-08-13 03:54:23 +02:00
// GetUptimeByKey returns the uptime percentage during a time range
func ( s * Store ) GetUptimeByKey ( key string , from , to time . Time ) ( float64 , error ) {
if from . After ( to ) {
return 0 , common . ErrInvalidTimeRange
}
tx , err := s . db . Begin ( )
if err != nil {
return 0 , err
}
2021-10-23 22:47:12 +02:00
endpointID , _ , _ , err := s . getEndpointIDGroupAndNameByKey ( tx , key )
2021-08-13 03:54:23 +02:00
if err != nil {
_ = tx . Rollback ( )
return 0 , err
}
2021-10-23 22:47:12 +02:00
uptime , _ , err := s . getEndpointUptime ( tx , endpointID , from , to )
2021-08-13 03:54:23 +02:00
if err != nil {
_ = tx . Rollback ( )
return 0 , err
}
if err = tx . Commit ( ) ; err != nil {
_ = tx . Rollback ( )
}
return uptime , nil
}
2021-08-21 16:59:09 +02:00
// GetAverageResponseTimeByKey returns the average response time in milliseconds (value) during a time range
func ( s * Store ) GetAverageResponseTimeByKey ( key string , from , to time . Time ) ( int , error ) {
if from . After ( to ) {
return 0 , common . ErrInvalidTimeRange
}
tx , err := s . db . Begin ( )
if err != nil {
return 0 , err
}
2021-10-23 22:47:12 +02:00
endpointID , _ , _ , err := s . getEndpointIDGroupAndNameByKey ( tx , key )
2021-08-21 16:59:09 +02:00
if err != nil {
_ = tx . Rollback ( )
return 0 , err
}
2021-10-23 22:47:12 +02:00
averageResponseTime , err := s . getEndpointAverageResponseTime ( tx , endpointID , from , to )
2021-08-21 16:59:09 +02:00
if err != nil {
_ = tx . Rollback ( )
return 0 , err
}
if err = tx . Commit ( ) ; err != nil {
_ = tx . Rollback ( )
}
return averageResponseTime , nil
}
2021-08-20 05:07:21 +02:00
// GetHourlyAverageResponseTimeByKey returns a map of hourly (key) average response time in milliseconds (value) during a time range
func ( s * Store ) GetHourlyAverageResponseTimeByKey ( key string , from , to time . Time ) ( map [ int64 ] int , error ) {
if from . After ( to ) {
return nil , common . ErrInvalidTimeRange
}
tx , err := s . db . Begin ( )
if err != nil {
return nil , err
}
2021-10-23 22:47:12 +02:00
endpointID , _ , _ , err := s . getEndpointIDGroupAndNameByKey ( tx , key )
2021-08-20 05:07:21 +02:00
if err != nil {
_ = tx . Rollback ( )
return nil , err
}
2021-10-23 22:47:12 +02:00
hourlyAverageResponseTimes , err := s . getEndpointHourlyAverageResponseTimes ( tx , endpointID , from , to )
2021-08-20 05:07:21 +02:00
if err != nil {
_ = tx . Rollback ( )
return nil , err
}
if err = tx . Commit ( ) ; err != nil {
_ = tx . Rollback ( )
}
return hourlyAverageResponseTimes , nil
}
2021-10-23 22:47:12 +02:00
// Insert adds the observed result for the specified endpoint into the store.
//
// Within a single transaction, it: creates the endpoint row if missing, records state-change
// events, persists the result, prunes old events/results, updates aggregated uptime data,
// merges/prunes uptime entries, and refreshes any affected write-through cache entries.
// Most maintenance failures are logged and ignored; only a failure to persist the result
// itself (or to commit) makes the call fail.
func (s *Store) Insert(ep *endpoint.Endpoint, result *endpoint.Result) error {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	endpointID, err := s.getEndpointID(tx, ep)
	if err != nil {
		if errors.Is(err, common.ErrEndpointNotFound) {
			// Endpoint doesn't exist in the database, insert it
			if endpointID, err = s.insertEndpoint(tx, ep); err != nil {
				_ = tx.Rollback()
				log.Printf("[sql.Insert] Failed to create endpoint with key=%s: %s", ep.Key(), err.Error())
				return err
			}
		} else {
			_ = tx.Rollback()
			log.Printf("[sql.Insert] Failed to retrieve id of endpoint with key=%s: %s", ep.Key(), err.Error())
			return err
		}
	}
	// First, we need to check if we need to insert a new event.
	//
	// A new event must be added if either of the following cases happen:
	// 1. There is only 1 event. The total number of events for an endpoint can only be 1 if the only existing event is
	//    of type EventStart, in which case we will have to create a new event of type EventHealthy or EventUnhealthy
	//    based on result.Success.
	// 2. The lastResult.Success != result.Success. This implies that the endpoint went from healthy to unhealthy or
	//    vice-versa, in which case we will have to create a new event of type EventHealthy or EventUnhealthy
	//    based on result.Success.
	numberOfEvents, err := s.getNumberOfEventsByEndpointID(tx, endpointID)
	if err != nil {
		// Silently fail
		log.Printf("[sql.Insert] Failed to retrieve total number of events for endpoint with key=%s: %s", ep.Key(), err.Error())
	}
	if numberOfEvents == 0 {
		// There's no events yet, which means we need to add the EventStart and the first healthy/unhealthy event
		// The EventStart is backdated slightly so it sorts before the first healthy/unhealthy event
		err = s.insertEndpointEvent(tx, endpointID, &endpoint.Event{
			Type:      endpoint.EventStart,
			Timestamp: result.Timestamp.Add(-50 * time.Millisecond),
		})
		if err != nil {
			// Silently fail
			log.Printf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", endpoint.EventStart, ep.Key(), err.Error())
		}
		event := endpoint.NewEventFromResult(result)
		if err = s.insertEndpointEvent(tx, endpointID, event); err != nil {
			// Silently fail
			log.Printf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", event.Type, ep.Key(), err.Error())
		}
	} else {
		// Get the success value of the previous result
		var lastResultSuccess bool
		if lastResultSuccess, err = s.getLastEndpointResultSuccessValue(tx, endpointID); err != nil {
			// Silently fail
			log.Printf("[sql.Insert] Failed to retrieve outcome of previous result for endpoint with key=%s: %s", ep.Key(), err.Error())
		} else {
			// If we managed to retrieve the outcome of the previous result, we'll compare it with the new result.
			// If the final outcome (success or failure) of the previous and the new result aren't the same, it means
			// that the endpoint either went from Healthy to Unhealthy or Unhealthy -> Healthy, therefore, we'll add
			// an event to mark the change in state
			if lastResultSuccess != result.Success {
				event := endpoint.NewEventFromResult(result)
				if err = s.insertEndpointEvent(tx, endpointID, event); err != nil {
					// Silently fail
					log.Printf("[sql.Insert] Failed to insert event=%s for endpoint with key=%s: %s", event.Type, ep.Key(), err.Error())
				}
			}
		}
		// Clean up old events if there's more than twice the maximum number of events
		// This lets us both keep the table clean without impacting performance too much
		// (since we're only deleting MaximumNumberOfEvents at a time instead of 1)
		if numberOfEvents > eventsCleanUpThreshold {
			if err = s.deleteOldEndpointEvents(tx, endpointID); err != nil {
				// Silently fail
				log.Printf("[sql.Insert] Failed to delete old events for endpoint with key=%s: %s", ep.Key(), err.Error())
			}
		}
	}
	// Second, we need to insert the result.
	if err = s.insertEndpointResult(tx, endpointID, result); err != nil {
		log.Printf("[sql.Insert] Failed to insert result for endpoint with key=%s: %s", ep.Key(), err.Error())
		_ = tx.Rollback() // If we can't insert the result, we'll rollback now since there's no point continuing
		return err
	}
	// Clean up old results
	numberOfResults, err := s.getNumberOfResultsByEndpointID(tx, endpointID)
	if err != nil {
		// Silently fail
		log.Printf("[sql.Insert] Failed to retrieve total number of results for endpoint with key=%s: %s", ep.Key(), err.Error())
	} else {
		if numberOfResults > resultsCleanUpThreshold {
			if err = s.deleteOldEndpointResults(tx, endpointID); err != nil {
				// Silently fail
				log.Printf("[sql.Insert] Failed to delete old results for endpoint with key=%s: %s", ep.Key(), err.Error())
			}
		}
	}
	// Finally, we need to insert the uptime data.
	// Because the uptime data significantly outlives the results, we can't rely on the results for determining the uptime
	if err = s.updateEndpointUptime(tx, endpointID, result); err != nil {
		// Silently fail
		log.Printf("[sql.Insert] Failed to update uptime for endpoint with key=%s: %s", ep.Key(), err.Error())
	}
	// Merge hourly uptime entries that can be merged into daily entries and clean up old uptime entries
	numberOfUptimeEntries, err := s.getNumberOfUptimeEntriesByEndpointID(tx, endpointID)
	if err != nil {
		log.Printf("[sql.Insert] Failed to retrieve total number of uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
	} else {
		// Merge older hourly uptime entries into daily uptime entries if we have more than uptimeTotalEntriesMergeThreshold
		if numberOfUptimeEntries >= uptimeTotalEntriesMergeThreshold {
			log.Printf("[sql.Insert] Merging hourly uptime entries for endpoint with key=%s; This is a lot of work, it shouldn't happen too often", ep.Key())
			if err = s.mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries(tx, endpointID); err != nil {
				log.Printf("[sql.Insert] Failed to merge hourly uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
			}
		}
	}
	// Clean up outdated uptime entries
	// In most cases, this would be handled by mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries,
	// but if Gatus was temporarily shut down, we might have some old entries that need to be cleaned up
	ageOfOldestUptimeEntry, err := s.getAgeOfOldestEndpointUptimeEntry(tx, endpointID)
	if err != nil {
		// Silently fail
		log.Printf("[sql.Insert] Failed to retrieve oldest endpoint uptime entry for endpoint with key=%s: %s", ep.Key(), err.Error())
	} else {
		if ageOfOldestUptimeEntry > uptimeAgeCleanUpThreshold {
			if err = s.deleteOldUptimeEntries(tx, endpointID, time.Now().Add(-(uptimeRetention + time.Hour))); err != nil {
				log.Printf("[sql.Insert] Failed to delete old uptime entries for endpoint with key=%s: %s", ep.Key(), err.Error())
			}
		}
	}
	if s.writeThroughCache != nil {
		// Invalidate and eagerly repopulate cache entries affected by this endpoint
		cacheKeysToRefresh := s.writeThroughCache.GetKeysByPattern(ep.Key()+"*", 0)
		for _, cacheKey := range cacheKeysToRefresh {
			s.writeThroughCache.Delete(cacheKey)
			endpointKey, params, err := extractKeyAndParamsFromCacheKey(cacheKey)
			if err != nil {
				log.Printf("[sql.Insert] Silently deleting cache key %s instead of refreshing due to error: %s", cacheKey, err.Error())
				continue
			}
			// Retrieve the endpoint status by key, which will in turn refresh the cache
			_, _ = s.getEndpointStatusByKey(tx, endpointKey, params)
		}
	}
	if err = tx.Commit(); err != nil {
		_ = tx.Rollback()
	}
	return err
}
2021-10-23 22:47:12 +02:00
// DeleteAllEndpointStatusesNotInKeys removes all rows owned by an endpoint whose key is not within the keys provided
func ( s * Store ) DeleteAllEndpointStatusesNotInKeys ( keys [ ] string ) int {
2021-07-18 06:34:22 +02:00
var err error
var result sql . Result
2021-07-16 04:07:30 +02:00
if len ( keys ) == 0 {
2021-07-18 06:34:22 +02:00
// Delete everything
2021-10-23 22:47:12 +02:00
result , err = s . db . Exec ( "DELETE FROM endpoints" )
2021-07-18 06:34:22 +02:00
} else {
args := make ( [ ] interface { } , 0 , len ( keys ) )
2021-10-23 22:47:12 +02:00
query := "DELETE FROM endpoints WHERE endpoint_key NOT IN ("
2021-07-18 06:34:22 +02:00
for i := range keys {
2021-09-11 00:00:04 +02:00
query += fmt . Sprintf ( "$%d," , i + 1 )
2021-07-18 06:34:22 +02:00
args = append ( args , keys [ i ] )
}
2021-11-04 03:17:58 +01:00
query = query [ : len ( query ) - 1 ] + ")" // Remove the last comma and add the closing parenthesis
2021-09-11 00:00:04 +02:00
result , err = s . db . Exec ( query , args ... )
2021-07-16 04:07:30 +02:00
}
if err != nil {
2024-04-02 03:47:14 +02:00
log . Printf ( "[sql.DeleteAllEndpointStatusesNotInKeys] Failed to delete rows that do not belong to any of keys=%v: %s" , keys , err . Error ( ) )
2021-07-18 06:34:22 +02:00
return 0
2021-07-16 04:07:30 +02:00
}
2022-08-12 02:47:29 +02:00
if s . writeThroughCache != nil {
// It's easier to just wipe out the entire cache than to try to find all keys that are not in the keys list
2024-05-16 03:29:45 +02:00
// This only happens on start and during tests, so it's fine for us to just clear the cache without worrying
// about performance
2022-08-12 02:47:29 +02:00
_ = s . writeThroughCache . DeleteKeysByPattern ( "*" )
}
// Return number of rows deleted
2021-07-18 06:34:22 +02:00
rowsAffects , _ := result . RowsAffected ( )
return int ( rowsAffects )
2024-05-16 03:29:45 +02:00
}
// GetTriggeredEndpointAlert returns whether the triggered alert for the specified endpoint as well as the necessary information to resolve it
func (s *Store) GetTriggeredEndpointAlert(ep *endpoint.Endpoint, alert *alert.Alert) (exists bool, resolveKey string, numberOfSuccessesInARow int, err error) {
	const query = "SELECT resolve_key, number_of_successes_in_a_row FROM endpoint_alerts_triggered WHERE endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1 LIMIT 1) AND configuration_checksum = $2"
	scanErr := s.db.QueryRow(query, ep.Key(), alert.Checksum()).Scan(&resolveKey, &numberOfSuccessesInARow)
	switch {
	case scanErr == nil:
		return true, resolveKey, numberOfSuccessesInARow, nil
	case errors.Is(scanErr, sql.ErrNoRows):
		// No persisted triggered alert for this endpoint/alert combination; not an error
		return false, "", 0, nil
	default:
		return false, "", 0, scanErr
	}
}
// UpsertTriggeredEndpointAlert inserts/updates a triggered alert for an endpoint
// Used for persistence of triggered alerts across application restarts
func (s *Store) UpsertTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	endpointID, err := s.getEndpointID(tx, ep)
	if err != nil {
		if errors.Is(err, common.ErrEndpointNotFound) {
			// Endpoint doesn't exist in the database, insert it
			// This shouldn't happen, but we'll handle it anyway
			if endpointID, err = s.insertEndpoint(tx, ep); err != nil {
				_ = tx.Rollback()
				log.Printf("[sql.UpsertTriggeredEndpointAlert] Failed to create endpoint with key=%s: %s", ep.Key(), err.Error())
				return err
			}
		} else {
			_ = tx.Rollback()
			log.Printf("[sql.UpsertTriggeredEndpointAlert] Failed to retrieve id of endpoint with key=%s: %s", ep.Key(), err.Error())
			return err
		}
	}
	_, err = tx.Exec(
		`
			INSERT INTO endpoint_alerts_triggered (endpoint_id, configuration_checksum, resolve_key, number_of_successes_in_a_row)
			VALUES ($1, $2, $3, $4)
			ON CONFLICT (endpoint_id, configuration_checksum) DO UPDATE SET
				resolve_key = $3,
				number_of_successes_in_a_row = $4
		`,
		endpointID,
		triggeredAlert.Checksum(),
		triggeredAlert.ResolveKey,
		ep.NumberOfSuccessesInARow, // We only persist NumberOfSuccessesInARow, because all alerts in this table are already triggered
	)
	if err != nil {
		_ = tx.Rollback()
		log.Printf("[sql.UpsertTriggeredEndpointAlert] Failed to persist triggered alert for endpoint with key=%s: %s", ep.Key(), err.Error())
		return err
	}
	// BUGFIX: report the commit error to the caller instead of returning nil;
	// previously a failed commit was silently treated as a successful upsert
	if err = tx.Commit(); err != nil {
		_ = tx.Rollback()
		return err
	}
	return nil
}
// DeleteTriggeredEndpointAlert deletes a triggered alert for an endpoint
func (s *Store) DeleteTriggeredEndpointAlert(ep *endpoint.Endpoint, triggeredAlert *alert.Alert) error {
	const query = "DELETE FROM endpoint_alerts_triggered WHERE configuration_checksum = $1 AND endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $2 LIMIT 1)"
	_, err := s.db.Exec(query, triggeredAlert.Checksum(), ep.Key())
	return err
}
// DeleteAllTriggeredAlertsNotInChecksumsByEndpoint removes all triggered alerts owned by an endpoint whose alert
// configurations are not provided in the checksums list.
// This prevents triggered alerts that have been removed or modified from lingering in the database.
func ( s * Store ) DeleteAllTriggeredAlertsNotInChecksumsByEndpoint ( ep * endpoint . Endpoint , checksums [ ] string ) int {
//log.Printf("[sql.DeleteAllTriggeredAlertsNotInChecksumsByEndpoint] Deleting triggered alerts for endpoint with key=%s that do not belong to any of checksums=%v", ep.Key(), checksums)
var err error
var result sql . Result
if len ( checksums ) == 0 {
// No checksums? Then it means there are no (enabled) alerts configured for that endpoint, so we can get rid of all
// persisted triggered alerts for that endpoint
result , err = s . db . Exec ( "DELETE FROM endpoint_alerts_triggered WHERE endpoint_id = (SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1 LIMIT 1)" , ep . Key ( ) )
} else {
args := make ( [ ] interface { } , 0 , len ( checksums ) + 1 )
args = append ( args , ep . Key ( ) )
query := ` DELETE FROM endpoint_alerts_triggered
WHERE endpoint_id = ( SELECT endpoint_id FROM endpoints WHERE endpoint_key = $ 1 LIMIT 1 )
AND configuration_checksum NOT IN ( `
for i := range checksums {
query += fmt . Sprintf ( "$%d," , i + 2 )
args = append ( args , checksums [ i ] )
}
query = query [ : len ( query ) - 1 ] + ")" // Remove the last comma and add the closing parenthesis
result , err = s . db . Exec ( query , args ... )
}
if err != nil {
log . Printf ( "[sql.DeleteAllTriggeredAlertsNotInChecksumsByEndpoint] Failed to delete rows for endpoint with key=%s that do not belong to any of checksums=%v: %s" , ep . Key ( ) , checksums , err . Error ( ) )
return 0
}
// Return number of rows deleted
rowsAffects , _ := result . RowsAffected ( )
return int ( rowsAffects )
2021-07-13 04:53:14 +02:00
}
// Clear deletes everything from the store
// Errors are intentionally ignored; related rows are expected to be removed via foreign keys
func (s *Store) Clear() {
	_, _ = s.db.Exec("DELETE FROM endpoints")
	if s.writeThroughCache != nil {
		// Drop all cached statuses since the underlying data is gone
		_ = s.writeThroughCache.DeleteKeysByPattern("*")
	}
}
// Save does nothing, because this store is immediately persistent.
// It exists only to satisfy the store interface.
func (s *Store) Save() error {
	return nil
}
// Close the database handle
func (s *Store) Close() {
	_ = s.db.Close()
	if s.writeThroughCache != nil {
		// Clear the cache too. If the store's been closed, we don't want to keep the cache around.
		_ = s.writeThroughCache.DeleteKeysByPattern("*")
	}
}
2021-10-23 22:47:12 +02:00
// insertEndpoint inserts an endpoint in the store and returns the generated id of said endpoint
2024-05-10 04:56:16 +02:00
func ( s * Store ) insertEndpoint ( tx * sql . Tx , ep * endpoint . Endpoint ) ( int64 , error ) {
//log.Printf("[sql.insertEndpoint] Inserting endpoint with group=%s and name=%s", ep.Group, ep.Name)
2021-09-11 00:00:04 +02:00
var id int64
err := tx . QueryRow (
2021-10-23 22:47:12 +02:00
"INSERT INTO endpoints (endpoint_key, endpoint_name, endpoint_group) VALUES ($1, $2, $3) RETURNING endpoint_id" ,
2024-05-10 04:56:16 +02:00
ep . Key ( ) ,
ep . Name ,
ep . Group ,
2021-09-11 00:00:04 +02:00
) . Scan ( & id )
2021-07-14 05:23:14 +02:00
if err != nil {
return 0 , err
}
2021-09-11 00:00:04 +02:00
return id , nil
2021-07-14 05:23:14 +02:00
}
2021-10-23 22:47:12 +02:00
// insertEndpointEvent inserts en event in the store
2024-05-10 04:56:16 +02:00
func ( s * Store ) insertEndpointEvent ( tx * sql . Tx , endpointID int64 , event * endpoint . Event ) error {
2021-07-14 05:23:14 +02:00
_ , err := tx . Exec (
2021-10-23 22:47:12 +02:00
"INSERT INTO endpoint_events (endpoint_id, event_type, event_timestamp) VALUES ($1, $2, $3)" ,
endpointID ,
2021-07-14 05:23:14 +02:00
event . Type ,
2021-09-11 00:00:04 +02:00
event . Timestamp . UTC ( ) ,
2021-07-14 05:23:14 +02:00
)
if err != nil {
return err
}
return nil
}
2021-10-23 22:47:12 +02:00
// insertEndpointResult inserts a result in the store
2024-05-10 04:56:16 +02:00
func ( s * Store ) insertEndpointResult ( tx * sql . Tx , endpointID int64 , result * endpoint . Result ) error {
2021-10-23 22:47:12 +02:00
var endpointResultID int64
2021-09-11 00:00:04 +02:00
err := tx . QueryRow (
2021-07-14 05:23:14 +02:00
`
2022-09-07 03:22:02 +02:00
INSERT INTO endpoint_results ( endpoint_id , success , errors , connected , status , dns_rcode , certificate_expiration , domain_expiration , hostname , ip , duration , timestamp )
VALUES ( $ 1 , $ 2 , $ 3 , $ 4 , $ 5 , $ 6 , $ 7 , $ 8 , $ 9 , $ 10 , $ 11 , $ 12 )
2021-10-23 22:47:12 +02:00
RETURNING endpoint_result_id
2021-07-14 05:23:14 +02:00
` ,
2021-10-23 22:47:12 +02:00
endpointID ,
2021-07-14 05:23:14 +02:00
result . Success ,
strings . Join ( result . Errors , arraySeparator ) ,
result . Connected ,
result . HTTPStatus ,
result . DNSRCode ,
result . CertificateExpiration ,
2022-09-07 03:22:02 +02:00
result . DomainExpiration ,
2021-07-14 05:23:14 +02:00
result . Hostname ,
result . IP ,
result . Duration ,
2021-09-11 00:00:04 +02:00
result . Timestamp . UTC ( ) ,
2021-10-23 22:47:12 +02:00
) . Scan ( & endpointResultID )
2021-07-14 05:23:14 +02:00
if err != nil {
return err
}
2021-10-23 22:47:12 +02:00
return s . insertConditionResults ( tx , endpointResultID , result . ConditionResults )
2021-07-14 05:23:14 +02:00
}
2024-05-10 04:56:16 +02:00
func ( s * Store ) insertConditionResults ( tx * sql . Tx , endpointResultID int64 , conditionResults [ ] * endpoint . ConditionResult ) error {
2021-07-14 05:23:14 +02:00
var err error
for _ , cr := range conditionResults {
2021-10-23 22:47:12 +02:00
_ , err = tx . Exec ( "INSERT INTO endpoint_result_conditions (endpoint_result_id, condition, success) VALUES ($1, $2, $3)" ,
endpointResultID ,
2021-07-14 05:23:14 +02:00
cr . Condition ,
cr . Success ,
)
if err != nil {
return err
}
}
return nil
}
2024-05-10 04:56:16 +02:00
func ( s * Store ) updateEndpointUptime ( tx * sql . Tx , endpointID int64 , result * endpoint . Result ) error {
2021-07-14 07:40:27 +02:00
unixTimestampFlooredAtHour := result . Timestamp . Truncate ( time . Hour ) . Unix ( )
var successfulExecutions int
if result . Success {
successfulExecutions = 1
}
_ , err := tx . Exec (
`
2021-10-23 22:47:12 +02:00
INSERT INTO endpoint_uptimes ( endpoint_id , hour_unix_timestamp , total_executions , successful_executions , total_response_time )
2021-07-14 07:40:27 +02:00
VALUES ( $ 1 , $ 2 , $ 3 , $ 4 , $ 5 )
2021-10-23 22:47:12 +02:00
ON CONFLICT ( endpoint_id , hour_unix_timestamp ) DO UPDATE SET
total_executions = excluded . total_executions + endpoint_uptimes . total_executions ,
successful_executions = excluded . successful_executions + endpoint_uptimes . successful_executions ,
total_response_time = excluded . total_response_time + endpoint_uptimes . total_response_time
2021-07-14 07:40:27 +02:00
` ,
2021-10-23 22:47:12 +02:00
endpointID ,
2021-07-14 07:40:27 +02:00
unixTimestampFlooredAtHour ,
1 ,
successfulExecutions ,
result . Duration . Milliseconds ( ) ,
)
2021-10-01 03:19:57 +02:00
return err
2021-07-14 07:40:27 +02:00
}
2021-10-23 22:47:12 +02:00
func ( s * Store ) getAllEndpointKeys ( tx * sql . Tx ) ( keys [ ] string , err error ) {
rows , err := tx . Query ( "SELECT endpoint_key FROM endpoints ORDER BY endpoint_key" )
2021-07-14 04:17:27 +02:00
if err != nil {
return nil , err
}
for rows . Next ( ) {
var key string
_ = rows . Scan ( & key )
keys = append ( keys , key )
}
return
}
2024-05-10 04:56:16 +02:00
func ( s * Store ) getEndpointStatusByKey ( tx * sql . Tx , key string , parameters * paging . EndpointStatusParams ) ( * endpoint . Status , error ) {
2022-08-12 02:47:29 +02:00
var cacheKey string
if s . writeThroughCache != nil {
cacheKey = generateCacheKey ( key , parameters )
if cachedEndpointStatus , exists := s . writeThroughCache . Get ( cacheKey ) ; exists {
2024-05-10 04:56:16 +02:00
if castedCachedEndpointStatus , ok := cachedEndpointStatus . ( * endpoint . Status ) ; ok {
2022-08-12 02:47:29 +02:00
return castedCachedEndpointStatus , nil
}
}
}
2021-10-23 22:47:12 +02:00
endpointID , group , endpointName , err := s . getEndpointIDGroupAndNameByKey ( tx , key )
2021-07-14 04:17:27 +02:00
if err != nil {
return nil , err
}
2024-05-10 04:56:16 +02:00
endpointStatus := endpoint . NewStatus ( group , endpointName )
2021-07-15 04:26:51 +02:00
if parameters . EventsPageSize > 0 {
2021-10-23 22:47:12 +02:00
if endpointStatus . Events , err = s . getEndpointEventsByEndpointID ( tx , endpointID , parameters . EventsPage , parameters . EventsPageSize ) ; err != nil {
2024-04-02 03:47:14 +02:00
log . Printf ( "[sql.getEndpointStatusByKey] Failed to retrieve events for key=%s: %s" , key , err . Error ( ) )
2021-07-14 04:17:27 +02:00
}
}
2021-07-15 04:26:51 +02:00
if parameters . ResultsPageSize > 0 {
2021-10-23 22:47:12 +02:00
if endpointStatus . Results , err = s . getEndpointResultsByEndpointID ( tx , endpointID , parameters . ResultsPage , parameters . ResultsPageSize ) ; err != nil {
2024-04-02 03:47:14 +02:00
log . Printf ( "[sql.getEndpointStatusByKey] Failed to retrieve results for key=%s: %s" , key , err . Error ( ) )
2021-07-14 04:17:27 +02:00
}
}
2022-08-12 02:47:29 +02:00
if s . writeThroughCache != nil {
s . writeThroughCache . SetWithTTL ( cacheKey , endpointStatus , cacheTTL )
}
2021-10-23 22:47:12 +02:00
return endpointStatus , nil
2021-07-14 04:17:27 +02:00
}
2021-10-23 22:47:12 +02:00
// getEndpointIDGroupAndNameByKey looks up an endpoint's numeric ID, group, and
// name using its unique key.
//
// Returns common.ErrEndpointNotFound when no endpoint matches the given key.
func (s *Store) getEndpointIDGroupAndNameByKey(tx *sql.Tx, key string) (id int64, group, name string, err error) {
	err = tx.QueryRow(
		`
			SELECT endpoint_id, endpoint_group, endpoint_name
			FROM endpoints
			WHERE endpoint_key = $1
			LIMIT 1
		`,
		key,
	).Scan(&id, &group, &name)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// No row simply means the endpoint hasn't been stored yet
			return 0, "", "", common.ErrEndpointNotFound
		}
		return 0, "", "", err
	}
	return
}
2024-05-10 04:56:16 +02:00
func ( s * Store ) getEndpointEventsByEndpointID ( tx * sql . Tx , endpointID int64 , page , pageSize int ) ( events [ ] * endpoint . Event , err error ) {
2021-07-14 04:59:43 +02:00
rows , err := tx . Query (
2021-07-14 04:17:27 +02:00
`
SELECT event_type , event_timestamp
2021-10-23 22:47:12 +02:00
FROM endpoint_events
WHERE endpoint_id = $ 1
ORDER BY endpoint_event_id ASC
2021-07-14 04:17:27 +02:00
LIMIT $ 2 OFFSET $ 3
` ,
2021-10-23 22:47:12 +02:00
endpointID ,
2021-07-14 04:17:27 +02:00
pageSize ,
( page - 1 ) * pageSize ,
)
2021-07-12 06:56:30 +02:00
if err != nil {
return nil , err
}
for rows . Next ( ) {
2024-05-10 04:56:16 +02:00
event := & endpoint . Event { }
2021-07-12 06:56:30 +02:00
_ = rows . Scan ( & event . Type , & event . Timestamp )
events = append ( events , event )
}
return
}
2024-05-10 04:56:16 +02:00
func ( s * Store ) getEndpointResultsByEndpointID ( tx * sql . Tx , endpointID int64 , page , pageSize int ) ( results [ ] * endpoint . Result , err error ) {
2021-07-14 04:17:27 +02:00
rows , err := tx . Query (
2021-07-12 10:25:25 +02:00
`
2022-09-07 03:22:02 +02:00
SELECT endpoint_result_id , success , errors , connected , status , dns_rcode , certificate_expiration , domain_expiration , hostname , ip , duration , timestamp
2021-10-23 22:47:12 +02:00
FROM endpoint_results
WHERE endpoint_id = $ 1
ORDER BY endpoint_result_id DESC -- Normally , we ' d sort by timestamp , but sorting by endpoint_result_id is faster
2021-07-14 04:17:27 +02:00
LIMIT $ 2 OFFSET $ 3
2021-07-13 04:53:14 +02:00
` ,
2021-10-23 22:47:12 +02:00
endpointID ,
2021-07-14 04:17:27 +02:00
pageSize ,
( page - 1 ) * pageSize ,
2021-07-12 06:56:30 +02:00
)
if err != nil {
return nil , err
}
2024-05-10 04:56:16 +02:00
idResultMap := make ( map [ int64 ] * endpoint . Result )
2021-07-12 06:56:30 +02:00
for rows . Next ( ) {
2024-05-10 04:56:16 +02:00
result := & endpoint . Result { }
2021-07-12 06:56:30 +02:00
var id int64
2021-07-12 10:25:25 +02:00
var joinedErrors string
2022-09-08 00:17:28 +02:00
err = rows . Scan ( & id , & result . Success , & joinedErrors , & result . Connected , & result . HTTPStatus , & result . DNSRCode , & result . CertificateExpiration , & result . DomainExpiration , & result . Hostname , & result . IP , & result . Duration , & result . Timestamp )
if err != nil {
2024-04-02 03:47:14 +02:00
log . Printf ( "[sql.getEndpointResultsByEndpointID] Silently failed to retrieve endpoint result for endpointID=%d: %s" , endpointID , err . Error ( ) )
2022-09-08 00:17:28 +02:00
err = nil
}
2021-07-16 04:07:30 +02:00
if len ( joinedErrors ) != 0 {
result . Errors = strings . Split ( joinedErrors , arraySeparator )
}
2021-07-16 23:48:38 +02:00
// This is faster than using a subselect
2024-05-10 04:56:16 +02:00
results = append ( [ ] * endpoint . Result { result } , results ... )
2021-07-12 07:06:44 +02:00
idResultMap [ id ] = result
2021-07-12 06:56:30 +02:00
}
2021-11-17 04:56:16 +01:00
if len ( idResultMap ) == 0 {
// If there's no result, we'll just return an empty/nil slice
return
}
2021-09-11 23:49:31 +02:00
// Get condition results
args := make ( [ ] interface { } , 0 , len ( idResultMap ) )
2021-10-23 22:47:12 +02:00
query := ` SELECT endpoint_result_id , condition , success
FROM endpoint_result_conditions
WHERE endpoint_result_id IN ( `
2021-09-11 23:49:31 +02:00
index := 1
2021-10-23 22:47:12 +02:00
for endpointResultID := range idResultMap {
2022-06-14 01:15:30 +02:00
query += "$" + strconv . Itoa ( index ) + ","
2021-10-23 22:47:12 +02:00
args = append ( args , endpointResultID )
2021-09-11 23:49:31 +02:00
index ++
}
query = query [ : len ( query ) - 1 ] + ")"
rows , err = tx . Query ( query , args ... )
if err != nil {
return nil , err
}
2021-10-01 03:19:57 +02:00
defer rows . Close ( ) // explicitly defer the close in case an error happens during the scan
2021-09-11 23:49:31 +02:00
for rows . Next ( ) {
2024-05-10 04:56:16 +02:00
conditionResult := & endpoint . ConditionResult { }
2021-10-23 22:47:12 +02:00
var endpointResultID int64
if err = rows . Scan ( & endpointResultID , & conditionResult . Condition , & conditionResult . Success ) ; err != nil {
2021-07-12 06:56:30 +02:00
return
}
2021-10-23 22:47:12 +02:00
idResultMap [ endpointResultID ] . ConditionResults = append ( idResultMap [ endpointResultID ] . ConditionResults , conditionResult )
2021-07-12 06:56:30 +02:00
}
return
}
2021-10-23 22:47:12 +02:00
func ( s * Store ) getEndpointUptime ( tx * sql . Tx , endpointID int64 , from , to time . Time ) ( uptime float64 , avgResponseTime time . Duration , err error ) {
2021-07-14 07:40:27 +02:00
rows , err := tx . Query (
`
SELECT SUM ( total_executions ) , SUM ( successful_executions ) , SUM ( total_response_time )
2021-10-23 22:47:12 +02:00
FROM endpoint_uptimes
WHERE endpoint_id = $ 1
2021-07-14 07:40:27 +02:00
AND hour_unix_timestamp >= $ 2
AND hour_unix_timestamp <= $ 3
` ,
2021-10-23 22:47:12 +02:00
endpointID ,
2021-07-14 07:40:27 +02:00
from . Unix ( ) ,
to . Unix ( ) ,
)
if err != nil {
return 0 , 0 , err
}
var totalExecutions , totalSuccessfulExecutions , totalResponseTime int
for rows . Next ( ) {
_ = rows . Scan ( & totalExecutions , & totalSuccessfulExecutions , & totalResponseTime )
}
if totalExecutions > 0 {
uptime = float64 ( totalSuccessfulExecutions ) / float64 ( totalExecutions )
avgResponseTime = time . Duration ( float64 ( totalResponseTime ) / float64 ( totalExecutions ) ) * time . Millisecond
}
return
}
2021-10-23 22:47:12 +02:00
func ( s * Store ) getEndpointAverageResponseTime ( tx * sql . Tx , endpointID int64 , from , to time . Time ) ( int , error ) {
2021-08-21 16:59:09 +02:00
rows , err := tx . Query (
`
SELECT SUM ( total_executions ) , SUM ( total_response_time )
2021-10-23 22:47:12 +02:00
FROM endpoint_uptimes
WHERE endpoint_id = $ 1
2021-08-21 16:59:09 +02:00
AND total_executions > 0
AND hour_unix_timestamp >= $ 2
AND hour_unix_timestamp <= $ 3
` ,
2021-10-23 22:47:12 +02:00
endpointID ,
2021-08-21 16:59:09 +02:00
from . Unix ( ) ,
to . Unix ( ) ,
)
if err != nil {
return 0 , err
}
var totalExecutions , totalResponseTime int
for rows . Next ( ) {
_ = rows . Scan ( & totalExecutions , & totalResponseTime )
}
if totalExecutions == 0 {
return 0 , nil
}
return int ( float64 ( totalResponseTime ) / float64 ( totalExecutions ) ) , nil
}
2021-10-23 22:47:12 +02:00
func ( s * Store ) getEndpointHourlyAverageResponseTimes ( tx * sql . Tx , endpointID int64 , from , to time . Time ) ( map [ int64 ] int , error ) {
2021-08-20 05:07:21 +02:00
rows , err := tx . Query (
`
SELECT hour_unix_timestamp , total_executions , total_response_time
2021-10-23 22:47:12 +02:00
FROM endpoint_uptimes
WHERE endpoint_id = $ 1
2021-08-20 05:07:21 +02:00
AND total_executions > 0
AND hour_unix_timestamp >= $ 2
AND hour_unix_timestamp <= $ 3
` ,
2021-10-23 22:47:12 +02:00
endpointID ,
2021-08-20 05:07:21 +02:00
from . Unix ( ) ,
to . Unix ( ) ,
)
if err != nil {
return nil , err
}
var totalExecutions , totalResponseTime int
var unixTimestampFlooredAtHour int64
hourlyAverageResponseTimes := make ( map [ int64 ] int )
for rows . Next ( ) {
_ = rows . Scan ( & unixTimestampFlooredAtHour , & totalExecutions , & totalResponseTime )
hourlyAverageResponseTimes [ unixTimestampFlooredAtHour ] = int ( float64 ( totalResponseTime ) / float64 ( totalExecutions ) )
}
return hourlyAverageResponseTimes , nil
}
2024-05-10 04:56:16 +02:00
func ( s * Store ) getEndpointID ( tx * sql . Tx , ep * endpoint . Endpoint ) ( int64 , error ) {
2021-09-11 00:00:04 +02:00
var id int64
2024-05-10 04:56:16 +02:00
err := tx . QueryRow ( "SELECT endpoint_id FROM endpoints WHERE endpoint_key = $1" , ep . Key ( ) ) . Scan ( & id )
2021-07-12 06:56:30 +02:00
if err != nil {
2024-04-09 03:00:40 +02:00
if errors . Is ( err , sql . ErrNoRows ) {
2021-10-23 22:47:12 +02:00
return 0 , common . ErrEndpointNotFound
2021-09-11 00:00:04 +02:00
}
2021-07-12 06:56:30 +02:00
return 0 , err
}
return id , nil
}
2021-10-23 22:47:12 +02:00
func ( s * Store ) getNumberOfEventsByEndpointID ( tx * sql . Tx , endpointID int64 ) ( int64 , error ) {
2021-07-12 06:56:30 +02:00
var numberOfEvents int64
2021-10-23 22:47:12 +02:00
err := tx . QueryRow ( "SELECT COUNT(1) FROM endpoint_events WHERE endpoint_id = $1" , endpointID ) . Scan ( & numberOfEvents )
2021-09-11 00:00:04 +02:00
return numberOfEvents , err
2021-07-12 06:56:30 +02:00
}
2021-10-23 22:47:12 +02:00
func ( s * Store ) getNumberOfResultsByEndpointID ( tx * sql . Tx , endpointID int64 ) ( int64 , error ) {
2021-07-13 04:53:14 +02:00
var numberOfResults int64
2021-10-23 22:47:12 +02:00
err := tx . QueryRow ( "SELECT COUNT(1) FROM endpoint_results WHERE endpoint_id = $1" , endpointID ) . Scan ( & numberOfResults )
2021-09-11 00:00:04 +02:00
return numberOfResults , err
2021-07-13 04:53:14 +02:00
}
2024-08-12 04:40:19 +02:00
// getNumberOfUptimeEntriesByEndpointID counts the uptime entries stored for the given endpoint ID.
func (s *Store) getNumberOfUptimeEntriesByEndpointID(tx *sql.Tx, endpointID int64) (count int64, err error) {
	err = tx.QueryRow("SELECT COUNT(1) FROM endpoint_uptimes WHERE endpoint_id = $1", endpointID).Scan(&count)
	return count, err
}
2021-10-23 22:47:12 +02:00
func ( s * Store ) getAgeOfOldestEndpointUptimeEntry ( tx * sql . Tx , endpointID int64 ) ( time . Duration , error ) {
2021-07-14 07:40:27 +02:00
rows , err := tx . Query (
`
SELECT hour_unix_timestamp
2021-10-23 22:47:12 +02:00
FROM endpoint_uptimes
WHERE endpoint_id = $ 1
2021-07-14 07:40:27 +02:00
ORDER BY hour_unix_timestamp
LIMIT 1
` ,
2021-10-23 22:47:12 +02:00
endpointID ,
2021-07-14 07:40:27 +02:00
)
if err != nil {
return 0 , err
}
2021-10-23 22:47:12 +02:00
var oldestEndpointUptimeUnixTimestamp int64
2021-07-14 07:40:27 +02:00
var found bool
for rows . Next ( ) {
2021-10-23 22:47:12 +02:00
_ = rows . Scan ( & oldestEndpointUptimeUnixTimestamp )
2021-07-14 07:40:27 +02:00
found = true
}
if ! found {
return 0 , errNoRowsReturned
}
2021-10-23 22:47:12 +02:00
return time . Since ( time . Unix ( oldestEndpointUptimeUnixTimestamp , 0 ) ) , nil
2021-07-14 07:40:27 +02:00
}
2021-10-23 22:47:12 +02:00
func ( s * Store ) getLastEndpointResultSuccessValue ( tx * sql . Tx , endpointID int64 ) ( bool , error ) {
2021-09-11 00:00:04 +02:00
var success bool
2021-10-23 22:47:12 +02:00
err := tx . QueryRow ( "SELECT success FROM endpoint_results WHERE endpoint_id = $1 ORDER BY endpoint_result_id DESC LIMIT 1" , endpointID ) . Scan ( & success )
2021-07-12 06:56:30 +02:00
if err != nil {
2024-04-09 03:00:40 +02:00
if errors . Is ( err , sql . ErrNoRows ) {
2021-09-11 00:00:04 +02:00
return false , errNoRowsReturned
}
2021-07-12 06:56:30 +02:00
return false , err
}
return success , nil
}
2021-10-23 22:47:12 +02:00
// deleteOldEndpointEvents deletes endpoint events that are no longer needed
// by keeping only the most recent common.MaximumNumberOfEvents events: every
// event whose id is not among the newest ones for this endpoint is removed.
func (s *Store) deleteOldEndpointEvents(tx *sql.Tx, endpointID int64) error {
	_, err := tx.Exec(
		`
			DELETE FROM endpoint_events
			WHERE endpoint_id = $1
				AND endpoint_event_id NOT IN (
					SELECT endpoint_event_id
					FROM endpoint_events
					WHERE endpoint_id = $1
					ORDER BY endpoint_event_id DESC
					LIMIT $2
				)
		`,
		endpointID,
		common.MaximumNumberOfEvents,
	)
	return err
}
2021-10-23 22:47:12 +02:00
// deleteOldEndpointResults deletes endpoint results that are no longer needed
// by keeping only the most recent common.MaximumNumberOfResults results: every
// result whose id is not among the newest ones for this endpoint is removed.
func (s *Store) deleteOldEndpointResults(tx *sql.Tx, endpointID int64) error {
	_, err := tx.Exec(
		`
			DELETE FROM endpoint_results
			WHERE endpoint_id = $1
				AND endpoint_result_id NOT IN (
					SELECT endpoint_result_id
					FROM endpoint_results
					WHERE endpoint_id = $1
					ORDER BY endpoint_result_id DESC
					LIMIT $2
				)
		`,
		endpointID,
		common.MaximumNumberOfResults,
	)
	return err
}
2021-07-14 07:40:27 +02:00
2021-10-23 22:47:12 +02:00
func ( s * Store ) deleteOldUptimeEntries ( tx * sql . Tx , endpointID int64 , maxAge time . Time ) error {
_ , err := tx . Exec ( "DELETE FROM endpoint_uptimes WHERE endpoint_id = $1 AND hour_unix_timestamp < $2" , endpointID , maxAge . Unix ( ) )
2021-07-14 07:40:27 +02:00
return err
}
2022-08-12 02:47:29 +02:00
2024-08-12 04:40:19 +02:00
// mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries merges all hourly uptime entries older than
// uptimeHourlyMergeThreshold from now into daily uptime entries by summing all hourly entries of the same day into a
// single entry.
//
// This effectively limits the number of uptime entries to (48+(n-2)) where 48 is for the first 48 entries with hourly
// entries (defined by uptimeHourlyBuffer) and n is the number of days for all entries older than 48 hours.
// Supporting 30d of entries would then result in far less than 24*30=720 entries.
func (s *Store) mergeHourlyUptimeEntriesOlderThanMergeThresholdIntoDailyUptimeEntries(tx *sql.Tx, endpointID int64) error {
	// Calculate timestamp of the first full day of uptime entries that would not impact the uptime calculation for 24h badges
	// The logic is that once at least 48 hours passed, we:
	// - No longer need to worry about keeping hourly entries
	// - Don't have to worry about new hourly entries being inserted, as the day has already passed
	// which implies that no matter at what hour of the day we are, any timestamp + 48h floored to the current day
	// will never impact the 24h uptime badge calculation
	now := time.Now()
	minThreshold := now.Add(-uptimeHourlyBuffer)
	minThreshold = time.Date(minThreshold.Year(), minThreshold.Month(), minThreshold.Day(), 0, 0, 0, 0, minThreshold.Location())
	maxThreshold := now.Add(-uptimeRetention)
	// Get all uptime entries older than uptimeHourlyMergeThreshold
	rows, err := tx.Query(
		`
			SELECT hour_unix_timestamp, total_executions, successful_executions, total_response_time
			FROM endpoint_uptimes
			WHERE endpoint_id = $1
				AND hour_unix_timestamp < $2
				AND hour_unix_timestamp >= $3
		`,
		endpointID,
		minThreshold.Unix(),
		maxThreshold.Unix(),
	)
	if err != nil {
		return err
	}
	// Entry accumulates one day's worth of summed hourly counters
	type Entry struct {
		totalExecutions      int
		successfulExecutions int
		totalResponseTime    int
	}
	dailyEntries := make(map[int64]*Entry)
	for rows.Next() {
		var unixTimestamp int64
		entry := Entry{}
		if err = rows.Scan(&unixTimestamp, &entry.totalExecutions, &entry.successfulExecutions, &entry.totalResponseTime); err != nil {
			return err
		}
		// Floor the hourly timestamp to the start of its day so that all hours of
		// the same day share a single map key
		timestamp := time.Unix(unixTimestamp, 0)
		unixTimestampFlooredAtDay := time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), 0, 0, 0, 0, timestamp.Location()).Unix()
		if dailyEntry := dailyEntries[unixTimestampFlooredAtDay]; dailyEntry == nil {
			dailyEntries[unixTimestampFlooredAtDay] = &entry
		} else {
			dailyEntries[unixTimestampFlooredAtDay].totalExecutions += entry.totalExecutions
			dailyEntries[unixTimestampFlooredAtDay].successfulExecutions += entry.successfulExecutions
			dailyEntries[unixTimestampFlooredAtDay].totalResponseTime += entry.totalResponseTime
		}
	}
	// Delete older hourly uptime entries
	_, err = tx.Exec("DELETE FROM endpoint_uptimes WHERE endpoint_id = $1 AND hour_unix_timestamp < $2", endpointID, minThreshold.Unix())
	if err != nil {
		return err
	}
	// Insert new daily uptime entries
	// NOTE(review): on conflict, the existing daily row's counters are overwritten
	// ($3/$4/$5) rather than summed — presumably intended alongside the TODO below,
	// but confirm that already-merged daily totals can't be clobbered here.
	for unixTimestamp, entry := range dailyEntries {
		_, err = tx.Exec(
			`
				INSERT INTO endpoint_uptimes (endpoint_id, hour_unix_timestamp, total_executions, successful_executions, total_response_time)
				VALUES ($1, $2, $3, $4, $5)
				ON CONFLICT(endpoint_id, hour_unix_timestamp) DO UPDATE SET
					total_executions = $3,
					successful_executions = $4,
					total_response_time = $5
			`,
			endpointID,
			unixTimestamp,
			entry.totalExecutions,
			entry.successfulExecutions,
			entry.totalResponseTime,
		)
		if err != nil {
			return err
		}
	}
	// TODO: Find a way to ignore entries that were already merged?
	return nil
}
2022-08-12 02:47:29 +02:00
// generateCacheKey builds the write-through cache key for an endpoint status
// lookup: the endpoint key followed by the four paging parameters, dash-separated.
func generateCacheKey(endpointKey string, p *paging.EndpointStatusParams) string {
	return endpointKey + "-" + strconv.Itoa(p.EventsPage) + "-" + strconv.Itoa(p.EventsPageSize) + "-" + strconv.Itoa(p.ResultsPage) + "-" + strconv.Itoa(p.ResultsPageSize)
}
func extractKeyAndParamsFromCacheKey ( cacheKey string ) ( string , * paging . EndpointStatusParams , error ) {
parts := strings . Split ( cacheKey , "-" )
if len ( parts ) < 5 {
return "" , nil , fmt . Errorf ( "invalid cache key: %s" , cacheKey )
}
params := & paging . EndpointStatusParams { }
var err error
if params . EventsPage , err = strconv . Atoi ( parts [ len ( parts ) - 4 ] ) ; err != nil {
2022-09-21 03:54:59 +02:00
return "" , nil , fmt . Errorf ( "invalid cache key: %w" , err )
2022-08-12 02:47:29 +02:00
}
if params . EventsPageSize , err = strconv . Atoi ( parts [ len ( parts ) - 3 ] ) ; err != nil {
2022-09-21 03:54:59 +02:00
return "" , nil , fmt . Errorf ( "invalid cache key: %w" , err )
2022-08-12 02:47:29 +02:00
}
if params . ResultsPage , err = strconv . Atoi ( parts [ len ( parts ) - 2 ] ) ; err != nil {
2022-09-21 03:54:59 +02:00
return "" , nil , fmt . Errorf ( "invalid cache key: %w" , err )
2022-08-12 02:47:29 +02:00
}
if params . ResultsPageSize , err = strconv . Atoi ( parts [ len ( parts ) - 1 ] ) ; err != nil {
2022-09-21 03:54:59 +02:00
return "" , nil , fmt . Errorf ( "invalid cache key: %w" , err )
2022-08-12 02:47:29 +02:00
}
return strings . Join ( parts [ : len ( parts ) - 4 ] , "-" ) , params , nil
}