mirror of
https://github.com/netbirdio/netbird.git
synced 2025-01-23 14:28:51 +01:00
765aba2c1c
propagate context from all the API calls and log request ID, account ID and peer ID --------- Co-authored-by: Zoltan Papp <zoltan.pmail@gmail.com>
127 lines
3.8 KiB
Go
127 lines
3.8 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Scheduler is an interface which implementations can schedule and cancel jobs
|
|
type Scheduler interface {
|
|
Cancel(ctx context.Context, IDs []string)
|
|
Schedule(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool))
|
|
}
|
|
|
|
// MockScheduler is a mock implementation of Scheduler
|
|
type MockScheduler struct {
|
|
CancelFunc func(ctx context.Context, IDs []string)
|
|
ScheduleFunc func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool))
|
|
}
|
|
|
|
// Cancel mocks the Cancel function of the Scheduler interface
|
|
func (mock *MockScheduler) Cancel(ctx context.Context, IDs []string) {
|
|
if mock.CancelFunc != nil {
|
|
mock.CancelFunc(ctx, IDs)
|
|
return
|
|
}
|
|
log.WithContext(ctx).Errorf("MockScheduler doesn't have Cancel function defined ")
|
|
}
|
|
|
|
// Schedule mocks the Schedule function of the Scheduler interface
|
|
func (mock *MockScheduler) Schedule(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
|
|
if mock.ScheduleFunc != nil {
|
|
mock.ScheduleFunc(ctx, in, ID, job)
|
|
return
|
|
}
|
|
log.WithContext(ctx).Errorf("MockScheduler doesn't have Schedule function defined")
|
|
}
|
|
|
|
// DefaultScheduler is a generic structure that allows to schedule jobs (functions) to run in the future and cancel them.
|
|
type DefaultScheduler struct {
|
|
// jobs map holds cancellation channels indexed by the job ID
|
|
jobs map[string]chan struct{}
|
|
mu *sync.Mutex
|
|
}
|
|
|
|
// NewDefaultScheduler creates an instance of a DefaultScheduler
|
|
func NewDefaultScheduler() *DefaultScheduler {
|
|
return &DefaultScheduler{
|
|
jobs: make(map[string]chan struct{}),
|
|
mu: &sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
func (wm *DefaultScheduler) cancel(ctx context.Context, ID string) bool {
|
|
cancel, ok := wm.jobs[ID]
|
|
if ok {
|
|
delete(wm.jobs, ID)
|
|
close(cancel)
|
|
log.WithContext(ctx).Debugf("cancelled scheduled job %s", ID)
|
|
}
|
|
return ok
|
|
}
|
|
|
|
// Cancel cancels the scheduled job by ID if present.
|
|
// If job wasn't found the function returns false.
|
|
func (wm *DefaultScheduler) Cancel(ctx context.Context, IDs []string) {
|
|
wm.mu.Lock()
|
|
defer wm.mu.Unlock()
|
|
|
|
for _, id := range IDs {
|
|
wm.cancel(ctx, id)
|
|
}
|
|
}
|
|
|
|
// Schedule a job to run in some time in the future. If job returns true then it will be scheduled one more time.
|
|
// If job with the provided ID already exists, a new one won't be scheduled.
|
|
func (wm *DefaultScheduler) Schedule(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
|
|
wm.mu.Lock()
|
|
defer wm.mu.Unlock()
|
|
cancel := make(chan struct{})
|
|
if _, ok := wm.jobs[ID]; ok {
|
|
log.WithContext(ctx).Debugf("couldn't schedule a job %s because it already exists. There are %d total jobs scheduled.",
|
|
ID, len(wm.jobs))
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(in)
|
|
|
|
wm.jobs[ID] = cancel
|
|
log.WithContext(ctx).Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs))
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
select {
|
|
case <-cancel:
|
|
log.WithContext(ctx).Debugf("scheduled job %s was canceled, stop timer", ID)
|
|
ticker.Stop()
|
|
return
|
|
default:
|
|
log.WithContext(ctx).Debugf("time to do a scheduled job %s", ID)
|
|
}
|
|
runIn, reschedule := job()
|
|
if !reschedule {
|
|
wm.mu.Lock()
|
|
defer wm.mu.Unlock()
|
|
delete(wm.jobs, ID)
|
|
log.WithContext(ctx).Debugf("job %s is not scheduled to run again", ID)
|
|
ticker.Stop()
|
|
return
|
|
}
|
|
// we need this comparison to avoid resetting the ticker with the same duration and missing the current elapsesed time
|
|
if runIn != in {
|
|
ticker.Reset(runIn)
|
|
}
|
|
case <-cancel:
|
|
log.WithContext(ctx).Debugf("job %s was canceled, stopping timer", ID)
|
|
ticker.Stop()
|
|
return
|
|
}
|
|
}
|
|
|
|
}()
|
|
}
|