rc/job: use mutex for adding listeners thread safety

Fix in extreme cases, when the job is executing finish(), the listener added by calling OnFinish() will never be executed.

This change should not cause compatibility issues, as consumers should not make assumptions about whether listeners will be run in a new goroutine
This commit is contained in:
hayden.pan 2024-10-13 16:43:00 +08:00 committed by Nick Craig-Wood
parent 19f4580aca
commit caac95ff54
2 changed files with 53 additions and 8 deletions

View File

@ -74,12 +74,6 @@ func (job *Job) finish(out rc.Params, err error) {
running.kickExpire() // make sure this job gets expired running.kickExpire() // make sure this job gets expired
} }
func (job *Job) addListener(fn *func()) {
job.mu.Lock()
defer job.mu.Unlock()
job.listeners = append(job.listeners, fn)
}
func (job *Job) removeListener(fn *func()) { func (job *Job) removeListener(fn *func()) {
job.mu.Lock() job.mu.Lock()
defer job.mu.Unlock() defer job.mu.Unlock()
@ -94,10 +88,12 @@ func (job *Job) removeListener(fn *func()) {
// OnFinish adds listener to job that will be triggered when job is finished. // OnFinish adds listener to job that will be triggered when job is finished.
// It returns a function to cancel listening. // It returns a function to cancel listening.
func (job *Job) OnFinish(fn func()) func() { func (job *Job) OnFinish(fn func()) func() {
job.mu.Lock()
defer job.mu.Unlock()
if job.Finished { if job.Finished {
fn() go fn()
} else { } else {
job.addListener(&fn) job.listeners = append(job.listeners, &fn)
} }
return func() { job.removeListener(&fn) } return func() { job.removeListener(&fn) }
} }

View File

@ -554,3 +554,52 @@ func TestOnFinishAlreadyFinished(t *testing.T) {
t.Fatal("Timeout waiting for OnFinish to fire") t.Fatal("Timeout waiting for OnFinish to fire")
} }
} }
func TestOnFinishDataRace(t *testing.T) {
jobID.Store(0)
job, _, err := NewJob(context.Background(), ctxFn, rc.Params{"_async": true})
assert.NoError(t, err)
var expect, got uint64
finished := make(chan struct{})
stop, stopped := make(chan struct{}), make(chan struct{})
go func() {
Loop:
for {
select {
case <-stop:
break Loop
default:
_, err := OnFinish(job.ID, func() {
finished <- struct{}{}
})
assert.NoError(t, err)
expect += 1
}
}
close(stopped)
}()
time.Sleep(10 * time.Millisecond)
job.Stop()
// Wait for the first OnFinish to fire
<-finished
got += 1
// Stop the OnFinish producer
close(stop)
<-stopped
timeout := time.After(5 * time.Second)
for {
if got == expect {
break
}
select {
case <-finished:
got += 1
case <-timeout:
t.Fatal("Timeout waiting for all OnFinish calls to fire")
}
}
}