mirror of
synced 2024-12-21 22:51:09 +01:00
replication: simplify parallel replication variables & expose them in config
closes #140
This commit is contained in:
@ -99,7 +99,8 @@ type RecvOptions struct {
type Replication struct {
Protection *ReplicationOptionsProtection `yaml:"protection,optional,fromdefaults"`
Protection *ReplicationOptionsProtection `yaml:"protection,optional,fromdefaults"`
Concurrency *ReplicationOptionsConcurrency `yaml:"concurrency,optional,fromdefaults"`
type ReplicationOptionsProtection struct {
@ -107,6 +108,11 @@ type ReplicationOptionsProtection struct {
Incremental string `yaml:"incremental,optional,default=guarantee_resumability"`
type ReplicationOptionsConcurrency struct {
Steps int `yaml:"steps,optional,default=1"`
SizeEstimates int `yaml:"size_estimates,optional,default=4"`
func (l *RecvOptions) SetDefault() {
*l = RecvOptions{Properties: &PropertyRecvOptions{}}
@ -11,6 +11,7 @@ import (
@ -33,6 +34,8 @@ type ActiveSide struct {
name endpoint.JobID
connecter transport.Connecter
replicationDriverConfig driver.Config
prunerFactory *pruner.PrunerFactory
promRepStateSecs *prometheus.HistogramVec // labels: state
@ -159,8 +162,12 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job
m.plannerPolicy = &logic.PlannerPolicy{
EncryptedSend: logic.TriFromBool(in.Send.Encrypted),
ReplicationConfig: replicationConfig,
EncryptedSend: logic.TriFromBool(in.Send.Encrypted),
ReplicationConfig: replicationConfig,
SizeEstimationConcurrency: in.Replication.Concurrency.SizeEstimates,
if err := m.plannerPolicy.Validate(); err != nil {
return nil, errors.Wrap(err, "cannot build planner policy")
if m.snapper, err = snapper.FromConfig(g, m.senderConfig.FSF, in.Snapshotting); err != nil {
@ -254,8 +261,12 @@ func modePullFromConfig(g *config.Global, in *config.PullJob, jobID endpoint.Job
m.plannerPolicy = &logic.PlannerPolicy{
EncryptedSend: logic.DontCare,
ReplicationConfig: replicationConfig,
EncryptedSend: logic.DontCare,
ReplicationConfig: replicationConfig,
SizeEstimationConcurrency: in.Replication.Concurrency.SizeEstimates,
if err := m.plannerPolicy.Validate(); err != nil {
return nil, errors.Wrap(err, "cannot build planner policy")
m.receiverConfig, err = buildReceiverConfig(in, jobID)
@ -266,6 +277,16 @@ func modePullFromConfig(g *config.Global, in *config.PullJob, jobID endpoint.Job
return m, nil
func replicationDriverConfigFromConfig(in *config.Replication) (c driver.Config, err error) {
c = driver.Config{
StepQueueConcurrency: in.Concurrency.Steps,
MaxAttempts: envconst.Int("ZREPL_REPLICATION_MAX_ATTEMPTS", 3),
ReconnectHardFailTimeout: envconst.Duration("ZREPL_REPLICATION_RECONNECT_HARD_FAIL_TIMEOUT", 10*time.Minute),
err = c.Validate()
return c, err
func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}) (j *ActiveSide, err error) {
j = &ActiveSide{}
@ -326,6 +347,11 @@ func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}) (
return nil, err
j.replicationDriverConfig, err = replicationDriverConfigFromConfig(in.Replication)
if err != nil {
return nil, errors.Wrap(err, "cannot build replication driver config")
return j, nil
@ -459,7 +485,7 @@ func (j *ActiveSide) do(ctx context.Context) {
*tasks = activeSideTasks{}
tasks.replicationCancel = func() { repCancel(); endSpan() }
tasks.replicationReport, repWait = replication.Do(
ctx, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
ctx, j.replicationDriverConfig, logic.NewPlanner(j.promRepStateSecs, j.promBytesReplicated, sender, receiver, j.mode.PlannerPolicy()),
tasks.state = ActiveSideReplicating
@ -144,3 +144,113 @@ func TestSampleConfigsAreBuiltWithoutErrors(t *testing.T) {
func TestReplicationOptions(t *testing.T) {
tmpl := `
- name: foo
type: push
type: local
listener_name: foo
client_identity: bar
filesystems: {"<": true}
type: manual
- type: last_n
count: 10
- type: last_n
count: 10
type Test struct {
name string
input string
expectOk func(t *testing.T, a *ActiveSide, m *modePush)
expectError bool
tests := []Test{
name: "defaults",
input: `
replication: {}
expectOk: func(t *testing.T, a *ActiveSide, m *modePush) {},
name: "steps_zero",
input: `
steps: 0
expectError: true,
name: "size_estimates_zero",
input: `
size_estimates: 0
expectError: true,
name: "custom_values",
input: `
steps: 23
size_estimates: 42
expectOk: func(t *testing.T, a *ActiveSide, m *modePush) {
assert.Equal(t, 23, a.replicationDriverConfig.StepQueueConcurrency)
assert.Equal(t, 42, m.plannerPolicy.SizeEstimationConcurrency)
name: "negative_values_forbidden",
input: `
steps: -23
size_estimates: -42
expectError: true,
fill := func(s string) string { return fmt.Sprintf(tmpl, s) }
for _, ts := range tests {
t.Run(ts.name, func(t *testing.T) {
assert.True(t, (ts.expectError) != (ts.expectOk != nil))
cstr := fill(ts.input)
t.Logf("testing config:\n%s", cstr)
c, err := config.ParseConfigBytes([]byte(cstr))
require.NoError(t, err)
jobs, err := JobsFromConfig(c)
if ts.expectOk != nil {
require.NoError(t, err)
require.NotNil(t, c)
require.NoError(t, err)
require.Len(t, jobs, 1)
a := jobs[0].(*ActiveSide)
m := a.mode.(*modePush)
ts.expectOk(t, a, m)
} else if ts.expectError {
require.Error(t, err)
} else {
t.Fatalf("test must define expectOk or expectError")
@ -14,6 +14,10 @@ Replication Options
initial: guarantee_resumability # guarantee_{resumability,incremental,nothing}
incremental: guarantee_resumability # guarantee_{resumability,incremental,nothing}
size_estimates: 4
steps: 1
.. _replication-option-protection:
@ -45,3 +49,26 @@ which is useful if replication happens so rarely (or fails so frequently) that t
When changing this flag, obsoleted zrepl-managed bookmarks and holds will be destroyed on the next replication step that is attempted for each filesystem.
.. _replication-option-concurrency:
``concurrency`` option
The ``concurrency`` options control the maximum amount of concurrency during replication.
The default values allow some concurrency during size estimation but no parallelism for the actual replication.
* ``concurrency.steps`` (default = 1) controls the maximum number of concurrently executed :ref:`replication steps <overview-how-replication-works>`.
The planning step for each file system is counted as a single step.
* ``concurrency.size_estimates`` (default = 4) controls the maximum number of concurrent step size estimations done by the job.
Note that initial replication cannot start replicating child filesystems before the parent filesystem's initial replication step has completed.
Some notes on tuning these values:
* Disk: Size estimation is less I/O intensive than step execution because it does not need to access the data blocks.
* CPU: Size estimation is usually a dense CPU burst whereas step execution CPU utilization is stretched out over time because of disk IO.
Faster disks, sending a compressed dataset in :ref:`plain mode <zfs-background-knowledge-plain-vs-raw-sends>` and the zrepl transport mode all contribute to higher CPU requirements.
* Network bandwidth: Size estimation does not consume meaningful amounts of bandwidth, step execution does.
* :ref:`zrepl ZFS abstractions <zrepl-zfs-abstractions>`: for each replication step zrepl needs to update its ZFS abstractions through the ``zfs`` command which often waits multiple seconds for the zpool to sync.
Thus, if the actual send & recv time of a step is small compared to the time spent on zrepl ZFS abstractions then increasing step execution concurrency will result in a lower overall turnaround time.
@ -18,7 +18,6 @@ import (
zfsprop "github.com/zrepl/zrepl/zfs/property"
@ -130,9 +129,6 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
var maxConcurrentZFSSend = envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10)
var maxConcurrentZFSSendSemaphore = semaphore.New(maxConcurrentZFSSend)
func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
if fsv == nil {
return nil
@ -199,16 +195,6 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
return nil, nil, errors.Wrap(err, "validate send arguments")
getLogger(ctx).Debug("acquire concurrent send semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx)
if err != nil {
return nil, nil, err
defer guard.Release()
si, err := zfs.ZFSSendDry(ctx, sendArgs)
if err != nil {
return nil, nil, errors.Wrap(err, "zfs send dry failed")
@ -682,8 +668,6 @@ func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io
return nil, nil, fmt.Errorf("receiver does not implement Send()")
var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
@ -803,16 +787,6 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
log.Debug("acquire concurrent recv semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx)
if err != nil {
return nil, err
defer guard.Release()
var peek bytes.Buffer
var MaxPeek = envconst.Int64("ZREPL_ENDPOINT_RECV_PEEK_SIZE", 1<<20)
log.WithField("max_peek_bytes", MaxPeek).Info("peeking incoming stream")
@ -7,12 +7,15 @@ require (
github.com/gdamore/tcell v1.2.0
github.com/gitchander/permutation v0.0.0-20181107151852-9e56b92e9909
github.com/go-logfmt/logfmt v0.4.0
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator v9.31.0+incompatible
github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4
github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.1.2
github.com/jinzhu/copier v0.0.0-20170922082739-db4671f3a9b8
github.com/kisielk/gotool v1.0.0 // indirect
github.com/kr/pretty v0.1.0
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lib/pq v1.2.0
github.com/mattn/go-colorable v0.1.4 // indirect
github.com/mattn/go-isatty v0.0.8
@ -29,7 +32,7 @@ require (
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 // go1.12 thinks it needs this
github.com/spf13/cobra v0.0.2
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.6.1
github.com/willf/bitset v1.1.10
github.com/yudai/gojsondiff v0.0.0-20170107030110-7b1b7adf999d
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // go1.12 thinks it needs this
@ -59,6 +59,12 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator v9.31.0+incompatible h1:UA72EPEogEnq76ehGdEDp4Mit+3FDh548oRqwVgNsHA=
github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig=
github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4 h1:0suja/iKSDbEIYLbrS/8C7iArJiWpgCNcR+zwAHu7Ig=
github.com/go-sql-driver/mysql v1.4.1-0.20190907122137-b2c03bcae3d4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@ -171,6 +177,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
@ -300,6 +308,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/theckman/goconstraint v1.11.0 h1:oBUwN5wpE4dwyPhRGraEgJsFTr+JtLWiDnaJZJeeXI0=
github.com/theckman/goconstraint v1.11.0/go.mod h1:zkCR/f2kOULTk/h1ujgyB9BlCNLaqlQ6GN2Zl4mg81g=
github.com/timakin/bodyclose v0.0.0-20190407043127-4a873e97b2bb/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
@ -380,6 +390,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
golang.org/x/text v0.0.0-20170915090833-1cbadb444a80/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@ -460,6 +471,8 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
@ -7,6 +7,7 @@ import (
@ -15,6 +16,7 @@ import (
@ -92,6 +94,11 @@ func (i replicationInvocation) Do(ctx *platformtest.Context) *report.Report {
report, wait := replication.Do(
MaxAttempts: 1,
StepQueueConcurrency: 1,
ReconnectHardFailTimeout: 1 * time.Second,
logic.NewPlanner(nil, nil, sender, receiver, plannerPolicy),
@ -9,6 +9,7 @@ import (
@ -19,7 +20,6 @@ import (
type interval struct {
@ -84,6 +84,7 @@ type Planner interface {
// an attempt represents a single planning & execution of fs replications
type attempt struct {
planner Planner
config Config
l *chainlock.L
@ -181,10 +182,25 @@ type step struct {
type ReportFunc func() *report.Report
type WaitFunc func(block bool) (done bool)
var maxAttempts = envconst.Int64("ZREPL_REPLICATION_MAX_ATTEMPTS", 3)
var reconnectHardFailTimeout = envconst.Duration("ZREPL_REPLICATION_RECONNECT_HARD_FAIL_TIMEOUT", 10*time.Minute)
type Config struct {
StepQueueConcurrency int `validate:"gte=1"`
MaxAttempts int `validate:"eq=-1|gt=0"`
ReconnectHardFailTimeout time.Duration `validate:"gt=0"`
var validate = validator.New()
func (c Config) Validate() error {
return validate.Struct(c)
// caller must ensure config.Validate() == nil
func Do(ctx context.Context, config Config, planner Planner) (ReportFunc, WaitFunc) {
if err := config.Validate(); err != nil {
func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
log := getLog(ctx)
l := chainlock.New()
run := &run{
@ -201,7 +217,7 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
defer log.Debug("run ended")
var prev *attempt
mainLog := log
for ano := 0; ano < int(maxAttempts) || maxAttempts == 0; ano++ {
for ano := 0; ano < int(config.MaxAttempts); ano++ {
log := mainLog.WithField("attempt_number", ano)
log.Debug("start attempt")
@ -213,6 +229,7 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
l: l,
startedAt: time.Now(),
planner: planner,
config: config,
run.attempts = append(run.attempts, cur)
run.l.DropWhile(func() {
@ -248,7 +265,7 @@ func Do(ctx context.Context, planner Planner) (ReportFunc, WaitFunc) {
shouldReconnect := mostRecentErrClass == errorClassTemporaryConnectivityRelated
log.WithField("reconnect_decision", shouldReconnect).Debug("reconnect decision made")
if shouldReconnect {
run.waitReconnect.Set(time.Now(), reconnectHardFailTimeout)
run.waitReconnect.Set(time.Now(), config.ReconnectHardFailTimeout)
log.WithField("deadline", run.waitReconnect.End()).Error("temporary connectivity-related error identified, start waiting for reconnect")
var connectErr error
var connectErrTime time.Time
@ -421,7 +438,7 @@ func (a *attempt) doFilesystems(ctx context.Context, prevs map[*fs]*fs) {
defer a.l.Lock().Unlock()
stepQueue := newStepQueue()
defer stepQueue.Start(envconst.Int("ZREPL_REPLICATION_EXPERIMENTAL_REPLICATION_CONCURRENCY", 1))() // TODO parallel replication
defer stepQueue.Start(a.config.StepQueueConcurrency)()
var fssesDone sync.WaitGroup
for _, f := range a.fss {
@ -154,7 +154,12 @@ func TestReplication(t *testing.T) {
defer trace.WithTaskFromStackUpdateCtx(&ctx)()
mp := &mockPlanner{}
getReport, wait := Do(ctx, mp)
driverConfig := Config{
StepQueueConcurrency: 1,
MaxAttempts: 1,
ReconnectHardFailTimeout: 1 * time.Second,
getReport, wait := Do(ctx, driverConfig, mp)
begin := time.Now()
fireAt := []time.Duration{
// the following values are relative to the start
@ -19,7 +19,6 @@ import (
@ -222,7 +221,11 @@ func (s *Step) ReportInfo() *report.StepInfo {
// caller must ensure policy.Validate() == nil
func NewPlanner(secsPerState *prometheus.HistogramVec, bytesReplicated *prometheus.CounterVec, sender Sender, receiver Receiver, policy PlannerPolicy) *Planner {
if err := policy.Validate(); err != nil {
return &Planner{
sender: sender,
receiver: receiver,
@ -272,7 +275,7 @@ func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
rfss := rlfssres.GetFilesystems()
sizeEstimateRequestSem := semaphore.New(envconst.Int64("ZREPL_REPLICATION_MAX_CONCURRENT_SIZE_ESTIMATE", 4))
sizeEstimateRequestSem := semaphore.New(int64(p.policy.SizeEstimationConcurrency))
q := make([]*Filesystem, 0, len(sfss))
for _, fs := range sfss {
@ -1,6 +1,7 @@
package logic
import (
@ -8,8 +9,15 @@ import (
type PlannerPolicy struct {
EncryptedSend tri // all sends must be encrypted (send -w, and encryption!=off)
ReplicationConfig *pdu.ReplicationConfig
EncryptedSend tri // all sends must be encrypted (send -w, and encryption!=off)
ReplicationConfig *pdu.ReplicationConfig
SizeEstimationConcurrency int `validate:"gte=1"`
var validate = validator.New()
func (p PlannerPolicy) Validate() error {
return validate.Struct(p)
func ReplicationConfigFromConfig(in *config.Replication) (*pdu.ReplicationConfig, error) {
@ -8,6 +8,6 @@ import (
func Do(ctx context.Context, planner driver.Planner) (driver.ReportFunc, driver.WaitFunc) {
return driver.Do(ctx, planner)
func Do(ctx context.Context, driverConfig driver.Config, planner driver.Planner) (driver.ReportFunc, driver.WaitFunc) {
return driver.Do(ctx, driverConfig, planner)
Reference in New Issue
Block a user