Handle canceling schedule and avoid recursive call (#1636)

Using time.Ticker allows us to avoid recursive calls that may end up in schedule running and possible deadlock if no routine is listening for cancel calls
This commit is contained in:
Maycon Santos 2024-03-03 10:35:01 +01:00 committed by GitHub
parent aa935bdae3
commit 17f5abc653
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 50 additions and 29 deletions

View File

@ -1,9 +1,10 @@
package server
import (
log "github.com/sirupsen/logrus"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// Scheduler is an interface which implementations can schedule and cancel jobs
@ -55,14 +56,8 @@ func (wm *DefaultScheduler) cancel(ID string) bool {
cancel, ok := wm.jobs[ID]
if ok {
delete(wm.jobs, ID)
select {
case cancel <- struct{}{}:
log.Debugf("cancelled scheduled job %s", ID)
default:
log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID)
return false
}
close(cancel)
log.Debugf("cancelled scheduled job %s", ID)
}
return ok
}
@ -90,25 +85,41 @@ func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (ne
return
}
ticker := time.NewTicker(in)
wm.jobs[ID] = cancel
log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs))
go func() {
select {
case <-time.After(in):
log.Debugf("time to do a scheduled job %s", ID)
runIn, reschedule := job()
wm.mu.Lock()
defer wm.mu.Unlock()
delete(wm.jobs, ID)
if reschedule {
go wm.Schedule(runIn, ID, job)
for {
select {
case <-ticker.C:
select {
case <-cancel:
log.Debugf("scheduled job %s was canceled, stop timer", ID)
ticker.Stop()
return
default:
log.Debugf("time to do a scheduled job %s", ID)
}
runIn, reschedule := job()
if !reschedule {
wm.mu.Lock()
defer wm.mu.Unlock()
delete(wm.jobs, ID)
log.Debugf("job %s is not scheduled to run again", ID)
ticker.Stop()
return
}
// we need this comparison to avoid resetting the ticker with the same duration and missing the current elapsesed time
if runIn != in {
ticker.Reset(runIn)
}
case <-cancel:
log.Debugf("job %s was canceled, stopping timer", ID)
ticker.Stop()
return
}
case <-cancel:
log.Debugf("stopped scheduled job %s ", ID)
wm.mu.Lock()
defer wm.mu.Unlock()
delete(wm.jobs, ID)
return
}
}()
}

View File

@ -2,11 +2,12 @@ package server
import (
"fmt"
"github.com/stretchr/testify/assert"
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestScheduler_Performance(t *testing.T) {
@ -36,15 +37,24 @@ func TestScheduler_Cancel(t *testing.T) {
jobID1 := "test-scheduler-job-1"
jobID2 := "test-scheduler-job-2"
scheduler := NewDefaultScheduler()
scheduler.Schedule(2*time.Second, jobID1, func() (nextRunIn time.Duration, reschedule bool) {
return 0, false
tChan := make(chan struct{})
p := []string{jobID1, jobID2}
scheduler.Schedule(2*time.Millisecond, jobID1, func() (nextRunIn time.Duration, reschedule bool) {
tt := p[0]
<-tChan
t.Logf("job %s", tt)
return 2 * time.Millisecond, true
})
scheduler.Schedule(2*time.Second, jobID2, func() (nextRunIn time.Duration, reschedule bool) {
return 0, false
scheduler.Schedule(2*time.Millisecond, jobID2, func() (nextRunIn time.Duration, reschedule bool) {
return 2 * time.Millisecond, true
})
time.Sleep(4 * time.Millisecond)
assert.Len(t, scheduler.jobs, 2)
scheduler.Cancel([]string{jobID1})
close(tChan)
p = []string{}
time.Sleep(4 * time.Millisecond)
assert.Len(t, scheduler.jobs, 1)
assert.NotNil(t, scheduler.jobs[jobID2])
}