zrepl/daemon/job/build_jobs_sendrecvoptions.go
Christian Schwarz 30cdc1430e replication + endpoint: replication guarantees: guarantee_{resumability,incremental,nothing}
This commit

- adds a configuration in which no step holds, replication cursors, etc. are created
- removes the send.step_holds.disable_incremental setting
- creates a new config option `replication` for active-side jobs
- adds the replication.protection.{initial,incremental} settings, each
  of which can have values
    - `guarantee_resumability`
    - `guarantee_incremental`
    - `guarantee_nothing`
  (refer to docs/configuration/replication.rst for semantics)

The `replication` config from an active side is sent to both endpoint.Sender and endpoint.Receiver
for each replication step. Sender and Receiver then act accordingly.

For `guarantee_incremental`, we add the new `tentative-replication-cursor` abstraction.
The necessity for that abstraction is outlined in https://github.com/zrepl/zrepl/issues/340.

fixes https://github.com/zrepl/zrepl/issues/340
2020-07-26 20:32:35 +02:00

56 lines
1.5 KiB
Go

package job
import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/zfs"
)
// SendingJobConfig is the view of a job's configuration that
// buildSenderConfig reads when assembling an endpoint.SenderConfig.
type SendingJobConfig interface {
	// GetSendOptions returns the job's send options. It must not return nil.
	GetSendOptions() *config.SendOptions
	// GetFilesystems returns the filter specification from which the
	// sender's filesystem filter is built.
	GetFilesystems() config.FilesystemsFilter
}
// buildSenderConfig assembles the endpoint.SenderConfig for job jobID from in.
//
// The filesystem filter is derived from in.GetFilesystems(); if that fails,
// the error is wrapped and a nil config is returned. in.GetSendOptions() must
// not return nil, because its Encrypted field is read unconditionally.
func buildSenderConfig(in SendingJobConfig, jobID endpoint.JobID) (*endpoint.SenderConfig, error) {
	datasetFilter, filterErr := filters.DatasetMapFilterFromConfig(in.GetFilesystems())
	if filterErr != nil {
		return nil, errors.Wrap(filterErr, "cannot build filesystem filter")
	}

	encrypt := &zfs.NilBool{B: in.GetSendOptions().Encrypted}
	senderCfg := &endpoint.SenderConfig{
		JobID:   jobID,
		FSF:     datasetFilter,
		Encrypt: encrypt,
	}
	return senderCfg, nil
}
// ReceivingJobConfig is the view of a job's configuration that
// buildReceiverConfig takes as input.
type ReceivingJobConfig interface {
	// GetRecvOptions returns the job's receive options.
	GetRecvOptions() *config.RecvOptions
	// GetAppendClientIdentity returns the value that is copied into
	// endpoint.ReceiverConfig.AppendClientIdentity.
	GetAppendClientIdentity() bool
	// GetRootFS returns the root_fs setting. buildReceiverConfig requires it
	// to parse as a non-empty ZFS dataset path.
	GetRootFS() string
}
// buildReceiverConfig assembles and validates the endpoint.ReceiverConfig for
// job jobID from in.
//
// in.GetRootFS() must parse as a non-empty ZFS dataset path; otherwise an
// error is returned together with the zero-value config. The assembled config
// is then checked with Validate; on failure the validation error is wrapped
// and returned alongside the (invalid) config.
func buildReceiverConfig(in ReceivingJobConfig, jobID endpoint.JobID) (rc endpoint.ReceiverConfig, err error) {
	rootFs, err := zfs.NewDatasetPath(in.GetRootFS())
	if err != nil {
		// Keep the parser's error as the cause so the user can see why the
		// configured path was rejected, not merely that it was.
		return rc, errors.Wrap(err, "root_fs is not a valid zfs filesystem path")
	}
	if rootFs.Length() <= 0 {
		return rc, errors.New("root_fs must not be empty") // duplicates error check of receiver
	}

	rc = endpoint.ReceiverConfig{
		JobID:                      jobID,
		RootWithoutClientComponent: rootFs,
		AppendClientIdentity:       in.GetAppendClientIdentity(),
	}
	if err := rc.Validate(); err != nil {
		return rc, errors.Wrap(err, "cannot build receiver config")
	}

	return rc, nil
}