mirror of
https://github.com/zrepl/zrepl.git
synced 2025-02-17 19:01:12 +01:00
287 lines
8.1 KiB
Go
287 lines
8.1 KiB
Go
|
package endpoint
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"regexp"
|
||
|
|
||
|
"github.com/pkg/errors"
|
||
|
|
||
|
"github.com/zrepl/zrepl/util/errorarray"
|
||
|
"github.com/zrepl/zrepl/zfs"
|
||
|
)
|
||
|
|
||
|
// stepHoldTagRE matches hold tags that start with "zrepl_STEP_J_" and
// captures the remainder of the tag (at least one character) as the job ID
// field. It is the parsing counterpart of the format used in stepHoldTagImpl.
var stepHoldTagRE = regexp.MustCompile("^zrepl_STEP_J_(.+)")
|
||
|
|
||
|
func StepHoldTag(jobid JobID) (string, error) {
|
||
|
return stepHoldTagImpl(jobid.String())
|
||
|
}
|
||
|
|
||
|
func stepHoldTagImpl(jobid string) (string, error) {
|
||
|
t := fmt.Sprintf("zrepl_STEP_J_%s", jobid)
|
||
|
if err := zfs.ValidHoldTag(t); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
return t, nil
|
||
|
}
|
||
|
|
||
|
// err != nil always means that the bookmark is not a step bookmark
|
||
|
func ParseStepHoldTag(tag string) (JobID, error) {
|
||
|
match := stepHoldTagRE.FindStringSubmatch(tag)
|
||
|
if match == nil {
|
||
|
return JobID{}, fmt.Errorf("parse hold tag: match regex %q", stepHoldTagRE)
|
||
|
}
|
||
|
jobID, err := MakeJobID(match[1])
|
||
|
if err != nil {
|
||
|
return JobID{}, errors.Wrap(err, "parse hold tag: invalid job id field")
|
||
|
}
|
||
|
return jobID, nil
|
||
|
}
|
||
|
|
||
|
// stepBookmarkNamePrefix is the prefix handed to makeJobAndGuidBookmarkName
// and parseJobAndGuidBookmarkName when building or parsing step bookmark names.
const stepBookmarkNamePrefix = "zrepl_STEP"
|
||
|
|
||
|
// v must be validated by caller
|
||
|
func StepBookmarkName(fs string, guid uint64, id JobID) (string, error) {
|
||
|
return stepBookmarkNameImpl(fs, guid, id.String())
|
||
|
}
|
||
|
|
||
|
func stepBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) {
|
||
|
return makeJobAndGuidBookmarkName(stepBookmarkNamePrefix, fs, guid, jobid)
|
||
|
}
|
||
|
|
||
|
// name is the full bookmark name, including dataset path
|
||
|
//
|
||
|
// err != nil always means that the bookmark is not a step bookmark
|
||
|
func ParseStepBookmarkName(fullname string) (guid uint64, jobID JobID, err error) {
|
||
|
guid, jobID, err = parseJobAndGuidBookmarkName(fullname, stepBookmarkNamePrefix)
|
||
|
if err != nil {
|
||
|
err = errors.Wrap(err, "parse step bookmark name") // no shadow!
|
||
|
}
|
||
|
return guid, jobID, err
|
||
|
}
|
||
|
|
||
|
// idempotently hold / step-bookmark `version`
|
||
|
//
|
||
|
// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS
|
||
|
func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) (Abstraction, error) {
|
||
|
if v.IsSnapshot() {
|
||
|
|
||
|
tag, err := StepHoldTag(jobID)
|
||
|
if err != nil {
|
||
|
return nil, errors.Wrap(err, "step hold tag")
|
||
|
}
|
||
|
|
||
|
if err := zfs.ZFSHold(ctx, fs, v, tag); err != nil {
|
||
|
return nil, errors.Wrap(err, "step hold: zfs")
|
||
|
}
|
||
|
|
||
|
return &holdBasedAbstraction{
|
||
|
Type: AbstractionStepHold,
|
||
|
FS: fs,
|
||
|
Tag: tag,
|
||
|
JobID: jobID,
|
||
|
FilesystemVersion: v,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
if !v.IsBookmark() {
|
||
|
panic(fmt.Sprintf("version must bei either snapshot or bookmark, got %#v", v))
|
||
|
}
|
||
|
|
||
|
bmname, err := StepBookmarkName(fs, v.Guid, jobID)
|
||
|
if err != nil {
|
||
|
return nil, errors.Wrap(err, "create step bookmark: determine bookmark name")
|
||
|
}
|
||
|
// idempotently create bookmark
|
||
|
err = zfs.ZFSBookmark(ctx, fs, v.ToSendArgVersion(), bmname)
|
||
|
if err != nil {
|
||
|
if err == zfs.ErrBookmarkCloningNotSupported {
|
||
|
// TODO we could actually try to find a local snapshot that has the requested GUID
|
||
|
// however, the replication algorithm prefers snapshots anyways, so this quest
|
||
|
// is most likely not going to be successful. Also, there's the possibility that
|
||
|
// the caller might want to filter what snapshots are eligibile, and this would
|
||
|
// complicate things even further.
|
||
|
return nil, err // TODO go1.13 use wrapping
|
||
|
}
|
||
|
return nil, errors.Wrap(err, "create step bookmark: zfs")
|
||
|
}
|
||
|
return &bookmarkBasedAbstraction{
|
||
|
Type: AbstractionStepBookmark,
|
||
|
FS: fs,
|
||
|
FilesystemVersion: v,
|
||
|
JobID: jobID,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// idempotently release the step-hold on v if v is a snapshot
|
||
|
// or idempotently destroy the step-bookmark of v if v is a bookmark
|
||
|
//
|
||
|
// note that this operation leaves v itself untouched, unless v is the step-bookmark itself, in which case v is destroyed
|
||
|
//
|
||
|
// returns an instance of *zfs.DatasetDoesNotExist if `v` does not exist
|
||
|
func ReleaseStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) error {
|
||
|
|
||
|
if v.IsSnapshot() {
|
||
|
tag, err := StepHoldTag(jobID)
|
||
|
if err != nil {
|
||
|
return errors.Wrap(err, "step release tag")
|
||
|
}
|
||
|
|
||
|
if err := zfs.ZFSRelease(ctx, tag, v.FullPath(fs)); err != nil {
|
||
|
return errors.Wrap(err, "step release: zfs")
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
if !v.IsBookmark() {
|
||
|
panic(fmt.Sprintf("impl error: expecting version to be a bookmark, got %#v", v))
|
||
|
}
|
||
|
|
||
|
bmname, err := StepBookmarkName(fs, v.Guid, jobID)
|
||
|
if err != nil {
|
||
|
return errors.Wrap(err, "step release: determine bookmark name")
|
||
|
}
|
||
|
// idempotently destroy bookmark
|
||
|
|
||
|
if err := zfs.ZFSDestroyIdempotent(ctx, bmname); err != nil {
|
||
|
return errors.Wrap(err, "step release: bookmark destroy: zfs")
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// release {step holds, step bookmarks} earlier and including `mostRecent`
|
||
|
func ReleaseStepCummulativeInclusive(ctx context.Context, fs string, since *CreateTXGRangeBound, mostRecent zfs.FilesystemVersion, jobID JobID) error {
|
||
|
q := ListZFSHoldsAndBookmarksQuery{
|
||
|
What: AbstractionTypeSet{
|
||
|
AbstractionStepHold: true,
|
||
|
AbstractionStepBookmark: true,
|
||
|
},
|
||
|
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
|
||
|
FS: &fs,
|
||
|
},
|
||
|
JobID: &jobID,
|
||
|
CreateTXG: CreateTXGRange{
|
||
|
Since: since,
|
||
|
Until: &CreateTXGRangeBound{
|
||
|
CreateTXG: mostRecent.CreateTXG,
|
||
|
Inclusive: &zfs.NilBool{B: true},
|
||
|
},
|
||
|
},
|
||
|
Concurrency: 1,
|
||
|
}
|
||
|
abs, absErrs, err := ListAbstractions(ctx, q)
|
||
|
if err != nil {
|
||
|
return errors.Wrap(err, "step release cummulative: list")
|
||
|
}
|
||
|
if len(absErrs) > 0 {
|
||
|
return errors.Wrap(ListAbstractionsErrors(absErrs), "step release cummulative: list")
|
||
|
}
|
||
|
|
||
|
getLogger(ctx).WithField("step_holds_and_bookmarks", fmt.Sprintf("%s", abs)).Debug("releasing step holds and bookmarks")
|
||
|
|
||
|
var errs []error
|
||
|
for res := range BatchDestroy(ctx, abs) {
|
||
|
log := getLogger(ctx).
|
||
|
WithField("step_hold_or_bookmark", res.Abstraction)
|
||
|
if res.DestroyErr != nil {
|
||
|
errs = append(errs, res.DestroyErr)
|
||
|
log.WithError(err).
|
||
|
Error("cannot release step hold or bookmark")
|
||
|
} else {
|
||
|
log.Info("released step hold or bookmark")
|
||
|
}
|
||
|
}
|
||
|
if len(errs) == 0 {
|
||
|
return nil
|
||
|
} else {
|
||
|
return errorarray.Wrap(errs, "step release cummulative: release")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TryReleaseStepStaleFS(ctx context.Context, fs string, jobID JobID) {
|
||
|
|
||
|
q := ListZFSHoldsAndBookmarksQuery{
|
||
|
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
|
||
|
FS: &fs,
|
||
|
},
|
||
|
JobID: &jobID,
|
||
|
What: AbstractionTypeSet{
|
||
|
AbstractionStepHold: true,
|
||
|
AbstractionStepBookmark: true,
|
||
|
AbstractionReplicationCursorBookmarkV2: true,
|
||
|
},
|
||
|
Concurrency: 1,
|
||
|
}
|
||
|
staleness, err := ListStale(ctx, q)
|
||
|
if _, ok := err.(*ListStaleQueryError); ok {
|
||
|
panic(err)
|
||
|
} else if err != nil {
|
||
|
getLogger(ctx).WithError(err).Error("cannot list stale step holds and bookmarks")
|
||
|
return
|
||
|
}
|
||
|
for _, s := range staleness.Stale {
|
||
|
getLogger(ctx).WithField("stale_step_hold_or_bookmark", s).Info("batch-destroying stale step hold or bookmark")
|
||
|
}
|
||
|
for res := range BatchDestroy(ctx, staleness.Stale) {
|
||
|
if res.DestroyErr != nil {
|
||
|
getLogger(ctx).
|
||
|
WithField("stale_step_hold_or_bookmark", res.Abstraction).
|
||
|
WithError(res.DestroyErr).
|
||
|
Error("cannot destroy stale step-hold or bookmark")
|
||
|
} else {
|
||
|
getLogger(ctx).
|
||
|
WithField("stale_step_hold_or_bookmark", res.Abstraction).
|
||
|
WithError(res.DestroyErr).
|
||
|
Info("destroyed stale step-hold or bookmark")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
// Compile-time check that StepBookmarkExtractor is assignable to BookmarkExtractor.
var _ BookmarkExtractor = StepBookmarkExtractor
|
||
|
|
||
|
func StepBookmarkExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) {
|
||
|
if v.Type != zfs.Bookmark {
|
||
|
panic("impl error")
|
||
|
}
|
||
|
|
||
|
fullname := v.ToAbsPath(fs)
|
||
|
|
||
|
guid, jobid, err := ParseStepBookmarkName(fullname)
|
||
|
if guid != v.Guid {
|
||
|
// TODO log this possibly tinkered-with bookmark
|
||
|
return nil
|
||
|
}
|
||
|
if err == nil {
|
||
|
bm := &bookmarkBasedAbstraction{
|
||
|
Type: AbstractionStepBookmark,
|
||
|
FS: fs.ToString(),
|
||
|
FilesystemVersion: v,
|
||
|
JobID: jobid,
|
||
|
}
|
||
|
return bm
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Compile-time check that StepHoldExtractor is assignable to HoldExtractor.
var _ HoldExtractor = StepHoldExtractor
|
||
|
|
||
|
func StepHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction {
|
||
|
if v.Type != zfs.Snapshot {
|
||
|
panic("impl error")
|
||
|
}
|
||
|
|
||
|
jobID, err := ParseStepHoldTag(holdTag)
|
||
|
if err == nil {
|
||
|
return &holdBasedAbstraction{
|
||
|
Type: AbstractionStepHold,
|
||
|
FS: fs.ToString(),
|
||
|
Tag: holdTag,
|
||
|
FilesystemVersion: v,
|
||
|
JobID: jobID,
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|