diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index 2a27e0c8b..ceca2cf78 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -74,12 +74,6 @@ func (job *Job) finish(out rc.Params, err error) { running.kickExpire() // make sure this job gets expired } -func (job *Job) addListener(fn *func()) { - job.mu.Lock() - defer job.mu.Unlock() - job.listeners = append(job.listeners, fn) -} - func (job *Job) removeListener(fn *func()) { job.mu.Lock() defer job.mu.Unlock() @@ -94,10 +88,12 @@ func (job *Job) removeListener(fn *func()) { // OnFinish adds listener to job that will be triggered when job is finished. // It returns a function to cancel listening. func (job *Job) OnFinish(fn func()) func() { + job.mu.Lock() + defer job.mu.Unlock() if job.Finished { - fn() + go fn() } else { - job.addListener(&fn) + job.listeners = append(job.listeners, &fn) } return func() { job.removeListener(&fn) } } diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go index 770843cc8..ad5744f3a 100644 --- a/fs/rc/jobs/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -554,3 +554,52 @@ func TestOnFinishAlreadyFinished(t *testing.T) { t.Fatal("Timeout waiting for OnFinish to fire") } } + +func TestOnFinishDataRace(t *testing.T) { + jobID.Store(0) + job, _, err := NewJob(context.Background(), ctxFn, rc.Params{"_async": true}) + assert.NoError(t, err) + var expect, got uint64 + finished := make(chan struct{}) + stop, stopped := make(chan struct{}), make(chan struct{}) + go func() { + Loop: + for { + select { + case <-stop: + break Loop + default: + _, err := OnFinish(job.ID, func() { + finished <- struct{}{} + }) + assert.NoError(t, err) + expect += 1 + } + } + close(stopped) + }() + + time.Sleep(10 * time.Millisecond) + job.Stop() + + // Wait for the first OnFinish to fire + <-finished + got += 1 + + // Stop the OnFinish producer + close(stop) + <-stopped + + timeout := time.After(5 * time.Second) + for { + if got == expect { + break + } + select { + case <-finished: + got += 1 + case <-timeout: + t.Fatal("Timeout waiting for all OnFinish calls to fire") + } + } +}