Add datadog integration for monitoring the server

This commit is contained in:
David Dworken
2022-11-25 20:04:40 -08:00
parent 8dd9c1d9e4
commit 1ce20157c7
4 changed files with 269 additions and 131 deletions

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"encoding/json"
"fmt"
"html"
@ -8,16 +9,26 @@ import (
"log"
"net/http"
"os"
"reflect"
"runtime"
"strconv"
"strings"
"time"
"github.com/DataDog/datadog-go/statsd"
"github.com/ddworken/hishtory/shared"
"github.com/jackc/pgx/v4/stdlib"
_ "github.com/lib/pq"
"github.com/rodaine/table"
"gorm.io/driver/postgres"
sqltrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/database/sql"
gormtrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/gorm.io/gorm.v1"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/profiler"
"gorm.io/gorm/logger"
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
@ -28,6 +39,7 @@ const (
var (
GLOBAL_DB *gorm.DB
GLOBAL_STATSD *statsd.Client
ReleaseVersion string = "UNKNOWN"
)
@ -54,27 +66,27 @@ func getHishtoryVersion(r *http.Request) string {
return r.Header.Get("X-Hishtory-Version")
}
func updateUsageData(r *http.Request, userId, deviceId string, numEntriesHandled int, isQuery bool) {
func updateUsageData(ctx context.Context, r *http.Request, userId, deviceId string, numEntriesHandled int, isQuery bool) {
var usageData []UsageData
GLOBAL_DB.Where("user_id = ? AND device_id = ?", userId, deviceId).Find(&usageData)
GLOBAL_DB.WithContext(ctx).Where("user_id = ? AND device_id = ?", userId, deviceId).Find(&usageData)
if len(usageData) == 0 {
GLOBAL_DB.Create(&UsageData{UserId: userId, DeviceId: deviceId, LastUsed: time.Now(), NumEntriesHandled: numEntriesHandled, Version: getHishtoryVersion(r)})
GLOBAL_DB.WithContext(ctx).Create(&UsageData{UserId: userId, DeviceId: deviceId, LastUsed: time.Now(), NumEntriesHandled: numEntriesHandled, Version: getHishtoryVersion(r)})
} else {
usage := usageData[0]
GLOBAL_DB.Model(&UsageData{}).Where("user_id = ? AND device_id = ?", userId, deviceId).Update("last_used", time.Now()).Update("last_ip", getRemoteAddr(r))
GLOBAL_DB.WithContext(ctx).Model(&UsageData{}).Where("user_id = ? AND device_id = ?", userId, deviceId).Update("last_used", time.Now()).Update("last_ip", getRemoteAddr(r))
if numEntriesHandled > 0 {
GLOBAL_DB.Exec("UPDATE usage_data SET num_entries_handled = COALESCE(num_entries_handled, 0) + ? WHERE user_id = ? AND device_id = ?", numEntriesHandled, userId, deviceId)
GLOBAL_DB.WithContext(ctx).Exec("UPDATE usage_data SET num_entries_handled = COALESCE(num_entries_handled, 0) + ? WHERE user_id = ? AND device_id = ?", numEntriesHandled, userId, deviceId)
}
if usage.Version != getHishtoryVersion(r) {
GLOBAL_DB.Exec("UPDATE usage_data SET version = ? WHERE user_id = ? AND device_id = ?", getHishtoryVersion(r), userId, deviceId)
GLOBAL_DB.WithContext(ctx).Exec("UPDATE usage_data SET version = ? WHERE user_id = ? AND device_id = ?", getHishtoryVersion(r), userId, deviceId)
}
}
if isQuery {
GLOBAL_DB.Exec("UPDATE usage_data SET num_queries = COALESCE(num_queries, 0) + 1, last_queried = ? WHERE user_id = ? AND device_id = ?", time.Now(), userId, deviceId)
GLOBAL_DB.WithContext(ctx).Exec("UPDATE usage_data SET num_queries = COALESCE(num_queries, 0) + 1, last_queried = ? WHERE user_id = ? AND device_id = ?", time.Now(), userId, deviceId)
}
}
func usageStatsHandler(w http.ResponseWriter, r *http.Request) {
func usageStatsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
query := `
SELECT
MIN(devices.registration_date) as registration_date,
@ -90,7 +102,7 @@ func usageStatsHandler(w http.ResponseWriter, r *http.Request) {
GROUP BY devices.user_id
ORDER BY registration_date
`
rows, err := GLOBAL_DB.Raw(query).Rows()
rows, err := GLOBAL_DB.WithContext(ctx).Raw(query).Rows()
if err != nil {
panic(err)
}
@ -116,24 +128,24 @@ func usageStatsHandler(w http.ResponseWriter, r *http.Request) {
tbl.Print()
}
func statsHandler(w http.ResponseWriter, r *http.Request) {
func statsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
var numDevices int64 = 0
checkGormResult(GLOBAL_DB.Model(&shared.Device{}).Count(&numDevices))
checkGormResult(GLOBAL_DB.WithContext(ctx).Model(&shared.Device{}).Count(&numDevices))
type numEntriesProcessed struct {
Total int
}
nep := numEntriesProcessed{}
checkGormResult(GLOBAL_DB.Model(&UsageData{}).Select("SUM(num_entries_handled) as total").Find(&nep))
checkGormResult(GLOBAL_DB.WithContext(ctx).Model(&UsageData{}).Select("SUM(num_entries_handled) as total").Find(&nep))
var numDbEntries int64 = 0
checkGormResult(GLOBAL_DB.Model(&shared.EncHistoryEntry{}).Count(&numDbEntries))
checkGormResult(GLOBAL_DB.WithContext(ctx).Model(&shared.EncHistoryEntry{}).Count(&numDbEntries))
lastWeek := time.Now().AddDate(0, 0, -7)
var weeklyActiveInstalls int64 = 0
checkGormResult(GLOBAL_DB.Model(&UsageData{}).Where("last_used > ?", lastWeek).Count(&weeklyActiveInstalls))
checkGormResult(GLOBAL_DB.WithContext(ctx).Model(&UsageData{}).Where("last_used > ?", lastWeek).Count(&weeklyActiveInstalls))
var weeklyQueryUsers int64 = 0
checkGormResult(GLOBAL_DB.Model(&UsageData{}).Where("last_queried > ?", lastWeek).Count(&weeklyQueryUsers))
checkGormResult(GLOBAL_DB.WithContext(ctx).Model(&UsageData{}).Where("last_queried > ?", lastWeek).Count(&weeklyQueryUsers))
var lastRegistration string = ""
err := GLOBAL_DB.Raw("select to_char(max(registration_date), 'DD Month YYYY HH24:MI') from devices").Row().Scan(&lastRegistration)
err := GLOBAL_DB.WithContext(ctx).Raw("select to_char(max(registration_date), 'DD Month YYYY HH24:MI') from devices").Row().Scan(&lastRegistration)
if err != nil {
panic(err)
}
@ -145,7 +157,7 @@ func statsHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(fmt.Sprintf("Last registration: %s\n", lastRegistration)))
}
func apiSubmitHandler(w http.ResponseWriter, r *http.Request) {
func apiSubmitHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
@ -159,8 +171,8 @@ func apiSubmitHandler(w http.ResponseWriter, r *http.Request) {
if len(entries) == 0 {
return
}
updateUsageData(r, entries[0].UserId, entries[0].DeviceId, len(entries), false)
tx := GLOBAL_DB.Where("user_id = ?", entries[0].UserId)
updateUsageData(ctx, r, entries[0].UserId, entries[0].DeviceId, len(entries), false)
tx := GLOBAL_DB.WithContext(ctx).Where("user_id = ?", entries[0].UserId)
var devices []*shared.Device
checkGormResult(tx.Find(&devices))
if len(devices) == 0 {
@ -171,15 +183,16 @@ func apiSubmitHandler(w http.ResponseWriter, r *http.Request) {
for _, entry := range entries {
entry.DeviceId = device.DeviceId
}
checkGormResult(GLOBAL_DB.Create(&entries))
checkGormResult(GLOBAL_DB.WithContext(ctx).Create(&entries))
}
GLOBAL_STATSD.Count("hishtory.submit", int64(len(devices)), []string{}, 1.0)
}
func apiBootstrapHandler(w http.ResponseWriter, r *http.Request) {
func apiBootstrapHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
userId := getRequiredQueryParam(r, "user_id")
deviceId := getRequiredQueryParam(r, "device_id")
updateUsageData(r, userId, deviceId, 0, false)
tx := GLOBAL_DB.Where("user_id = ?", userId)
updateUsageData(ctx, r, userId, deviceId, 0, false)
tx := GLOBAL_DB.WithContext(ctx).Where("user_id = ?", userId)
var historyEntries []*shared.EncHistoryEntry
checkGormResult(tx.Find(&historyEntries))
fmt.Printf("apiBootstrapHandler: Found %d entries\n", len(historyEntries))
@ -190,25 +203,25 @@ func apiBootstrapHandler(w http.ResponseWriter, r *http.Request) {
w.Write(resp)
}
func apiQueryHandler(w http.ResponseWriter, r *http.Request) {
func apiQueryHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
userId := getRequiredQueryParam(r, "user_id")
deviceId := getRequiredQueryParam(r, "device_id")
updateUsageData(r, userId, deviceId, 0, true)
updateUsageData(ctx, r, userId, deviceId, 0, true)
// Increment the count
checkGormResult(GLOBAL_DB.Exec("UPDATE enc_history_entries SET read_count = read_count + 1 WHERE device_id = ?", deviceId))
checkGormResult(GLOBAL_DB.WithContext(ctx).Exec("UPDATE enc_history_entries SET read_count = read_count + 1 WHERE device_id = ?", deviceId))
// Delete any entries that match a pending deletion request
var deletionRequests []*shared.DeletionRequest
checkGormResult(GLOBAL_DB.Where("destination_device_id = ? AND user_id = ?", deviceId, userId).Find(&deletionRequests))
checkGormResult(GLOBAL_DB.WithContext(ctx).Where("destination_device_id = ? AND user_id = ?", deviceId, userId).Find(&deletionRequests))
for _, request := range deletionRequests {
_, err := applyDeletionRequestsToBackend(*request)
_, err := applyDeletionRequestsToBackend(ctx, *request)
if err != nil {
panic(err)
}
}
// Then retrieve, to avoid a race condition
tx := GLOBAL_DB.Where("device_id = ? AND read_count < 5", deviceId)
tx := GLOBAL_DB.WithContext(ctx).Where("device_id = ? AND read_count < 5", deviceId)
var historyEntries []*shared.EncHistoryEntry
checkGormResult(tx.Find(&historyEntries))
fmt.Printf("apiQueryHandler: Found %d entries for %s\n", len(historyEntries), r.URL)
@ -217,6 +230,7 @@ func apiQueryHandler(w http.ResponseWriter, r *http.Request) {
panic(err)
}
w.Write(resp)
GLOBAL_STATSD.Incr("hishtory.query", []string{}, 1.0)
}
func getRemoteAddr(r *http.Request) string {
@ -227,25 +241,26 @@ func getRemoteAddr(r *http.Request) string {
return addr[0]
}
func apiRegisterHandler(w http.ResponseWriter, r *http.Request) {
func apiRegisterHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
userId := getRequiredQueryParam(r, "user_id")
deviceId := getRequiredQueryParam(r, "device_id")
var existingDevicesCount int64 = -1
checkGormResult(GLOBAL_DB.Model(&shared.Device{}).Where("user_id = ?", userId).Count(&existingDevicesCount))
checkGormResult(GLOBAL_DB.WithContext(ctx).Model(&shared.Device{}).Where("user_id = ?", userId).Count(&existingDevicesCount))
fmt.Printf("apiRegisterHandler: existingDevicesCount=%d\n", existingDevicesCount)
checkGormResult(GLOBAL_DB.Create(&shared.Device{UserId: userId, DeviceId: deviceId, RegistrationIp: getRemoteAddr(r), RegistrationDate: time.Now()}))
checkGormResult(GLOBAL_DB.WithContext(ctx).Create(&shared.Device{UserId: userId, DeviceId: deviceId, RegistrationIp: getRemoteAddr(r), RegistrationDate: time.Now()}))
if existingDevicesCount > 0 {
checkGormResult(GLOBAL_DB.Create(&shared.DumpRequest{UserId: userId, RequestingDeviceId: deviceId, RequestTime: time.Now()}))
checkGormResult(GLOBAL_DB.WithContext(ctx).Create(&shared.DumpRequest{UserId: userId, RequestingDeviceId: deviceId, RequestTime: time.Now()}))
}
updateUsageData(r, userId, deviceId, 0, false)
updateUsageData(ctx, r, userId, deviceId, 0, false)
GLOBAL_STATSD.Incr("hishtory.register", []string{}, 1.0)
}
func apiGetPendingDumpRequestsHandler(w http.ResponseWriter, r *http.Request) {
func apiGetPendingDumpRequestsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
userId := getRequiredQueryParam(r, "user_id")
deviceId := getRequiredQueryParam(r, "device_id")
var dumpRequests []*shared.DumpRequest
// Filter out ones requested by the hishtory instance that sent this request
checkGormResult(GLOBAL_DB.Where("user_id = ? AND requesting_device_id != ?", userId, deviceId).Find(&dumpRequests))
checkGormResult(GLOBAL_DB.WithContext(ctx).Where("user_id = ? AND requesting_device_id != ?", userId, deviceId).Find(&dumpRequests))
respBody, err := json.Marshal(dumpRequests)
if err != nil {
panic(fmt.Errorf("failed to JSON marshall the dump requests: %v", err))
@ -253,7 +268,7 @@ func apiGetPendingDumpRequestsHandler(w http.ResponseWriter, r *http.Request) {
w.Write(respBody)
}
func apiSubmitDumpHandler(w http.ResponseWriter, r *http.Request) {
func apiSubmitDumpHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
userId := getRequiredQueryParam(r, "user_id")
srcDeviceId := getRequiredQueryParam(r, "source_device_id")
requestingDeviceId := getRequiredQueryParam(r, "requesting_device_id")
@ -267,7 +282,7 @@ func apiSubmitDumpHandler(w http.ResponseWriter, r *http.Request) {
panic(fmt.Sprintf("body=%#v, err=%v", data, err))
}
fmt.Printf("apiSubmitDumpHandler: received request containg %d EncHistoryEntry\n", len(entries))
err = GLOBAL_DB.Transaction(func(tx *gorm.DB) error {
err = GLOBAL_DB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
for _, entry := range entries {
entry.DeviceId = requestingDeviceId
if entry.UserId != userId {
@ -280,11 +295,11 @@ func apiSubmitDumpHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
panic(fmt.Errorf("failed to execute transaction to add dumped DB: %v", err))
}
checkGormResult(GLOBAL_DB.Delete(&shared.DumpRequest{}, "user_id = ? AND requesting_device_id = ?", userId, requestingDeviceId))
updateUsageData(r, userId, srcDeviceId, len(entries), false)
checkGormResult(GLOBAL_DB.WithContext(ctx).Delete(&shared.DumpRequest{}, "user_id = ? AND requesting_device_id = ?", userId, requestingDeviceId))
updateUsageData(ctx, r, userId, srcDeviceId, len(entries), false)
}
func apiBannerHandler(w http.ResponseWriter, r *http.Request) {
func apiBannerHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
commitHash := getRequiredQueryParam(r, "commit_hash")
deviceId := getRequiredQueryParam(r, "device_id")
forcedBanner := r.URL.Query().Get("forced_banner")
@ -296,16 +311,16 @@ func apiBannerHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(html.EscapeString(forcedBanner)))
}
func getDeletionRequestsHandler(w http.ResponseWriter, r *http.Request) {
func getDeletionRequestsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
userId := getRequiredQueryParam(r, "user_id")
deviceId := getRequiredQueryParam(r, "device_id")
// Increment the ReadCount
checkGormResult(GLOBAL_DB.Exec("UPDATE deletion_requests SET read_count = read_count + 1 WHERE destination_device_id = ? AND user_id = ?", deviceId, userId))
checkGormResult(GLOBAL_DB.WithContext(ctx).Exec("UPDATE deletion_requests SET read_count = read_count + 1 WHERE destination_device_id = ? AND user_id = ?", deviceId, userId))
// Return all the deletion requests
var deletionRequests []*shared.DeletionRequest
checkGormResult(GLOBAL_DB.Where("user_id = ? AND destination_device_id = ?", userId, deviceId).Find(&deletionRequests))
checkGormResult(GLOBAL_DB.WithContext(ctx).Where("user_id = ? AND destination_device_id = ?", userId, deviceId).Find(&deletionRequests))
respBody, err := json.Marshal(deletionRequests)
if err != nil {
panic(fmt.Errorf("failed to JSON marshall the dump requests: %v", err))
@ -313,7 +328,7 @@ func getDeletionRequestsHandler(w http.ResponseWriter, r *http.Request) {
w.Write(respBody)
}
func addDeletionRequestHandler(w http.ResponseWriter, r *http.Request) {
func addDeletionRequestHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
@ -327,7 +342,7 @@ func addDeletionRequestHandler(w http.ResponseWriter, r *http.Request) {
fmt.Printf("addDeletionRequestHandler: received request containg %d messages to be deleted\n", len(request.Messages.Ids))
// Store the deletion request so all the devices will get it
tx := GLOBAL_DB.Where("user_id = ?", request.UserId)
tx := GLOBAL_DB.WithContext(ctx).Where("user_id = ?", request.UserId)
var devices []*shared.Device
checkGormResult(tx.Find(&devices))
if len(devices) == 0 {
@ -336,18 +351,18 @@ func addDeletionRequestHandler(w http.ResponseWriter, r *http.Request) {
fmt.Printf("addDeletionRequestHandler: Found %d devices\n", len(devices))
for _, device := range devices {
request.DestinationDeviceId = device.DeviceId
checkGormResult(GLOBAL_DB.Create(&request))
checkGormResult(GLOBAL_DB.WithContext(ctx).Create(&request))
}
// Also delete anything currently in the DB matching it
numDeleted, err := applyDeletionRequestsToBackend(request)
numDeleted, err := applyDeletionRequestsToBackend(ctx, request)
if err != nil {
panic(err)
}
fmt.Printf("addDeletionRequestHandler: Deleted %d rows in the backend\n", numDeleted)
}
func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
func healthCheckHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
db, err := GLOBAL_DB.DB()
if err != nil {
panic(fmt.Sprintf("failed to get DB: %v", err))
@ -356,19 +371,19 @@ func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
panic(fmt.Sprintf("failed to ping DB: %v", err))
}
if isProductionEnvironment(r) {
if isProductionEnvironment() {
// Check that we have a reasonable looking set of devices/entries in the DB
var count int64
checkGormResult(GLOBAL_DB.Model(&shared.EncHistoryEntry{}).Count(&count))
checkGormResult(GLOBAL_DB.WithContext(ctx).Model(&shared.EncHistoryEntry{}).Count(&count))
if count < 1000 {
panic("Suspiciously few enc history entries!")
}
checkGormResult(GLOBAL_DB.Model(&shared.Device{}).Count(&count))
checkGormResult(GLOBAL_DB.WithContext(ctx).Model(&shared.Device{}).Count(&count))
if count < 100 {
panic("Suspiciously few devices!")
}
// Check that we can write to the DB. This entry will get written and then eventually cleaned by the cron.
checkGormResult(GLOBAL_DB.Create(&shared.EncHistoryEntry{
checkGormResult(GLOBAL_DB.WithContext(ctx).Create(&shared.EncHistoryEntry{
EncryptedData: []byte("data"),
Nonce: []byte("nonce"),
DeviceId: "healthcheck_device_id",
@ -382,26 +397,29 @@ func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(ok))
}
func applyDeletionRequestsToBackend(request shared.DeletionRequest) (int, error) {
tx := GLOBAL_DB.Where("false")
func applyDeletionRequestsToBackend(ctx context.Context, request shared.DeletionRequest) (int, error) {
tx := GLOBAL_DB.WithContext(ctx).Where("false")
for _, message := range request.Messages.Ids {
tx = tx.Or(GLOBAL_DB.Where("user_id = ? AND device_id = ? AND date = ?", request.UserId, message.DeviceId, message.Date))
tx = tx.Or(GLOBAL_DB.WithContext(ctx).Where("user_id = ? AND device_id = ? AND date = ?", request.UserId, message.DeviceId, message.Date))
}
result := tx.Delete(&shared.EncHistoryEntry{})
checkGormResult(result)
return int(result.RowsAffected), nil
}
func wipeDbHandler(w http.ResponseWriter, r *http.Request) {
checkGormResult(GLOBAL_DB.Exec("DELETE FROM enc_history_entries"))
func wipeDbHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
if r.Host == "api.hishtory.dev" {
panic("refusing to wipe the DB for prod")
}
checkGormResult(GLOBAL_DB.WithContext(ctx).Exec("DELETE FROM enc_history_entries"))
}
func isTestEnvironment() bool {
return os.Getenv("HISHTORY_TEST") != ""
}
func isProductionEnvironment(r *http.Request) bool {
return r.Host == "api.hishtory.dev"
func isProductionEnvironment() bool {
return os.Getenv("HISHTORY_ENV") == "prod"
}
func OpenDB() (*gorm.DB, error) {
@ -420,6 +438,14 @@ func OpenDB() (*gorm.DB, error) {
return db, nil
}
// The same as the default logger, except with a higher SlowThreshold
customLogger := logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{
SlowThreshold: 1000 * time.Millisecond,
LogLevel: logger.Warn,
IgnoreRecordNotFoundError: false,
Colorful: true,
})
var sqliteDb string
if os.Getenv("HISHTORY_SQLITE_DB") != "" {
sqliteDb = os.Getenv("HISHTORY_SQLITE_DB")
@ -429,13 +455,18 @@ func OpenDB() (*gorm.DB, error) {
var err error
if sqliteDb != "" {
db, err = gorm.Open(sqlite.Open(sqliteDb), &gorm.Config{})
db, err = gorm.Open(sqlite.Open(sqliteDb), &gorm.Config{Logger: customLogger})
} else {
postgresDb := fmt.Sprintf(PostgresDb, os.Getenv("POSTGRESQL_PASSWORD"))
if os.Getenv("HISHTORY_POSTGRES_DB") != "" {
postgresDb = os.Getenv("HISHTORY_POSTGRES_DB")
}
db, err = gorm.Open(postgres.Open(postgresDb), &gorm.Config{})
sqltrace.Register("pgx", &stdlib.Driver{}, sqltrace.WithServiceName("hishtory-api"))
sqlDb, err := sqltrace.Open("pgx", postgresDb)
if err != nil {
log.Fatal(err)
}
db, err = gormtrace.Open(postgres.New(postgres.Config{Conn: sqlDb}), &gorm.Config{Logger: customLogger})
}
if err != nil {
return nil, fmt.Errorf("failed to connect to the DB: %v", err)
@ -455,25 +486,25 @@ func init() {
panic("server.go was built without a ReleaseVersion!")
}
InitDB()
go runBackgroundJobs()
go runBackgroundJobs(context.Background())
}
func cron() error {
func cron(ctx context.Context) error {
err := updateReleaseVersion()
if err != nil {
fmt.Println(err)
}
err = cleanDatabase()
err = cleanDatabase(ctx)
if err != nil {
fmt.Println(err)
}
return nil
}
func runBackgroundJobs() {
func runBackgroundJobs(ctx context.Context) {
time.Sleep(5 * time.Second)
for {
err := cron()
err := cron(ctx)
if err != nil {
fmt.Printf("Cron failure: %v", err)
}
@ -481,8 +512,8 @@ func runBackgroundJobs() {
}
}
func triggerCronHandler(w http.ResponseWriter, r *http.Request) {
err := cron()
func triggerCronHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
err := cron(ctx)
if err != nil {
panic(err)
}
@ -599,7 +630,7 @@ func buildUpdateInfo(version string) shared.UpdateInfo {
}
}
func apiDownloadHandler(w http.ResponseWriter, r *http.Request) {
func apiDownloadHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
updateInfo := buildUpdateInfo(ReleaseVersion)
resp, err := json.Marshal(updateInfo)
if err != nil {
@ -608,7 +639,7 @@ func apiDownloadHandler(w http.ResponseWriter, r *http.Request) {
w.Write(resp)
}
func slsaStatusHandler(w http.ResponseWriter, r *http.Request) {
func slsaStatusHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
// returns "OK" unless there is a current SLSA bug
v := getHishtoryVersion(r)
if !strings.Contains(v, "v0.") {
@ -627,7 +658,7 @@ func slsaStatusHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}
func feedbackHandler(w http.ResponseWriter, r *http.Request) {
func feedbackHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
@ -638,7 +669,8 @@ func feedbackHandler(w http.ResponseWriter, r *http.Request) {
panic(fmt.Sprintf("feedbackHandler: body=%#v, err=%v", data, err))
}
fmt.Printf("feedbackHandler: received request containg feedback %#v\n", feedback)
checkGormResult(GLOBAL_DB.Create(feedback))
checkGormResult(GLOBAL_DB.WithContext(ctx).Create(feedback))
GLOBAL_STATSD.Incr("hishtory.uninstall", []string{}, 1.0)
}
type loggedResponseData struct {
@ -660,7 +692,12 @@ func (r *loggingResponseWriter) WriteHeader(statusCode int) {
r.ResponseWriter.WriteHeader(statusCode)
}
func withLogging(h func(http.ResponseWriter, *http.Request)) http.Handler {
func getFunctionName(temp interface{}) string {
strs := strings.Split((runtime.FuncForPC(reflect.ValueOf(temp).Pointer()).Name()), ".")
return strs[len(strs)-1]
}
func withLogging(h func(context.Context, http.ResponseWriter, *http.Request)) http.Handler {
logFn := func(rw http.ResponseWriter, r *http.Request) {
var responseData loggedResponseData
lrw := loggingResponseWriter{
@ -668,11 +705,20 @@ func withLogging(h func(http.ResponseWriter, *http.Request)) http.Handler {
responseData: &responseData,
}
start := time.Now()
span, ctx := tracer.StartSpanFromContext(
context.Background(),
getFunctionName(h),
tracer.SpanType(ext.SpanTypeSQL),
tracer.ServiceName("hishtory-api"),
)
defer span.Finish()
h(&lrw, r)
h(ctx, &lrw, r)
duration := time.Since(start)
fmt.Printf("%s %s %#v %s %s %s\n", getRemoteAddr(r), r.Method, r.RequestURI, getHishtoryVersion(r), duration.String(), byteCountToString(responseData.size))
GLOBAL_STATSD.Distribution("hishtory.request_duration", float64(duration.Microseconds())/1_000, []string{"HANDLER=" + getFunctionName(h)}, 1.0)
GLOBAL_STATSD.Incr("hishtory.request", []string{}, 1.0)
}
return http.HandlerFunc(logFn)
}
@ -690,35 +736,64 @@ func byteCountToString(b int) string {
return fmt.Sprintf("%.1f %cB", float64(b)/float64(div), "kMG"[exp])
}
func cleanDatabase() error {
checkGormResult(GLOBAL_DB.Exec("DELETE FROM enc_history_entries WHERE read_count > 10"))
checkGormResult(GLOBAL_DB.Exec("DELETE FROM deletion_requests WHERE read_count > 100"))
func cleanDatabase(ctx context.Context) error {
checkGormResult(GLOBAL_DB.WithContext(ctx).Exec("DELETE FROM enc_history_entries WHERE read_count > 10"))
checkGormResult(GLOBAL_DB.WithContext(ctx).Exec("DELETE FROM deletion_requests WHERE read_count > 100"))
// TODO(optimization): Clean the database by deleting entries for users that haven't been used in X amount of time
return nil
}
func main() {
fmt.Println("Listening on localhost:8080")
http.Handle("/api/v1/submit", withLogging(apiSubmitHandler))
http.Handle("/api/v1/get-dump-requests", withLogging(apiGetPendingDumpRequestsHandler))
http.Handle("/api/v1/submit-dump", withLogging(apiSubmitDumpHandler))
http.Handle("/api/v1/query", withLogging(apiQueryHandler))
http.Handle("/api/v1/bootstrap", withLogging(apiBootstrapHandler))
http.Handle("/api/v1/register", withLogging(apiRegisterHandler))
http.Handle("/api/v1/banner", withLogging(apiBannerHandler))
http.Handle("/api/v1/download", withLogging(apiDownloadHandler))
http.Handle("/api/v1/trigger-cron", withLogging(triggerCronHandler))
http.Handle("/api/v1/get-deletion-requests", withLogging(getDeletionRequestsHandler))
http.Handle("/api/v1/add-deletion-request", withLogging(addDeletionRequestHandler))
http.Handle("/api/v1/slsa-status", withLogging(slsaStatusHandler))
http.Handle("/api/v1/feedback", withLogging(feedbackHandler))
http.Handle("/healthcheck", withLogging(healthCheckHandler))
http.Handle("/internal/api/v1/usage-stats", withLogging(usageStatsHandler))
http.Handle("/internal/api/v1/stats", withLogging(statsHandler))
if isTestEnvironment() {
http.Handle("/api/v1/wipe-db", withLogging(wipeDbHandler))
if isProductionEnvironment() {
err := profiler.Start(
profiler.WithService("hishtory-api"),
profiler.WithVersion(ReleaseVersion),
profiler.WithAPIKey(os.Getenv("DD_API_KEY")),
profiler.WithUDS("/var/run/datadog/apm.socket"),
profiler.WithProfileTypes(
profiler.CPUProfile,
profiler.HeapProfile,
),
)
if err != nil {
fmt.Printf("Failed to start DataDog profiler: %v\n", err)
}
defer profiler.Stop()
tracer.Start(
tracer.WithRuntimeMetrics(),
tracer.WithService("hishtory-api"),
tracer.WithUDS("/var/run/datadog/apm.socket"),
)
defer tracer.Stop()
ddStats, err := statsd.New("unix:///var/run/datadog/dsd.socket")
if err != nil {
fmt.Printf("Failed to start DataDog statsd: %v\n", err)
}
GLOBAL_STATSD = ddStats
}
log.Fatal(http.ListenAndServe(":8080", nil))
mux := httptrace.NewServeMux()
mux.Handle("/api/v1/submit", withLogging(apiSubmitHandler))
mux.Handle("/api/v1/get-dump-requests", withLogging(apiGetPendingDumpRequestsHandler))
mux.Handle("/api/v1/submit-dump", withLogging(apiSubmitDumpHandler))
mux.Handle("/api/v1/query", withLogging(apiQueryHandler))
mux.Handle("/api/v1/bootstrap", withLogging(apiBootstrapHandler))
mux.Handle("/api/v1/register", withLogging(apiRegisterHandler))
mux.Handle("/api/v1/banner", withLogging(apiBannerHandler))
mux.Handle("/api/v1/download", withLogging(apiDownloadHandler))
mux.Handle("/api/v1/trigger-cron", withLogging(triggerCronHandler))
mux.Handle("/api/v1/get-deletion-requests", withLogging(getDeletionRequestsHandler))
mux.Handle("/api/v1/add-deletion-request", withLogging(addDeletionRequestHandler))
mux.Handle("/api/v1/slsa-status", withLogging(slsaStatusHandler))
mux.Handle("/api/v1/feedback", withLogging(feedbackHandler))
mux.Handle("/healthcheck", withLogging(healthCheckHandler))
mux.Handle("/internal/api/v1/usage-stats", withLogging(usageStatsHandler))
mux.Handle("/internal/api/v1/stats", withLogging(statsHandler))
if isTestEnvironment() {
mux.Handle("/api/v1/wipe-db", withLogging(wipeDbHandler))
}
fmt.Println("Listening on localhost:8080")
log.Fatal(http.ListenAndServe(":8080", mux))
}
func checkGormResult(result *gorm.DB) {