From 0ceea1b792118453d792a8e25ecacdc8d8284ae1 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 28 Feb 2021 23:33:28 +0100 Subject: [PATCH] replication: simplify parallel replication variables & expose them in config closes #140 --- config/config.go | 8 +- daemon/job/active.go | 36 +++++- daemon/job/build_jobs_test.go | 110 ++++++++++++++++++ docs/configuration/replication.rst | 27 +++++ endpoint/endpoint.go | 26 ----- go.mod | 5 +- go.sum | 13 +++ platformtest/tests/replication.go | 7 ++ replication/driver/replication_driver.go | 31 +++-- replication/driver/replication_driver_test.go | 7 +- replication/logic/replication_logic.go | 7 +- replication/logic/replication_logic_policy.go | 12 +- replication/replication.go | 4 +- 13 files changed, 246 insertions(+), 47 deletions(-) diff --git a/config/config.go b/config/config.go index 58a4ce5..d3bc086 100644 --- a/config/config.go +++ b/config/config.go @@ -99,7 +99,8 @@ type RecvOptions struct { } type Replication struct { - Protection *ReplicationOptionsProtection `yaml:"protection,optional,fromdefaults"` + Protection *ReplicationOptionsProtection `yaml:"protection,optional,fromdefaults"` + Concurrency *ReplicationOptionsConcurrency `yaml:"concurrency,optional,fromdefaults"` } type ReplicationOptionsProtection struct { @@ -107,6 +108,11 @@ type ReplicationOptionsProtection struct { Incremental string `yaml:"incremental,optional,default=guarantee_resumability"` } +type ReplicationOptionsConcurrency struct { + Steps int `yaml:"steps,optional,default=1"` + SizeEstimates int `yaml:"size_estimates,optional,default=4"` +} + func (l *RecvOptions) SetDefault() { *l = RecvOptions{Properties: &PropertyRecvOptions{}} } diff --git a/daemon/job/active.go b/daemon/job/active.go index f183d66..cc6cbc1 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/common/log" "github.com/zrepl/zrepl/daemon/logging/trace" + "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job/reset" @@ -33,6 +34,8 @@ type ActiveSide struct { name endpoint.JobID connecter transport.Connecter + replicationDriverConfig driver.Config + prunerFactory *pruner.PrunerFactory promRepStateSecs *prometheus.HistogramVec // labels: state @@ -159,8 +162,12 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job } m.plannerPolicy = &logic.PlannerPolicy{ - EncryptedSend: logic.TriFromBool(in.Send.Encrypted), - ReplicationConfig: replicationConfig, + EncryptedSend: logic.TriFromBool(in.Send.Encrypted), + ReplicationConfig: replicationConfig, + SizeEstimationConcurrency: in.Replication.Concurrency.SizeEstimates, + } + if err := m.plannerPolicy.Validate(); err != nil { + return nil, errors.Wrap(err, "cannot build planner policy") } if m.snapper, err = snapper.FromConfig(g, m.senderConfig.FSF, in.Snapshotting); err != nil { @@ -254,8 +261,12 @@ func modePullFromConfig(g *config.Global, in *config.PullJob, jobID endpoint.Job } m.plannerPolicy = &logic.PlannerPolicy{ - EncryptedSend: logic.DontCare, - ReplicationConfig: replicationConfig, + EncryptedSend: logic.DontCare, + ReplicationConfig: replicationConfig, + SizeEstimationConcurrency: in.Replication.Concurrency.SizeEstimates, + } + if err := m.plannerPolicy.Validate(); err != nil { + return nil, errors.Wrap(err, "cannot build planner policy") } m.receiverConfig, err = buildReceiverConfig(in, jobID) @@ -266,6 +277,16 @@ func modePullFromConfig(g *config.Global, in *config.PullJob, jobID endpoint.Job return m, nil } +func replicationDriverConfigFromConfig(in *config.Replication) (c driver.Config, err error) { + c = driver.Config{ + StepQueueConcurrency: in.Concurrency.Steps, + MaxAttempts: envconst.Int("ZREPL_REPLICATION_MAX_ATTEMPTS", 3), + ReconnectHardFailTimeout: envconst.Duration("ZREPL_REPLICATION_RECONNECT_HARD_FAIL_TIMEOUT", 10*time.Minute), + } + err = c.Validate() + return c, err +} + func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}) (j *ActiveSide, err error) { j = &ActiveSide{} @@ -326,6 +347,11 @@ func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}) ( return nil, err } + j.replicationDriverConfig, err = replicationDriverConfigFromConfig(in.Replication) + if err != nil { + return nil, errors.Wrap(err, "cannot build replication driver config") + } + return j, nil } @@ -459,7 +485,7 @@ func (j *ActiveSide) do(ctx context.Context) { *tasks = activeSideTasks{} tasks.replicationCancel = func() { repCancel(); endSpan() } tasks.replicationReport, repWait = replication.Do( - ctx, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()), + ctx, j.replicationDriverConfig, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()), ) tasks.state = ActiveSideReplicating }) diff --git a/daemon/job/build_jobs_test.go b/daemon/job/build_jobs_test.go index 6fca84f..ef1b8cf 100644 --- a/daemon/job/build_jobs_test.go +++ b/daemon/job/build_jobs_test.go @@ -144,3 +144,113 @@ func TestSampleConfigsAreBuiltWithoutErrors(t *testing.T) { } } + +func TestReplicationOptions(t *testing.T) { + tmpl := ` +jobs: +- name: foo + type: push + connect: + type: local + listener_name: foo + client_identity: bar + filesystems: {"<": true} + %s + snapshotting: + type: manual + pruning: + keep_sender: + - type: last_n + count: 10 + keep_receiver: + - type: last_n + count: 10 +` + + type Test struct { + name string + input string + expectOk func(t *testing.T, a *ActiveSide, m *modePush) + expectError bool + } + + tests := []Test{ + { + name: "defaults", + input: ` + replication: {} +`, + expectOk: func(t *testing.T, a *ActiveSide, m *modePush) {}, + }, + { + name: "steps_zero", + input: ` + replication: + concurrency: + steps: 0 +`, + expectError: true, + }, + { + name: "size_estimates_zero", + input: ` + replication: + concurrency: + size_estimates: 0 +`, + expectError: true, + }, + { + name: "custom_values", + input: ` + replication: + concurrency: + steps: 23 + size_estimates: 42 +`, + expectOk: func(t *testing.T, a *ActiveSide, m *modePush) { + assert.Equal(t, 23, a.replicationDriverConfig.StepQueueConcurrency) + assert.Equal(t, 42, m.plannerPolicy.SizeEstimationConcurrency) + }, + }, + { + name: "negative_values_forbidden", + input: ` + replication: + concurrency: + steps: -23 + size_estimates: -42 +`, + expectError: true, + }, + } + + fill := func(s string) string { return fmt.Sprintf(tmpl, s) } + + for _, ts := range tests { + t.Run(ts.name, func(t *testing.T) { + assert.True(t, (ts.expectError) != (ts.expectOk != nil)) + + cstr := fill(ts.input) + t.Logf("testing config:\n%s", cstr) + c, err := config.ParseConfigBytes([]byte(cstr)) + require.NoError(t, err) + jobs, err := JobsFromConfig(c) + if ts.expectOk != nil { + require.NoError(t, err) + require.NotNil(t, c) + require.NoError(t, err) + require.Len(t, jobs, 1) + a := jobs[0].(*ActiveSide) + m := a.mode.(*modePush) + ts.expectOk(t, a, m) + } else if ts.expectError { + require.Error(t, err) + } else { + t.Fatalf("test must define expectOk or expectError") + } + + }) + } + +} diff --git a/docs/configuration/replication.rst b/docs/configuration/replication.rst index 03f8e6d..e58f526 100644 --- a/docs/configuration/replication.rst +++ b/docs/configuration/replication.rst @@ -14,6 +14,10 @@ Replication Options protection: initial: guarantee_resumability # guarantee_{resumability,incremental,nothing} incremental: guarantee_resumability # guarantee_{resumability,incremental,nothing} + concurrency: + size_estimates: 4 + steps: 1 + ... .. _replication-option-protection: @@ -45,3 +49,26 @@ which is useful if replication happens so rarely (or fails so frequently) that t When changing this flag, obsoleted zrepl-managed bookmarks and holds will be destroyed on the next replication step that is attempted for each filesystem. + +.. _replication-option-concurrency: + +``concurrency`` option +---------------------- + +The ``concurrency`` options control the maximum amount of concurrency during replication. +The default values allow some concurrency during size estimation but no parallelism for the actual replication. + +* ``concurrency.steps`` (default = 1) controls the maximum number of concurrently executed :ref:`replication steps `. + The planning step for each file system is counted as a single step. +* ``concurrency.size_estimates`` (default = 4) controls the maximum number of concurrent step size estimations done by the job. + +Note that initial replication cannot start replicating child filesystems before the parent filesystem's initial replication step has completed. + +Some notes on tuning these values: + +* Disk: Size estimation is less I/O intensive than step execution because it does not need to access the data blocks. +* CPU: Size estimation is usually a dense CPU burst whereas step execution CPU utilization is stretched out over time because of disk IO. + Faster disks, sending a compressed dataset in :ref:`plain mode ` and the zrepl transport mode all contribute to higher CPU requirements. +* Network bandwidth: Size estimation does not consume meaningful amounts of bandwidth, step execution does. +* :ref:`zrepl ZFS abstractions `: for each replication step zrepl needs to update its ZFS abstractions through the ``zfs`` command which often waits multiple seconds for the zpool to sync. + Thus, if the actual send & recv time of a step is small compared to the time spent on zrepl ZFS abstractions then increasing step execution concurrency will result in a lower overall turnaround time. diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index c970cbd..f0eba82 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -18,7 +18,6 @@ import ( "github.com/zrepl/zrepl/util/chainlock" "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/nodefault" - "github.com/zrepl/zrepl/util/semaphore" "github.com/zrepl/zrepl/zfs" zfsprop "github.com/zrepl/zrepl/zfs/property" ) @@ -130,9 +129,6 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst } -var maxConcurrentZFSSend = envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10) -var maxConcurrentZFSSendSemaphore = semaphore.New(maxConcurrentZFSSend) - func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion { if fsv == nil { return nil @@ -199,16 +195,6 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea return nil, nil, errors.Wrap(err, "validate send arguments") } - getLogger(ctx).Debug("acquire concurrent send semaphore") - // TODO use try-acquire and fail with resource-exhaustion rpc status - // => would require handling on the client-side - // => this is a dataconn endpoint, doesn't have the status code semantics of gRPC - guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx) - if err != nil { - return nil, nil, err - } - defer guard.Release() - si, err := zfs.ZFSSendDry(ctx, sendArgs) if err != nil { return nil, nil, errors.Wrap(err, "zfs send dry failed") @@ -682,8 +668,6 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io return nil, nil, fmt.Errorf("receiver does not implement Send()") } -var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10)) - func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) { defer trace.WithSpanFromStackUpdateCtx(&ctx)() @@ -803,16 +787,6 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io. return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv") } - log.Debug("acquire concurrent recv semaphore") - // TODO use try-acquire and fail with resource-exhaustion rpc status - // => would require handling on the client-side - // => this is a dataconn endpoint, doesn't have the status code semantics of gRPC - guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx) - if err != nil { - return nil, err - } - defer guard.Release() - var peek bytes.Buffer var MaxPeek = envconst.Int64("ZREPL_ENDPOINT_RECV_PEEK_SIZE", 1<<20) log.WithField("max_peek_bytes", MaxPeek).Info("peeking incoming stream") diff --git a/go.mod b/go.mod index a63e7c7..9888fe9 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,15 @@ require ( github.com/gdamore/tcell v1.2.0 github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909 github.com/go-logfmt/logfmt v0.4.0 + github.com/go-playground/universal-translator v0.17.0 // indirect + github.com/go-playground/validator v9.31.0+incompatible github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4 github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.1.2 github.com/jinzhu/copier v0.0.0-20170922082739-db4671f3a9b8 github.com/kisielk/gotool v1.0.0 // indirect github.com/kr/pretty v0.1.0 + github.com/leodido/go-urn v1.2.1 // indirect github.com/lib/pq v1.2.0 github.com/mattn/go-colorable v0.1.4 // indirect github.com/mattn/go-isatty v0.0.8 @@ -29,7 +32,7 @@ require ( github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 // go1.12 thinks it needs this github.com/spf13/cobra v0.0.2 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.5.1 + github.com/stretchr/testify v1.6.1 github.com/willf/bitset v1.1.10 github.com/yudai/gojsondiff v0.0.0-20170107030110-7b1b7adf999d github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // go1.12 thinks it needs this diff --git a/go.sum b/go.sum index 04805ce..3bef2ae 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,12 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator v9.31.0+incompatible h1:UA72EPEogEnq76ehGdEDp4Mit+3FDh548oRqwVgNsHA= +github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig= github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4 h1:0suja/iKSDbEIYLbrS/8C7iArJiWpgCNcR+zwAHu7Ig= github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -171,6 +177,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= +github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= @@ -300,6 +308,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/theckman/goconstraint v1.11.0 h1:oBUwN5wpE4dwyPhRGraEgJsFTr+JtLWiDnaJZJeeXI0= github.com/theckman/goconstraint v1.11.0/go.mod h1:zkCR/f2kOULTk/h1ujgyB9BlCNLaqlQ6GN2Zl4mg81g= github.com/timakin/bodyclose v0.0.0-20190407043127-4a873e97b2bb/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= @@ -380,6 +390,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/text v0.0.0-20170915090833-1cbadb444a80/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -460,6 +471,8 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/platformtest/tests/replication.go b/platformtest/tests/replication.go index ee08c2a..ed143c0 100644 --- a/platformtest/tests/replication.go +++ b/platformtest/tests/replication.go @@ -7,6 +7,7 @@ import ( "os" "path" "sort" + "time" "github.com/kr/pretty" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/platformtest" "github.com/zrepl/zrepl/replication" + "github.com/zrepl/zrepl/replication/driver" "github.com/zrepl/zrepl/replication/logic" "github.com/zrepl/zrepl/replication/logic/pdu" "github.com/zrepl/zrepl/replication/report" @@ -92,6 +94,11 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report { report, wait := replication.Do( ctx, + driver.Config{ + MaxAttempts: 1, + StepQueueConcurrency: 1, + ReconnectHardFailTimeout: 1 * time.Second, + }, logic.NewPlanner(nil, nil, sender, receiver, plannerPolicy), ) wait(true) diff --git a/replication/driver/replication_driver.go b/replication/driver/replication_driver.go index 8aee4c6..69139b1 100644 --- a/replication/driver/replication_driver.go +++ b/replication/driver/replication_driver.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/go-playground/validator" "github.com/kr/pretty" "github.com/pkg/errors" "google.golang.org/grpc/codes" @@ -19,7 +20,6 @@ import ( "github.com/zrepl/zrepl/replication/report" "github.com/zrepl/zrepl/util/chainlock" - "github.com/zrepl/zrepl/util/envconst" ) type interval struct { @@ -84,6 +84,7 @@ type Planner interface { // an attempt represents a single planning & execution of fs replications type attempt struct { planner Planner + config Config l *chainlock.L @@ -181,10 +182,25 @@ type step struct { type ReportFunc func() *report.Report type WaitFunc func(block bool) (done bool) -var maxAttempts = envconst.Int64("ZREPL_REPLICATION_MAX_ATTEMPTS", 3) -var reconnectHardFailTimeout = envconst.Duration("ZREPL_REPLICATION_RECONNECT_HARD_FAIL_TIMEOUT", 10*time.Minute) +type Config struct { + StepQueueConcurrency int `validate:"gte=1"` + MaxAttempts int `validate:"eq=-1|gt=0"` + ReconnectHardFailTimeout time.Duration `validate:"gt=0"` +} + +var validate = validator.New() + +func (c Config) Validate() error { + return validate.Struct(c) +} + +// caller must ensure config.Validate() == nil +func Do(ctx context.Context, config Config, planner Planner) (ReportFunc, WaitFunc) { + + if err := config.Validate(); err != nil { + panic(err) + } -func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) { log := getLog(ctx) l := chainlock.New() run := &run{ @@ -201,7 +217,7 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) { defer log.Debug("run ended") var prev *attempt mainLog := log - for ano := 0; ano < int(maxAttempts) || maxAttempts == 0; ano++ { + for ano := 0; ano < int(config.MaxAttempts); ano++ { log := mainLog.WithField("attempt_number", ano) log.Debug("start attempt") @@ -213,6 +229,7 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) { l: l, startedAt: time.Now(), planner: planner, + config: config, } run.attempts = append(run.attempts, cur) run.l.DropWhile(func() { @@ -248,7 +265,7 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) { shouldReconnect := mostRecentErrClass == errorClassTemporaryConnectivityRelated log.WithField("reconnect_decision", shouldReconnect).Debug("reconnect decision made") if shouldReconnect { - run.waitReconnect.Set(time.Now(), reconnectHardFailTimeout) + run.waitReconnect.Set(time.Now(), config.ReconnectHardFailTimeout) log.WithField("deadline", run.waitReconnect.End()).Error("temporary connectivity-related error identified, start waiting for reconnect") var connectErr error var connectErrTime time.Time @@ -421,7 +438,7 @@ func (a *attempt) doFilesystems(ctx context.Context, prevs map[*fs]*fs) { defer a.l.Lock().Unlock() stepQueue := newStepQueue() - defer stepQueue.Start(envconst.Int("ZREPL_REPLICATION_EXPERIMENTAL_REPLICATION_CONCURRENCY", 1))() // TODO parallel replication + defer stepQueue.Start(a.config.StepQueueConcurrency)() var fssesDone sync.WaitGroup for _, f := range a.fss { fssesDone.Add(1) diff --git a/replication/driver/replication_driver_test.go b/replication/driver/replication_driver_test.go index e5c477a..3f7f75b 100644 --- a/replication/driver/replication_driver_test.go +++ b/replication/driver/replication_driver_test.go @@ -154,7 +154,12 @@ func TestReplication(t *testing.T) { defer trace.WithTaskFromStackUpdateCtx(&ctx)() mp := &mockPlanner{} - getReport, wait := Do(ctx, mp) + driverConfig := Config{ + StepQueueConcurrency: 1, + MaxAttempts: 1, + ReconnectHardFailTimeout: 1 * time.Second, + } + getReport, wait := Do(ctx, driverConfig, mp) begin := time.Now() fireAt := []time.Duration{ // the following values are relative to the start diff --git a/replication/logic/replication_logic.go b/replication/logic/replication_logic.go index d026af7..cc5bbe7 100644 --- a/replication/logic/replication_logic.go +++ b/replication/logic/replication_logic.go @@ -19,7 +19,6 @@ import ( "github.com/zrepl/zrepl/replication/report" "github.com/zrepl/zrepl/util/bytecounter" "github.com/zrepl/zrepl/util/chainlock" - "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/semaphore" "github.com/zrepl/zrepl/zfs" ) @@ -222,7 +221,11 @@ func (s *Step) ReportInfo() *report.StepInfo { } } +// caller must ensure policy.Validate() == nil func NewPlanner(secsPerState *prometheus.HistogramVec, bytesReplicated *prometheus.CounterVec, sender Sender, receiver Receiver, policy PlannerPolicy) *Planner { + if err := policy.Validate(); err != nil { + panic(err) + } return &Planner{ sender: sender, receiver: receiver, @@ -272,7 +275,7 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) { } rfss := rlfssres.GetFilesystems() - sizeEstimateRequestSem := semaphore.New(envconst.Int64("ZREPL_REPLICATION_MAX_CONCURRENT_SIZE_ESTIMATE", 4)) + sizeEstimateRequestSem := semaphore.New(int64(p.policy.SizeEstimationConcurrency)) q := make([]*Filesystem, 0, len(sfss)) for _, fs := range sfss { diff --git a/replication/logic/replication_logic_policy.go b/replication/logic/replication_logic_policy.go index 30eb078..052d41d 100644 --- a/replication/logic/replication_logic_policy.go +++ b/replication/logic/replication_logic_policy.go @@ -1,6 +1,7 @@ package logic import ( + "github.com/go-playground/validator" "github.com/pkg/errors" "github.com/zrepl/zrepl/config" @@ -8,8 +9,15 @@ import ( ) type PlannerPolicy struct { - EncryptedSend tri // all sends must be encrypted (send -w, and encryption!=off) - ReplicationConfig *pdu.ReplicationConfig + EncryptedSend tri // all sends must be encrypted (send -w, and encryption!=off) + ReplicationConfig *pdu.ReplicationConfig + SizeEstimationConcurrency int `validate:"gte=1"` +} + +var validate = validator.New() + +func (p PlannerPolicy) Validate() error { + return validate.Struct(p) } func ReplicationConfigFromConfig(in *config.Replication) (*pdu.ReplicationConfig, error) { diff --git a/replication/replication.go b/replication/replication.go index 7f9e35b..8e1afe2 100644 --- a/replication/replication.go +++ b/replication/replication.go @@ -8,6 +8,6 @@ import ( "github.com/zrepl/zrepl/replication/driver" ) -func Do(ctx context.Context, planner driver.Planner) (driver.ReportFunc, driver.WaitFunc) { - return driver.Do(ctx, planner) +func Do(ctx context.Context, driverConfig driver.Config, planner driver.Planner) (driver.ReportFunc, driver.WaitFunc) { + return driver.Do(ctx, driverConfig, planner) }