Make deletion request processing happen in parallel to speed up query endpoint

David Dworken 2023-10-14 17:21:00 -07:00
parent 218c70f5e7
commit 6fb6498515
3 changed files with 37 additions and 35 deletions

View File

@@ -105,10 +105,10 @@ func (s *Server) apiQueryHandler(w http.ResponseWriter, r *http.Request) {
 	// Delete any entries that match a pending deletion request
 	deletionRequests, err := s.db.DeletionRequestsForUserAndDevice(r.Context(), userId, deviceId)
 	checkGormError(err)
-	for _, request := range deletionRequests {
-		_, err := s.db.ApplyDeletionRequestsToBackend(r.Context(), request)
-		checkGormError(err)
-	}
+	checkGormError(shared.ForEach(deletionRequests, 4, func(delRequest *shared.DeletionRequest) error {
+		_, err := s.db.ApplyDeletionRequestsToBackend(r.Context(), delRequest)
+		return err
+	}))
 	// Then retrieve
 	historyEntries, err := s.db.HistoryEntriesForDevice(r.Context(), deviceId, 5)

View File

@@ -18,7 +18,6 @@ import (
 	"runtime"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 	_ "embed" // for embedding config.sh
@@ -728,35 +727,6 @@ func EncryptAndMarshal(config *hctx.ClientConfig, entries []*data.HistoryEntry)
 	return jsonValue, nil
 }

-func forEach[T any](arr []T, numThreads int, fn func(T) error) error {
-	wg := &sync.WaitGroup{}
-	wg.Add(len(arr))
-	limiter := make(chan bool, numThreads)
-
-	var errors []error
-	for _, item := range arr {
-		limiter <- true
-		go func(x T) {
-			defer wg.Done()
-			err := fn(x)
-			if err != nil {
-				errors = append(errors, err)
-			}
-			<-limiter
-		}(item)
-		if len(errors) > 0 {
-			return errors[0]
-		}
-	}
-
-	wg.Wait()
-
-	if len(errors) > 0 {
-		return errors[0]
-	}
-	return nil
-}
-
 func Reupload(ctx context.Context) error {
 	config := hctx.GetConf(ctx)
 	if config.IsOffline {
@@ -774,7 +744,7 @@ func Reupload(ctx context.Context) error {
 	}
 	chunkSize := 500
 	chunks := shared.Chunks(entries, chunkSize)
-	return forEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
+	return shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
 		jsonValue, err := EncryptAndMarshal(config, chunk)
 		if err != nil {
 			return fmt.Errorf("failed to reupload due to failed encryption: %w", err)

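Reupload batches the history entries into chunks of 500 and uploads up to 10 chunks concurrently via shared.ForEach. shared.Chunks itself is not shown in this diff; the following is a minimal sketch of its assumed behavior, splitting a slice into consecutive sub-slices of at most chunkSize elements (an illustrative reimplementation, not the committed code):

// Sketch of the assumed behavior of shared.Chunks (not shown in this diff):
// split arr into consecutive sub-slices of at most chunkSize elements each.
func Chunks[T any](arr []T, chunkSize int) [][]T {
	var chunks [][]T
	for len(arr) > chunkSize {
		chunks = append(chunks, arr[:chunkSize])
		arr = arr[chunkSize:]
	}
	if len(arr) > 0 {
		chunks = append(chunks, arr)
	}
	return chunks
}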
shared/utils.go (new file, +32 lines)
View File

@@ -0,0 +1,32 @@
+package shared
+
+import "sync"
+
+func ForEach[T any](arr []T, numThreads int, fn func(T) error) error {
+	wg := &sync.WaitGroup{}
+	wg.Add(len(arr))
+	limiter := make(chan bool, numThreads)
+
+	var errors []error
+	for _, item := range arr {
+		limiter <- true
+		go func(x T) {
+			defer wg.Done()
+			err := fn(x)
+			if err != nil {
+				errors = append(errors, err)
+			}
+			<-limiter
+		}(item)
+		if len(errors) > 0 {
+			return errors[0]
+		}
+	}
+
+	wg.Wait()
+
+	if len(errors) > 0 {
+		return errors[0]
+	}
+	return nil
+}
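Note that ForEach appends to the shared errors slice from multiple goroutines without synchronization, and the early return inside the loop can exit before wg.Wait() completes. A minimal race-safe sketch of the same bounded-concurrency pattern, assuming the same package shared and import "sync" as above; forEachSafe is a hypothetical name and not part of this commit:

// forEachSafe is a sketch of a race-safe variant of the same pattern: the
// buffered limiter channel still caps concurrency at numThreads, but the
// first error is recorded under a mutex and every goroutine is awaited.
func forEachSafe[T any](arr []T, numThreads int, fn func(T) error) error {
	wg := &sync.WaitGroup{}
	wg.Add(len(arr))
	limiter := make(chan bool, numThreads)
	var mu sync.Mutex
	var firstErr error
	for _, item := range arr {
		limiter <- true
		go func(x T) {
			defer wg.Done()
			defer func() { <-limiter }()
			if err := fn(x); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}(item)
	}
	wg.Wait()
	return firstErr
}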