Protect concurrent read/writes to pacer.

Protects all variables in the pacer from concurrent modifications. It is now safe to modify pacer settings while it is running.

I decided to not go for an RWMutex, since all accesses are very short, so the overhead of an RWMutex isn't worth it.

Fixes #138.
This commit is contained in:
klauspost 2015-09-16 12:25:55 +02:00
parent 34193fd8d9
commit 79fd662676

View File

@ -2,6 +2,7 @@
package pacer
import (
"sync"
"time"
"github.com/ncw/rclone/fs"
@ -14,6 +15,7 @@ type Pacer struct {
pacer chan struct{} // To pace the operations
sleepTime time.Duration // Time to sleep for each transaction
retries int // Max number of retries
mu sync.Mutex // Protecting read/writes
}
// Paced is a function which is called by the Call and CallNoRetry
@ -41,6 +43,8 @@ func New() *Pacer {
// SetMinSleep sets the minimum sleep time for the pacer
func (p *Pacer) SetMinSleep(t time.Duration) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.minSleep = t
p.sleepTime = p.minSleep
return p
@ -48,6 +52,8 @@ func (p *Pacer) SetMinSleep(t time.Duration) *Pacer {
// SetMaxSleep sets the maximum sleep time for the pacer
func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.maxSleep = t
p.sleepTime = p.minSleep
return p
@ -60,12 +66,16 @@ func (p *Pacer) SetMaxSleep(t time.Duration) *Pacer {
//
// bigger for slower decay, exponential
func (p *Pacer) SetDecayConstant(decay uint) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.decayConstant = decay
return p
}
// SetRetries sets the max number of tries for Call
func (p *Pacer) SetRetries(retries int) *Pacer {
p.mu.Lock()
defer p.mu.Unlock()
p.retries = retries
return p
}
@ -82,12 +92,14 @@ func (p *Pacer) beginCall() {
// not to run it when it wasn't needed
<-p.pacer
p.mu.Lock()
// Restart the timer
go func(t time.Duration) {
// fs.Debug(f, "New sleep for %v at %v", t, time.Now())
time.Sleep(t)
p.pacer <- struct{}{}
}(p.sleepTime)
p.mu.Unlock()
}
// End a call to the API
@ -95,6 +107,7 @@ func (p *Pacer) beginCall() {
// Refresh the pace given an error that was returned. It returns a
// boolean as to whether the operation should be retried.
func (p *Pacer) endCall(again bool) {
p.mu.Lock()
oldSleepTime := p.sleepTime
if again {
p.sleepTime *= 2
@ -113,6 +126,7 @@ func (p *Pacer) endCall(again bool) {
fs.Debug("pacer", "Reducing sleep to %v", p.sleepTime)
}
}
p.mu.Unlock()
}
// call implements Call but with settable retries
@ -139,7 +153,10 @@ func (p *Pacer) call(fn Paced, retries int) (err error) {
// error. This error may be returned wrapped in a RetryError if the
// number of retries is exceeded.
func (p *Pacer) Call(fn Paced) (err error) {
return p.call(fn, p.retries)
p.mu.Lock()
retries := p.retries
p.mu.Unlock()
return p.call(fn, retries)
}
// Pace the remote operations to not exceed Amazon's limits and return