mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-24 17:35:01 +01:00
30cdc1430e
This commit adds a configuration in which no step holds, replication cursors, etc. are created; removes the send.step_holds.disable_incremental setting; creates a new config option `replication` for active-side jobs; and adds the replication.protection.{initial,incremental} settings, each of which can have the values `guarantee_resumability`, `guarantee_incremental`, or `guarantee_nothing` (refer to docs/configuration/replication.rst for semantics). The `replication` config from an active side is sent to both endpoint.Sender and endpoint.Receiver for each replication step. Sender and Receiver then act accordingly. For `guarantee_incremental`, we add the new `tentative-replication-cursor` abstraction. The necessity for that abstraction is outlined in https://github.com/zrepl/zrepl/issues/340. Fixes https://github.com/zrepl/zrepl/issues/340.
84 lines
1.9 KiB
Go
84 lines
1.9 KiB
Go
package endpoint
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"regexp"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/zrepl/zrepl/zfs"
|
|
)
|
|
|
|
// stepHoldTagRE matches strings that start with the "zrepl_STEP_J_" prefix
// followed by at least one character; capture group 1 is everything after
// the prefix.
var stepHoldTagRE = regexp.MustCompile("^zrepl_STEP_J_(.+)")
|
|
|
|
func StepHoldTag(jobid JobID) (string, error) {
|
|
return stepHoldTagImpl(jobid.String())
|
|
}
|
|
|
|
func stepHoldTagImpl(jobid string) (string, error) {
|
|
t := fmt.Sprintf("zrepl_STEP_J_%s", jobid)
|
|
if err := zfs.ValidHoldTag(t); err != nil {
|
|
return "", err
|
|
}
|
|
return t, nil
|
|
}
|
|
|
|
// err != nil always means that the bookmark is not a step bookmark
|
|
func ParseStepHoldTag(tag string) (JobID, error) {
|
|
match := stepHoldTagRE.FindStringSubmatch(tag)
|
|
if match == nil {
|
|
return JobID{}, fmt.Errorf("parse hold tag: match regex %q", stepHoldTagRE)
|
|
}
|
|
jobID, err := MakeJobID(match[1])
|
|
if err != nil {
|
|
return JobID{}, errors.Wrap(err, "parse hold tag: invalid job id field")
|
|
}
|
|
return jobID, nil
|
|
}
|
|
|
|
// idempotently hold `version`
|
|
func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) (Abstraction, error) {
|
|
if !v.IsSnapshot() {
|
|
panic(fmt.Sprintf("version must be a snapshot got %#v", v))
|
|
}
|
|
|
|
tag, err := StepHoldTag(jobID)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "step hold tag")
|
|
}
|
|
|
|
if err := zfs.ZFSHold(ctx, fs, v, tag); err != nil {
|
|
return nil, errors.Wrap(err, "step hold: zfs")
|
|
}
|
|
|
|
return &holdBasedAbstraction{
|
|
Type: AbstractionStepHold,
|
|
FS: fs,
|
|
Tag: tag,
|
|
JobID: jobID,
|
|
FilesystemVersion: v,
|
|
}, nil
|
|
|
|
}
|
|
|
|
var _ HoldExtractor = StepHoldExtractor
|
|
|
|
func StepHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction {
|
|
if v.Type != zfs.Snapshot {
|
|
panic("impl error")
|
|
}
|
|
|
|
jobID, err := ParseStepHoldTag(holdTag)
|
|
if err == nil {
|
|
return &holdBasedAbstraction{
|
|
Type: AbstractionStepHold,
|
|
FS: fs.ToString(),
|
|
Tag: holdTag,
|
|
FilesystemVersion: v,
|
|
JobID: jobID,
|
|
}
|
|
}
|
|
return nil
|
|
}
|