mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 00:13:52 +01:00
hotfix: limit concurrency of zfs send & recv commands
ATM, the replication logic sends all dry-run requests in parallel, which might overwhelm the ZFS pool on the sending side. Since we use rpc/dataconn for dry sends, this also opens one TCP connection per dry-run request. Use a sempahore to limit the degree of concurrency where we know it is a problem ATM. As indicated by the comments, the cleaner solution would involve some kind of 'resource exhaustion' error code. refs #161 refs #164
This commit is contained in:
parent
5f909dab76
commit
000d8bba66
9
Gopkg.lock
generated
9
Gopkg.lock
generated
@ -828,6 +828,14 @@
|
||||
pruneopts = ""
|
||||
revision = "351d144fa1fc0bd934e2408202be0c29f25e35a0"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:0142c968b74c157abbb0220c05fa2bdde8a3a4509d6134b35ef75d5b58afb721"
|
||||
name = "golang.org/x/sync"
|
||||
packages = ["semaphore"]
|
||||
pruneopts = ""
|
||||
revision = "e225da77a7e68af35c70ccbf71af2b83e6acac3c"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
digest = "1:f358024b019f87eecaadcb098113a40852c94fe58ea670ef3c3e2d2c7bd93db1"
|
||||
@ -1015,6 +1023,7 @@
|
||||
"github.com/yudai/gojsondiff/formatter",
|
||||
"github.com/zrepl/yaml-config",
|
||||
"golang.org/x/net/context",
|
||||
"golang.org/x/sync/semaphore",
|
||||
"golang.org/x/sys/unix",
|
||||
"golang.org/x/tools/cmd/goimports",
|
||||
"golang.org/x/tools/cmd/stringer",
|
||||
|
@ -10,6 +10,8 @@ import (
|
||||
|
||||
"github.com/zrepl/zrepl/replication/logic/pdu"
|
||||
"github.com/zrepl/zrepl/util/chainlock"
|
||||
"github.com/zrepl/zrepl/util/envconst"
|
||||
"github.com/zrepl/zrepl/util/semaphore"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
@ -75,16 +77,29 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
|
||||
|
||||
}
|
||||
|
||||
var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10))
|
||||
|
||||
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
|
||||
_, err := s.filterCheckFS(r.Filesystem)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
getLogger(ctx).Debug("acquire concurrent send semaphore")
|
||||
// TODO use try-acquire and fail with resource-exhaustion rpc status
|
||||
// => would require handling on the client-side
|
||||
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
|
||||
guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer guard.Release()
|
||||
|
||||
si, err := zfs.ZFSSendDry(r.Filesystem, r.From, r.To, "")
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var expSize int64 = 0 // protocol says 0 means no estimate
|
||||
if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
|
||||
expSize = si.SizeEstimate
|
||||
@ -320,6 +335,8 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, zf
|
||||
return nil, nil, fmt.Errorf("receiver does not implement Send()")
|
||||
}
|
||||
|
||||
var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))
|
||||
|
||||
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
|
||||
getLogger(ctx).Debug("incoming Receive")
|
||||
defer receive.Close()
|
||||
@ -396,6 +413,16 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
|
||||
}
|
||||
}
|
||||
|
||||
getLogger(ctx).Debug("acquire concurrent recv semaphore")
|
||||
// TODO use try-acquire and fail with resource-exhaustion rpc status
|
||||
// => would require handling on the client-side
|
||||
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
|
||||
guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer guard.Release()
|
||||
|
||||
getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
||||
|
||||
if err := zfs.ZFSRecv(ctx, lp.ToString(), receive, recvOpts); err != nil {
|
||||
|
@ -14,6 +14,8 @@ import (
|
||||
"github.com/zrepl/zrepl/replication/logic/pdu"
|
||||
"github.com/zrepl/zrepl/replication/report"
|
||||
"github.com/zrepl/zrepl/util/bytecounter"
|
||||
"github.com/zrepl/zrepl/util/envconst"
|
||||
"github.com/zrepl/zrepl/util/semaphore"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
@ -110,6 +112,8 @@ type Filesystem struct {
|
||||
Path string // compat
|
||||
receiverFS *pdu.Filesystem
|
||||
promBytesReplicated prometheus.Counter // compat
|
||||
|
||||
sizeEstimateRequestSem *semaphore.S
|
||||
}
|
||||
|
||||
func (f *Filesystem) EqualToPreviousAttempt(other driver.FS) bool {
|
||||
@ -234,6 +238,8 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
|
||||
}
|
||||
rfss := rlfssres.GetFilesystems()
|
||||
|
||||
sizeEstimateRequestSem := semaphore.New(envconst.Int64("ZREPL_REPLICATION_MAX_CONCURRENT_SIZE_ESTIMATE", 4))
|
||||
|
||||
q := make([]*Filesystem, 0, len(sfss))
|
||||
for _, fs := range sfss {
|
||||
|
||||
@ -247,11 +253,12 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
|
||||
ctr := p.promBytesReplicated.WithLabelValues(fs.Path)
|
||||
|
||||
q = append(q, &Filesystem{
|
||||
sender: p.sender,
|
||||
receiver: p.receiver,
|
||||
Path: fs.Path,
|
||||
receiverFS: receiverFS,
|
||||
promBytesReplicated: ctr,
|
||||
sender: p.sender,
|
||||
receiver: p.receiver,
|
||||
Path: fs.Path,
|
||||
receiverFS: receiverFS,
|
||||
promBytesReplicated: ctr,
|
||||
sizeEstimateRequestSem: sizeEstimateRequestSem,
|
||||
})
|
||||
}
|
||||
|
||||
@ -336,7 +343,17 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
|
||||
wg.Add(1)
|
||||
go func(step *Step) {
|
||||
defer wg.Done()
|
||||
err := step.updateSizeEstimate(fanOutCtx)
|
||||
|
||||
// TODO instead of the semaphore, rely on resource-exhaustion signaled by the remote endpoint to limit size-estimate requests
|
||||
// Send is handled over rpc/dataconn ATM, which doesn't support the resource exhaustion status codes that gRPC defines
|
||||
guard, err := fs.sizeEstimateRequestSem.Acquire(fanOutCtx)
|
||||
if err != nil {
|
||||
fanOutCancel()
|
||||
return
|
||||
}
|
||||
defer guard.Release()
|
||||
|
||||
err = step.updateSizeEstimate(fanOutCtx)
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("step", step).Error("error computing size estimate")
|
||||
fanOutCancel()
|
||||
|
38
util/semaphore/semaphore.go
Normal file
38
util/semaphore/semaphore.go
Normal file
@ -0,0 +1,38 @@
|
||||
package semaphore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
wsemaphore "golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
type S struct {
|
||||
ws *wsemaphore.Weighted
|
||||
}
|
||||
|
||||
func New(max int64) *S {
|
||||
return &S{wsemaphore.NewWeighted(max)}
|
||||
}
|
||||
|
||||
type AcquireGuard struct {
|
||||
s *S
|
||||
released bool
|
||||
}
|
||||
|
||||
// The returned AcquireGuard is not goroutine-safe.
|
||||
func (s *S) Acquire(ctx context.Context) (*AcquireGuard, error) {
|
||||
if err := s.ws.Acquire(ctx, 1); err != nil {
|
||||
return nil, err
|
||||
} else if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &AcquireGuard{s, false}, nil
|
||||
}
|
||||
|
||||
func (g *AcquireGuard) Release() {
|
||||
if g == nil || g.released {
|
||||
return
|
||||
}
|
||||
g.released = true
|
||||
g.s.ws.Release(1)
|
||||
}
|
49
util/semaphore/semaphore_test.go
Normal file
49
util/semaphore/semaphore_test.go
Normal file
@ -0,0 +1,49 @@
|
||||
package semaphore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSemaphore(t *testing.T) {
|
||||
const numGoroutines = 10
|
||||
const concurrentSemaphore = 5
|
||||
const sleepTime = 1 * time.Second
|
||||
|
||||
begin := time.Now()
|
||||
|
||||
sem := New(concurrentSemaphore)
|
||||
|
||||
var aquisitions struct {
|
||||
beforeT, afterT uint32
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numGoroutines)
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
res, err := sem.Acquire(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer res.Release()
|
||||
if time.Since(begin) > sleepTime {
|
||||
atomic.AddUint32(&aquisitions.beforeT, 1)
|
||||
} else {
|
||||
atomic.AddUint32(&aquisitions.afterT, 1)
|
||||
}
|
||||
time.Sleep(sleepTime)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
assert.True(t, aquisitions.beforeT == concurrentSemaphore)
|
||||
assert.True(t, aquisitions.afterT == numGoroutines-concurrentSemaphore)
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user