From 000d8bba6656e19ed00f57b9c3d2b1b9baf3f757 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 28 Mar 2019 21:22:22 +0100
Subject: [PATCH 1/2] hotfix: limit concurrency of zfs send & recv commands

ATM, the replication logic sends all dry-run requests in parallel,
which might overwhelm the ZFS pool on the sending side.
Since we use rpc/dataconn for dry sends, this also opens one TCP
connection per dry-run request.

Use a semaphore to limit the degree of concurrency where we know it
is a problem ATM.
As indicated by the comments, the cleaner solution would involve some
kind of 'resource exhaustion' error code.

refs #161
refs #164
---
 Gopkg.lock                             |  9 +++++
 endpoint/endpoint.go                   | 27 ++++++++++++++
 replication/logic/replication_logic.go | 29 +++++++++++----
 util/semaphore/semaphore.go            | 38 ++++++++++++++++++++
 util/semaphore/semaphore_test.go       | 49 ++++++++++++++++++++++++++
 5 files changed, 146 insertions(+), 6 deletions(-)
 create mode 100644 util/semaphore/semaphore.go
 create mode 100644 util/semaphore/semaphore_test.go

diff --git a/Gopkg.lock b/Gopkg.lock
index 1e1f155..8000054 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -828,6 +828,14 @@
   pruneopts = ""
   revision = "351d144fa1fc0bd934e2408202be0c29f25e35a0"
 
+[[projects]]
+  branch = "master"
+  digest = "1:0142c968b74c157abbb0220c05fa2bdde8a3a4509d6134b35ef75d5b58afb721"
+  name = "golang.org/x/sync"
+  packages = ["semaphore"]
+  pruneopts = ""
+  revision = "e225da77a7e68af35c70ccbf71af2b83e6acac3c"
+
 [[projects]]
   branch = "master"
   digest = "1:f358024b019f87eecaadcb098113a40852c94fe58ea670ef3c3e2d2c7bd93db1"
@@ -1015,6 +1023,7 @@
     "github.com/yudai/gojsondiff/formatter",
     "github.com/zrepl/yaml-config",
     "golang.org/x/net/context",
+    "golang.org/x/sync/semaphore",
     "golang.org/x/sys/unix",
     "golang.org/x/tools/cmd/goimports",
     "golang.org/x/tools/cmd/stringer",
diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go
index f3ec738..389527e 100644
--- a/endpoint/endpoint.go
+++ b/endpoint/endpoint.go
@@ -10,6 +10,8 @@ import (
 
 	"github.com/zrepl/zrepl/replication/logic/pdu"
 	"github.com/zrepl/zrepl/util/chainlock"
+	"github.com/zrepl/zrepl/util/envconst"
+	"github.com/zrepl/zrepl/util/semaphore"
 	"github.com/zrepl/zrepl/zfs"
 )
 
@@ -75,16 +77,29 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
 
 }
 
+var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10))
+
 func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
 	_, err := s.filterCheckFS(r.Filesystem)
 	if err != nil {
 		return nil, nil, err
 	}
 
+	getLogger(ctx).Debug("acquire concurrent send semaphore")
+	// TODO use try-acquire and fail with resource-exhaustion rpc status
+	// => would require handling on the client-side
+	// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
+	guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+	defer guard.Release()
+
 	si, err := zfs.ZFSSendDry(r.Filesystem, r.From, r.To, "")
 	if err != nil {
 		return nil, nil, err
 	}
+
 	var expSize int64 = 0 // protocol says 0 means no estimate
 	if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
 		expSize = si.SizeEstimate
@@ -320,6 +335,8 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, zf
 	return nil, nil, fmt.Errorf("receiver does not implement Send()")
 }
 
+var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))
+
 func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
 	getLogger(ctx).Debug("incoming Receive")
 	defer receive.Close()
@@ -396,6 +413,16 @@
 		}
 	}
 
+	getLogger(ctx).Debug("acquire concurrent recv semaphore")
+	// TODO use try-acquire and fail with resource-exhaustion rpc status
+	// => would require handling on the client-side
+	// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
+	guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer guard.Release()
+
 	getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
 
 	if err := zfs.ZFSRecv(ctx, lp.ToString(), receive, recvOpts); err != nil {
diff --git a/replication/logic/replication_logic.go b/replication/logic/replication_logic.go
index ed5235c..ff0e3d6 100644
--- a/replication/logic/replication_logic.go
+++ b/replication/logic/replication_logic.go
@@ -14,6 +14,8 @@ import (
 
 	"github.com/zrepl/zrepl/replication/logic/pdu"
 	"github.com/zrepl/zrepl/replication/report"
 	"github.com/zrepl/zrepl/util/bytecounter"
+	"github.com/zrepl/zrepl/util/envconst"
+	"github.com/zrepl/zrepl/util/semaphore"
 	"github.com/zrepl/zrepl/zfs"
 )
@@ -110,6 +112,8 @@ type Filesystem struct {
 	Path                string // compat
 	receiverFS          *pdu.Filesystem
 	promBytesReplicated prometheus.Counter // compat
+
+	sizeEstimateRequestSem *semaphore.S
 }
 
 func (f *Filesystem) EqualToPreviousAttempt(other driver.FS) bool {
@@ -234,6 +238,8 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
 	}
 	rfss := rlfssres.GetFilesystems()
 
+	sizeEstimateRequestSem := semaphore.New(envconst.Int64("ZREPL_REPLICATION_MAX_CONCURRENT_SIZE_ESTIMATE", 4))
+
 	q := make([]*Filesystem, 0, len(sfss))
 
 	for _, fs := range sfss {
@@ -247,11 +253,12 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
 		ctr := p.promBytesReplicated.WithLabelValues(fs.Path)
 
 		q = append(q, &Filesystem{
-			sender:              p.sender,
-			receiver:            p.receiver,
-			Path:                fs.Path,
-			receiverFS:          receiverFS,
-			promBytesReplicated: ctr,
+			sender:                 p.sender,
+			receiver:               p.receiver,
+			Path:                   fs.Path,
+			receiverFS:             receiverFS,
+			promBytesReplicated:    ctr,
+			sizeEstimateRequestSem: sizeEstimateRequestSem,
 		})
 	}
 
@@ -336,7 +343,17 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
 			wg.Add(1)
 			go func(step *Step) {
 				defer wg.Done()
-				err := step.updateSizeEstimate(fanOutCtx)
+
+				// TODO instead of the semaphore, rely on resource-exhaustion signaled by the remote endpoint to limit size-estimate requests
+				// Send is handled over rpc/dataconn ATM, which doesn't support the resource exhaustion status codes that gRPC defines
+				guard, err := fs.sizeEstimateRequestSem.Acquire(fanOutCtx)
+				if err != nil {
+					fanOutCancel()
+					return
+				}
+				defer guard.Release()
+
+				err = step.updateSizeEstimate(fanOutCtx)
 				if err != nil {
 					log.WithError(err).WithField("step", step).Error("error computing size estimate")
 					fanOutCancel()
diff --git a/util/semaphore/semaphore.go b/util/semaphore/semaphore.go
new file mode 100644
index 0000000..73fdaed
--- /dev/null
+++ b/util/semaphore/semaphore.go
@@ -0,0 +1,38 @@
+package semaphore
+
+import (
+	"context"
+
+	wsemaphore "golang.org/x/sync/semaphore"
+)
+
+type S struct {
+	ws *wsemaphore.Weighted
+}
+
+func New(max int64) *S {
+	return &S{wsemaphore.NewWeighted(max)}
+}
+
+type AcquireGuard struct {
+	s        *S
+	released bool
+}
+
+// The returned AcquireGuard is not goroutine-safe.
+func (s *S) Acquire(ctx context.Context) (*AcquireGuard, error) {
+	if err := s.ws.Acquire(ctx, 1); err != nil {
+		return nil, err
+	} else if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+	return &AcquireGuard{s, false}, nil
+}
+
+func (g *AcquireGuard) Release() {
+	if g == nil || g.released {
+		return
+	}
+	g.released = true
+	g.s.ws.Release(1)
+}
diff --git a/util/semaphore/semaphore_test.go b/util/semaphore/semaphore_test.go
new file mode 100644
index 0000000..66c900c
--- /dev/null
+++ b/util/semaphore/semaphore_test.go
@@ -0,0 +1,49 @@
+package semaphore
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSemaphore(t *testing.T) {
+	const numGoroutines = 10
+	const concurrentSemaphore = 5
+	const sleepTime = 1 * time.Second
+
+	begin := time.Now()
+
+	sem := New(concurrentSemaphore)
+
+	var acquisitions struct {
+		beforeT, afterT uint32
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(numGoroutines)
+	for i := 0; i < numGoroutines; i++ {
+		go func() {
+			defer wg.Done()
+			res, err := sem.Acquire(context.Background())
+			require.NoError(t, err)
+			defer res.Release()
+			if time.Since(begin) > sleepTime {
+				atomic.AddUint32(&acquisitions.afterT, 1)
+			} else {
+				atomic.AddUint32(&acquisitions.beforeT, 1)
+			}
+			time.Sleep(sleepTime)
+		}()
+	}
+
+	wg.Wait()
+
+	assert.True(t, acquisitions.beforeT == concurrentSemaphore)
+	assert.True(t, acquisitions.afterT == numGoroutines-concurrentSemaphore)
+
+}

From 4d2765e5f2b9551f5086a9fd70b7dceb0ef0c2fb Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 28 Mar 2019 21:53:14 +0100
Subject: [PATCH 2/2] rpc: actually return an error if ReqSend fails

---
 rpc/rpc_client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rpc/rpc_client.go b/rpc/rpc_client.go
index d563a66..25e24d6 100644
--- a/rpc/rpc_client.go
+++ b/rpc/rpc_client.go
@@ -86,7 +86,7 @@ func (c *Client) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
 	// TODO the returned sendStream may return a read error created by the remote side
 	res, streamCopier, err := c.dataClient.ReqSend(ctx, r)
 	if err != nil {
-		return nil, nil, nil
+		return nil, nil, err
 	}
 	if streamCopier == nil {
 		return res, nil, nil
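
Usage note (illustrative, not part of the patches): the sketch below shows how the util/semaphore wrapper introduced in PATCH 1/2 is meant to be used: block in Acquire until a slot is free or the context is cancelled, then release through the returned guard. The exported API (New, Acquire, Release) is taken from util/semaphore/semaphore.go above; the doExpensiveOp helper, the limit of 4, and the timeout are made-up values for the example.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/zrepl/zrepl/util/semaphore"
)

// doExpensiveOp stands in for a zfs send/recv or a dry-run size estimate;
// it is a placeholder for this sketch, not a function in the patch.
func doExpensiveOp(id int) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("op done:", id)
}

func main() {
	// Allow at most 4 concurrent operations, mirroring the default of
	// ZREPL_REPLICATION_MAX_CONCURRENT_SIZE_ESTIMATE in the patch.
	sem := semaphore.New(4)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Acquire blocks until a slot is free or ctx is done.
			guard, err := sem.Acquire(ctx)
			if err != nil {
				return // context cancelled or deadline exceeded
			}
			defer guard.Release() // releasing twice is a no-op per the guard's released flag
			doExpensiveOp(id)
		}(i)
	}
	wg.Wait()
}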
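
The TODO comments in PATCH 1/2 point at a cleaner design: try-acquire and report 'resource exhaustion' to the client instead of queueing the request server-side. The sketch below is hypothetical and assumes a gRPC-style handler, which is exactly what rpc/dataconn does not provide today (hence the blocking Acquire in the patch). The handler name, the interface{} request/response placeholders, and the zfsSend helper are invented for illustration; golang.org/x/sync/semaphore and the gRPC status/codes packages are real.

package endpointsketch

import (
	"context"

	wsemaphore "golang.org/x/sync/semaphore"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// sendSem limits concurrent send operations, analogous to maxConcurrentZFSSendSemaphore in the patch.
var sendSem = wsemaphore.NewWeighted(10)

// handleSend rejects work immediately when all slots are taken instead of
// blocking, and signals that condition with a dedicated status code.
func handleSend(ctx context.Context, req interface{}) (interface{}, error) {
	if !sendSem.TryAcquire(1) {
		// gRPC defines ResourceExhausted for exactly this case;
		// rpc/dataconn has no equivalent yet, which is why the patch
		// blocks in Acquire instead of failing fast.
		return nil, status.Error(codes.ResourceExhausted, "too many concurrent zfs send operations")
	}
	defer sendSem.Release(1)
	return zfsSend(ctx, req)
}

// zfsSend is a placeholder for the actual zfs send implementation.
func zfsSend(ctx context.Context, req interface{}) (interface{}, error) {
	return nil, nil
}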