From caac95ff5457c46818d66383eb41e05d3c4200f7 Mon Sep 17 00:00:00 2001 From: "hayden.pan" Date: Sun, 13 Oct 2024 16:43:00 +0800 Subject: [PATCH] rc/job: use mutex for adding listeners thread safety Fix in extreme cases, when the job is executing finish(), the listener added by calling OnFinish() will never be executed. This change should not cause compatibility issues, as consumers should not make assumptions about whether listeners will be run in a new goroutine --- fs/rc/jobs/job.go | 12 ++++------- fs/rc/jobs/job_test.go | 49 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/fs/rc/jobs/job.go b/fs/rc/jobs/job.go index 2a27e0c8b..ceca2cf78 100644 --- a/fs/rc/jobs/job.go +++ b/fs/rc/jobs/job.go @@ -74,12 +74,6 @@ func (job *Job) finish(out rc.Params, err error) { running.kickExpire() // make sure this job gets expired } -func (job *Job) addListener(fn *func()) { - job.mu.Lock() - defer job.mu.Unlock() - job.listeners = append(job.listeners, fn) -} - func (job *Job) removeListener(fn *func()) { job.mu.Lock() defer job.mu.Unlock() @@ -94,10 +88,12 @@ func (job *Job) removeListener(fn *func()) { // OnFinish adds listener to job that will be triggered when job is finished. // It returns a function to cancel listening. func (job *Job) OnFinish(fn func()) func() { + job.mu.Lock() + defer job.mu.Unlock() if job.Finished { - fn() + go fn() } else { - job.addListener(&fn) + job.listeners = append(job.listeners, &fn) } return func() { job.removeListener(&fn) } } diff --git a/fs/rc/jobs/job_test.go b/fs/rc/jobs/job_test.go index 770843cc8..ad5744f3a 100644 --- a/fs/rc/jobs/job_test.go +++ b/fs/rc/jobs/job_test.go @@ -554,3 +554,52 @@ func TestOnFinishAlreadyFinished(t *testing.T) { t.Fatal("Timeout waiting for OnFinish to fire") } } + +func TestOnFinishDataRace(t *testing.T) { + jobID.Store(0) + job, _, err := NewJob(context.Background(), ctxFn, rc.Params{"_async": true}) + assert.NoError(t, err) + var expect, got uint64 + finished := make(chan struct{}) + stop, stopped := make(chan struct{}), make(chan struct{}) + go func() { + Loop: + for { + select { + case <-stop: + break Loop + default: + _, err := OnFinish(job.ID, func() { + finished <- struct{}{} + }) + assert.NoError(t, err) + expect += 1 + } + } + close(stopped) + }() + + time.Sleep(10 * time.Millisecond) + job.Stop() + + // Wait for the first OnFinish to fire + <-finished + got += 1 + + // Stop the OnFinish producer + close(stop) + <-stopped + + timeout := time.After(5 * time.Second) + for { + if got == expect { + break + } + select { + case <-finished: + got += 1 + case <-timeout: + t.Fatal("Timeout waiting for all OnFinish calls to fire") + } + } +}