mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-25 18:04:58 +01:00
Merge pull request #167 from zrepl/problame/161-limit-concurrency-io-intensive-zfs-ops
hotfix #161 for 0.1: limit concurrency of zfs send & recv commands
This commit is contained in:
commit
32dd456ac6
9
Gopkg.lock
generated
9
Gopkg.lock
generated
@ -828,6 +828,14 @@
|
|||||||
pruneopts = ""
|
pruneopts = ""
|
||||||
revision = "351d144fa1fc0bd934e2408202be0c29f25e35a0"
|
revision = "351d144fa1fc0bd934e2408202be0c29f25e35a0"
|
||||||
|
|
||||||
|
[[projects]]
|
||||||
|
branch = "master"
|
||||||
|
digest = "1:0142c968b74c157abbb0220c05fa2bdde8a3a4509d6134b35ef75d5b58afb721"
|
||||||
|
name = "golang.org/x/sync"
|
||||||
|
packages = ["semaphore"]
|
||||||
|
pruneopts = ""
|
||||||
|
revision = "e225da77a7e68af35c70ccbf71af2b83e6acac3c"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
branch = "master"
|
branch = "master"
|
||||||
digest = "1:f358024b019f87eecaadcb098113a40852c94fe58ea670ef3c3e2d2c7bd93db1"
|
digest = "1:f358024b019f87eecaadcb098113a40852c94fe58ea670ef3c3e2d2c7bd93db1"
|
||||||
@ -1015,6 +1023,7 @@
|
|||||||
"github.com/yudai/gojsondiff/formatter",
|
"github.com/yudai/gojsondiff/formatter",
|
||||||
"github.com/zrepl/yaml-config",
|
"github.com/zrepl/yaml-config",
|
||||||
"golang.org/x/net/context",
|
"golang.org/x/net/context",
|
||||||
|
"golang.org/x/sync/semaphore",
|
||||||
"golang.org/x/sys/unix",
|
"golang.org/x/sys/unix",
|
||||||
"golang.org/x/tools/cmd/goimports",
|
"golang.org/x/tools/cmd/goimports",
|
||||||
"golang.org/x/tools/cmd/stringer",
|
"golang.org/x/tools/cmd/stringer",
|
||||||
|
@ -10,6 +10,8 @@ import (
|
|||||||
|
|
||||||
"github.com/zrepl/zrepl/replication/logic/pdu"
|
"github.com/zrepl/zrepl/replication/logic/pdu"
|
||||||
"github.com/zrepl/zrepl/util/chainlock"
|
"github.com/zrepl/zrepl/util/chainlock"
|
||||||
|
"github.com/zrepl/zrepl/util/envconst"
|
||||||
|
"github.com/zrepl/zrepl/util/semaphore"
|
||||||
"github.com/zrepl/zrepl/zfs"
|
"github.com/zrepl/zrepl/zfs"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -75,16 +77,29 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10))
|
||||||
|
|
||||||
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
|
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
|
||||||
_, err := s.filterCheckFS(r.Filesystem)
|
_, err := s.filterCheckFS(r.Filesystem)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getLogger(ctx).Debug("acquire concurrent send semaphore")
|
||||||
|
// TODO use try-acquire and fail with resource-exhaustion rpc status
|
||||||
|
// => would require handling on the client-side
|
||||||
|
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
|
||||||
|
guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
defer guard.Release()
|
||||||
|
|
||||||
si, err := zfs.ZFSSendDry(r.Filesystem, r.From, r.To, "")
|
si, err := zfs.ZFSSendDry(r.Filesystem, r.From, r.To, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var expSize int64 = 0 // protocol says 0 means no estimate
|
var expSize int64 = 0 // protocol says 0 means no estimate
|
||||||
if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
|
if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
|
||||||
expSize = si.SizeEstimate
|
expSize = si.SizeEstimate
|
||||||
@ -320,6 +335,8 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, zf
|
|||||||
return nil, nil, fmt.Errorf("receiver does not implement Send()")
|
return nil, nil, fmt.Errorf("receiver does not implement Send()")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))
|
||||||
|
|
||||||
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
|
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
|
||||||
getLogger(ctx).Debug("incoming Receive")
|
getLogger(ctx).Debug("incoming Receive")
|
||||||
defer receive.Close()
|
defer receive.Close()
|
||||||
@ -396,6 +413,16 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getLogger(ctx).Debug("acquire concurrent recv semaphore")
|
||||||
|
// TODO use try-acquire and fail with resource-exhaustion rpc status
|
||||||
|
// => would require handling on the client-side
|
||||||
|
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
|
||||||
|
guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer guard.Release()
|
||||||
|
|
||||||
getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
||||||
|
|
||||||
if err := zfs.ZFSRecv(ctx, lp.ToString(), receive, recvOpts); err != nil {
|
if err := zfs.ZFSRecv(ctx, lp.ToString(), receive, recvOpts); err != nil {
|
||||||
|
@ -14,6 +14,8 @@ import (
|
|||||||
"github.com/zrepl/zrepl/replication/logic/pdu"
|
"github.com/zrepl/zrepl/replication/logic/pdu"
|
||||||
"github.com/zrepl/zrepl/replication/report"
|
"github.com/zrepl/zrepl/replication/report"
|
||||||
"github.com/zrepl/zrepl/util/bytecounter"
|
"github.com/zrepl/zrepl/util/bytecounter"
|
||||||
|
"github.com/zrepl/zrepl/util/envconst"
|
||||||
|
"github.com/zrepl/zrepl/util/semaphore"
|
||||||
"github.com/zrepl/zrepl/zfs"
|
"github.com/zrepl/zrepl/zfs"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -110,6 +112,8 @@ type Filesystem struct {
|
|||||||
Path string // compat
|
Path string // compat
|
||||||
receiverFS *pdu.Filesystem
|
receiverFS *pdu.Filesystem
|
||||||
promBytesReplicated prometheus.Counter // compat
|
promBytesReplicated prometheus.Counter // compat
|
||||||
|
|
||||||
|
sizeEstimateRequestSem *semaphore.S
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *Filesystem) EqualToPreviousAttempt(other driver.FS) bool {
|
func (f *Filesystem) EqualToPreviousAttempt(other driver.FS) bool {
|
||||||
@ -234,6 +238,8 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
|
|||||||
}
|
}
|
||||||
rfss := rlfssres.GetFilesystems()
|
rfss := rlfssres.GetFilesystems()
|
||||||
|
|
||||||
|
sizeEstimateRequestSem := semaphore.New(envconst.Int64("ZREPL_REPLICATION_MAX_CONCURRENT_SIZE_ESTIMATE", 4))
|
||||||
|
|
||||||
q := make([]*Filesystem, 0, len(sfss))
|
q := make([]*Filesystem, 0, len(sfss))
|
||||||
for _, fs := range sfss {
|
for _, fs := range sfss {
|
||||||
|
|
||||||
@ -252,6 +258,7 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
|
|||||||
Path: fs.Path,
|
Path: fs.Path,
|
||||||
receiverFS: receiverFS,
|
receiverFS: receiverFS,
|
||||||
promBytesReplicated: ctr,
|
promBytesReplicated: ctr,
|
||||||
|
sizeEstimateRequestSem: sizeEstimateRequestSem,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -336,7 +343,17 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(step *Step) {
|
go func(step *Step) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
err := step.updateSizeEstimate(fanOutCtx)
|
|
||||||
|
// TODO instead of the semaphore, rely on resource-exhaustion signaled by the remote endpoint to limit size-estimate requests
|
||||||
|
// Send is handled over rpc/dataconn ATM, which doesn't support the resource exhaustion status codes that gRPC defines
|
||||||
|
guard, err := fs.sizeEstimateRequestSem.Acquire(fanOutCtx)
|
||||||
|
if err != nil {
|
||||||
|
fanOutCancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer guard.Release()
|
||||||
|
|
||||||
|
err = step.updateSizeEstimate(fanOutCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("step", step).Error("error computing size estimate")
|
log.WithError(err).WithField("step", step).Error("error computing size estimate")
|
||||||
fanOutCancel()
|
fanOutCancel()
|
||||||
|
@ -86,7 +86,7 @@ func (c *Client) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
|
|||||||
// TODO the returned sendStream may return a read error created by the remote side
|
// TODO the returned sendStream may return a read error created by the remote side
|
||||||
res, streamCopier, err := c.dataClient.ReqSend(ctx, r)
|
res, streamCopier, err := c.dataClient.ReqSend(ctx, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if streamCopier == nil {
|
if streamCopier == nil {
|
||||||
return res, nil, nil
|
return res, nil, nil
|
||||||
|
38
util/semaphore/semaphore.go
Normal file
38
util/semaphore/semaphore.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package semaphore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
wsemaphore "golang.org/x/sync/semaphore"
|
||||||
|
)
|
||||||
|
|
||||||
|
type S struct {
|
||||||
|
ws *wsemaphore.Weighted
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(max int64) *S {
|
||||||
|
return &S{wsemaphore.NewWeighted(max)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type AcquireGuard struct {
|
||||||
|
s *S
|
||||||
|
released bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// The returned AcquireGuard is not goroutine-safe.
|
||||||
|
func (s *S) Acquire(ctx context.Context) (*AcquireGuard, error) {
|
||||||
|
if err := s.ws.Acquire(ctx, 1); err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if err := ctx.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &AcquireGuard{s, false}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *AcquireGuard) Release() {
|
||||||
|
if g == nil || g.released {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
g.released = true
|
||||||
|
g.s.ws.Release(1)
|
||||||
|
}
|
49
util/semaphore/semaphore_test.go
Normal file
49
util/semaphore/semaphore_test.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
package semaphore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSemaphore(t *testing.T) {
|
||||||
|
const numGoroutines = 10
|
||||||
|
const concurrentSemaphore = 5
|
||||||
|
const sleepTime = 1 * time.Second
|
||||||
|
|
||||||
|
begin := time.Now()
|
||||||
|
|
||||||
|
sem := New(concurrentSemaphore)
|
||||||
|
|
||||||
|
var aquisitions struct {
|
||||||
|
beforeT, afterT uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(numGoroutines)
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
res, err := sem.Acquire(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer res.Release()
|
||||||
|
if time.Since(begin) > sleepTime {
|
||||||
|
atomic.AddUint32(&aquisitions.beforeT, 1)
|
||||||
|
} else {
|
||||||
|
atomic.AddUint32(&aquisitions.afterT, 1)
|
||||||
|
}
|
||||||
|
time.Sleep(sleepTime)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
assert.True(t, aquisitions.beforeT == concurrentSemaphore)
|
||||||
|
assert.True(t, aquisitions.afterT == numGoroutines-concurrentSemaphore)
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user