zrepl/endpoint/endpoint_zfs_abstraction_last_received_hold.go
Christian Schwarz 30cdc1430e replication + endpoint: replication guarantees: guarantee_{resumability,incremental,nothing}
This commit

- adds a configuration in which no step holds, replication cursors, etc. are created
- removes the send.step_holds.disable_incremental setting
- creates a new config option `replication` for active-side jobs
- adds the replication.protection.{initial,incremental} settings, each
  of which can have values
    - `guarantee_resumability`
    - `guarantee_incremental`
    - `guarantee_nothing`
  (refer to docs/configuration/replication.rst for semantics)

The `replication` config from an active side is sent to both endpoint.Sender and endpoint.Receiver
for each replication step. Sender and Receiver then act accordingly.

For `guarantee_incremental`, we add the new `tentative-replication-cursor` abstraction.
The necessity for that abstraction is outlined in https://github.com/zrepl/zrepl/issues/340.

fixes https://github.com/zrepl/zrepl/issues/340
2020-07-26 20:32:35 +02:00

92 lines
2.3 KiB
Go

package endpoint
import (
"context"
"fmt"
"regexp"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/zfs"
)
// LastReceivedHoldTagNamePrefix is the fixed prefix of every last-received
// hold tag; the job ID is appended directly after it.
const LastReceivedHoldTagNamePrefix = "zrepl_last_received_J_"
// lastReceivedHoldTagRE matches a complete last-received hold tag and captures
// the non-empty job ID portion that follows the prefix as submatch 1.
var lastReceivedHoldTagRE = regexp.MustCompile(`^zrepl_last_received_J_(.+)$`)
// Compile-time check that LastReceivedHoldExtractor is assignable to HoldExtractor.
var _ HoldExtractor = LastReceivedHoldExtractor
// LastReceivedHoldExtractor interprets holdTag, a hold on snapshot v of
// filesystem fs, as a last-received hold.
//
// It returns a hold-based abstraction of type AbstractionLastReceivedHold if
// holdTag parses as a last-received hold tag, and nil otherwise.
//
// It panics if v is not a snapshot; callers must only pass snapshots.
func LastReceivedHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction {
	if v.Type != zfs.Snapshot {
		panic("impl error")
	}

	jobID, err := ParseLastReceivedHoldTag(holdTag)
	if err != nil {
		// The tag is not a last-received hold tag; nothing to extract.
		return nil
	}

	return &holdBasedAbstraction{
		Type:              AbstractionLastReceivedHold,
		FS:                fs.ToString(),
		FilesystemVersion: v,
		Tag:               holdTag,
		JobID:             jobID,
	}
}
// ParseLastReceivedHoldTag extracts the job ID from a last-received hold tag.
//
// A non-nil error always means that tag is not a last-received hold tag:
// either it does not match lastReceivedHoldTagRE, or the part following the
// prefix is rejected by MakeJobID.
func ParseLastReceivedHoldTag(tag string) (JobID, error) {
	groups := lastReceivedHoldTagRE.FindStringSubmatch(tag)
	if groups == nil {
		return JobID{}, errors.Errorf("parse last-received-hold tag: does not match regex %s", lastReceivedHoldTagRE.String())
	}

	// groups[1] is the capture group holding the job ID field.
	jobID, err := MakeJobID(groups[1])
	if err != nil {
		return JobID{}, errors.Wrap(err, "parse last-received-hold tag: invalid job id field")
	}

	return jobID, nil
}
// LastReceivedHoldTag returns the last-received hold tag for jobID, built from
// the job ID's string form. The error is whatever lastReceivedHoldImpl reports.
func LastReceivedHoldTag(jobID JobID) (string, error) {
	id := jobID.String()
	return lastReceivedHoldImpl(id)
}
// lastReceivedHoldImpl builds the hold tag by appending jobid to
// LastReceivedHoldTagNamePrefix and checks it with zfs.ValidHoldTag.
// It returns the empty string and the validation error if the check fails.
func lastReceivedHoldImpl(jobid string) (string, error) {
	holdTag := fmt.Sprintf("%s%s", LastReceivedHoldTagNamePrefix, jobid)

	validationErr := zfs.ValidHoldTag(holdTag)
	if validationErr != nil {
		return "", validationErr
	}

	return holdTag, nil
}
// CreateLastReceivedHold places a hold, tagged with jobID's last-received hold
// tag, on version `to` of filesystem fs and returns the abstraction that
// describes the hold.
//
// An error is returned if `to` is not a snapshot, if the hold tag cannot be
// built for jobID, or if zfs.ZFSHold fails.
//
// NOTE(review): this function only creates the new hold. The intent is to
// never be without a hold, i.e. the new hold is taken before an older one is
// released; releasing older holds does not happen here and is presumably the
// caller's job — confirm.
func CreateLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) (Abstraction, error) {
	if !to.IsSnapshot() {
		return nil, errors.Errorf("last-received-hold: target must be a snapshot: %s", to.FullPath(fs))
	}

	holdTag, tagErr := LastReceivedHoldTag(jobID)
	if tagErr != nil {
		return nil, errors.Wrap(tagErr, "last-received-hold: hold tag")
	}

	if holdErr := zfs.ZFSHold(ctx, fs, to, holdTag); holdErr != nil {
		return nil, errors.Wrap(holdErr, "last-received-hold: hold newly received")
	}

	held := &holdBasedAbstraction{
		Type:              AbstractionLastReceivedHold,
		FS:                fs,
		FilesystemVersion: to,
		Tag:               holdTag,
		JobID:             jobID,
	}
	return held, nil
}