From 79fd662676363f9d984597b6db1d8892aefd39da Mon Sep 17 00:00:00 2001 From: klauspost Date: Wed, 16 Sep 2015 12:25:55 +0200 Subject: [PATCH] Protect concurrent read/writes to pacer. Protects all variables in the pacer from concurrent modifications. It is now safe to modify pacer settings while it is running. I decided to not go for an RWMutex, since all accesses are very short, so the overhead of an RWMutex isn't worth it. Fixes #138. --- pacer/pacer.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/pacer/pacer.go b/pacer/pacer.go index 1cc007184..c69e9e01f 100644 --- a/pacer/pacer.go +++ b/pacer/pacer.go @@ -2,6 +2,7 @@ package pacer import ( + "sync" "time" "github.com/ncw/rclone/fs" @@ -14,6 +15,7 @@ type Pacer struct { pacer chan struct{} // To pace the operations sleepTime time.Duration // Time to sleep for each transaction retries int // Max number of retries + mu sync.Mutex // Protecting read/writes } // Paced is a function which is called by the Call and CallNoRetry @@ -41,6 +43,8 @@ func New() *Pacer { // SetMinSleep sets the minimum sleep time for the pacer func (p *Pacer) SetMinSleep(t time.Duration) *Pacer { + p.mu.Lock() + defer p.mu.Unlock() p.minSleep = t p.sleepTime = p.minSleep return p @@ -48,6 +52,8 @@ func (p *Pacer) SetMinSleep(t time.Duration) *Pacer { // SetMaxSleep sets the maximum sleep time for the pacer func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer { + p.mu.Lock() + defer p.mu.Unlock() p.maxSleep = t p.sleepTime = p.minSleep return p @@ -60,12 +66,16 @@ func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer { // // bigger for slower decay, exponential func (p *Pacer) SetDecayConstant(decay uint) *Pacer { + p.mu.Lock() + defer p.mu.Unlock() p.decayConstant = decay return p } // SetRetries sets the max number of tries for Call func (p *Pacer) SetRetries(retries int) *Pacer { + p.mu.Lock() + defer p.mu.Unlock() p.retries = retries return p } @@ -82,12 +92,14 @@ func (p *Pacer) beginCall() { // not to run it when it wasn't needed <-p.pacer + p.mu.Lock() // Restart the timer go func(t time.Duration) { // fs.Debug(f, "New sleep for %v at %v", t, time.Now()) time.Sleep(t) p.pacer <- struct{}{} }(p.sleepTime) + p.mu.Unlock() } // End a call to the API @@ -95,6 +107,7 @@ func (p *Pacer) beginCall() { // Refresh the pace given an error that was returned. It returns a // boolean as to whether the operation should be retried. func (p *Pacer) endCall(again bool) { + p.mu.Lock() oldSleepTime := p.sleepTime if again { p.sleepTime *= 2 @@ -113,6 +126,7 @@ func (p *Pacer) endCall(again bool) { fs.Debug("pacer", "Reducing sleep to %v", p.sleepTime) } } + p.mu.Unlock() } // call implements Call but with settable retries @@ -139,7 +153,10 @@ func (p *Pacer) call(fn Paced, retries int) (err error) { // error. This error may be returned wrapped in a RetryError if the // number of retries is exceeded. func (p *Pacer) Call(fn Paced) (err error) { - return p.call(fn, p.retries) + p.mu.Lock() + retries := p.retries + p.mu.Unlock() + return p.call(fn, retries) } // Pace the remote operations to not exceed Amazon's limits and return