diff --git a/backend/server/internal/server/api_handlers.go b/backend/server/internal/server/api_handlers.go index af997d6..23a3c9c 100644 --- a/backend/server/internal/server/api_handlers.go +++ b/backend/server/internal/server/api_handlers.go @@ -105,10 +105,10 @@ func (s *Server) apiQueryHandler(w http.ResponseWriter, r *http.Request) { // Delete any entries that match a pending deletion request deletionRequests, err := s.db.DeletionRequestsForUserAndDevice(r.Context(), userId, deviceId) checkGormError(err) - for _, request := range deletionRequests { - _, err := s.db.ApplyDeletionRequestsToBackend(r.Context(), request) - checkGormError(err) - } + checkGormError(shared.ForEach(deletionRequests, 4, func(delRequest *shared.DeletionRequest) error { + _, err := s.db.ApplyDeletionRequestsToBackend(r.Context(), delRequest) + return err + })) // Then retrieve historyEntries, err := s.db.HistoryEntriesForDevice(r.Context(), deviceId, 5) diff --git a/client/lib/lib.go b/client/lib/lib.go index af7c616..db0e2ce 100644 --- a/client/lib/lib.go +++ b/client/lib/lib.go @@ -18,7 +18,6 @@ import ( "runtime" "strconv" "strings" - "sync" "time" _ "embed" // for embedding config.sh @@ -728,35 +727,6 @@ func EncryptAndMarshal(config *hctx.ClientConfig, entries []*data.HistoryEntry) return jsonValue, nil } -func forEach[T any](arr []T, numThreads int, fn func(T) error) error { - wg := &sync.WaitGroup{} - wg.Add(len(arr)) - - limiter := make(chan bool, numThreads) - - var errors []error - for _, item := range arr { - limiter <- true - go func(x T) { - defer wg.Done() - err := fn(x) - if err != nil { - errors = append(errors, err) - } - <-limiter - }(item) - if len(errors) > 0 { - return errors[0] - } - } - - wg.Wait() - if len(errors) > 0 { - return errors[0] - } - return nil -} - func Reupload(ctx context.Context) error { config := hctx.GetConf(ctx) if config.IsOffline { @@ -774,7 +744,7 @@ func Reupload(ctx context.Context) error { } chunkSize := 500 chunks := shared.Chunks(entries, chunkSize) - return forEach(chunks, 10, func(chunk []*data.HistoryEntry) error { + return shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error { jsonValue, err := EncryptAndMarshal(config, chunk) if err != nil { return fmt.Errorf("failed to reupload due to failed encryption: %w", err) diff --git a/shared/utils.go b/shared/utils.go new file mode 100644 index 0000000..1c4b1f7 --- /dev/null +++ b/shared/utils.go @@ -0,0 +1,32 @@ +package shared + +import "sync" + +func ForEach[T any](arr []T, numThreads int, fn func(T) error) error { + wg := &sync.WaitGroup{} + wg.Add(len(arr)) + + limiter := make(chan bool, numThreads) + + var errors []error + for _, item := range arr { + limiter <- true + go func(x T) { + defer wg.Done() + err := fn(x) + if err != nil { + errors = append(errors, err) + } + <-limiter + }(item) + if len(errors) > 0 { + return errors[0] + } + } + + wg.Wait() + if len(errors) > 0 { + return errors[0] + } + return nil +}