rc/job: use mutex for adding listeners thread safety

Fix in extreme cases, when the job is executing finish(), the listener added by calling OnFinish() will never be executed.

This change should not cause compatibility issues, as consumers should not make assumptions about whether listeners will be run in a new goroutine
This commit is contained in:
hayden.pan 2024-10-13 16:43:00 +08:00
parent f639cd9c78
commit b388f5b7df
2 changed files with 53 additions and 8 deletions

View File

@ -74,12 +74,6 @@ func (job *Job) finish(out rc.Params, err error) {
running.kickExpire() // make sure this job gets expired
}
func (job *Job) addListener(fn *func()) {
job.mu.Lock()
defer job.mu.Unlock()
job.listeners = append(job.listeners, fn)
}
func (job *Job) removeListener(fn *func()) {
job.mu.Lock()
defer job.mu.Unlock()
@ -94,10 +88,12 @@ func (job *Job) removeListener(fn *func()) {
// OnFinish adds listener to job that will be triggered when job is finished.
// It returns a function to cancel listening.
func (job *Job) OnFinish(fn func()) func() {
job.mu.Lock()
defer job.mu.Unlock()
if job.Finished {
fn()
go fn()
} else {
job.addListener(&fn)
job.listeners = append(job.listeners, &fn)
}
return func() { job.removeListener(&fn) }
}

View File

@ -554,3 +554,52 @@ func TestOnFinishAlreadyFinished(t *testing.T) {
t.Fatal("Timeout waiting for OnFinish to fire")
}
}
func TestOnFinishDataRace(t *testing.T) {
jobID.Store(0)
job, _, err := NewJob(context.Background(), ctxFn, rc.Params{"_async": true})
assert.NoError(t, err)
var expect, got uint64
finished := make(chan struct{})
stop, stopped := make(chan struct{}), make(chan struct{})
go func() {
Loop:
for {
select {
case <-stop:
break Loop
default:
_, err := OnFinish(job.ID, func() {
finished <- struct{}{}
})
assert.NoError(t, err)
expect += 1
}
}
close(stopped)
}()
time.Sleep(10 * time.Millisecond)
job.Stop()
// Wait for the first OnFinish to fire
<-finished
got += 1
// Stop the OnFinish producer
close(stop)
<-stopped
timeout := time.After(5 * time.Second)
for {
if got == expect {
break
}
select {
case <-finished:
got += 1
case <-timeout:
t.Fatal("Timeout waiting for all OnFinish calls to fire")
}
}
}