[#316] endpoint: delete unreachable code

This commit is contained in:
Christian Schwarz 2020-05-20 12:57:54 +02:00
parent 292b85b5ef
commit 1bc731e782
5 changed files with 8 additions and 203 deletions

View File

@ -342,7 +342,7 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
msg := "cannot move replication cursor, keeping hold on `to` until successful"
log(ctx).WithError(err).Error(msg)
err = errors.Wrap(err, msg)
// it is correct to not release the hold if we can't move the cursor!
// it is correct to not destroy from and to step holds if we can't move the cursor!
return &pdu.SendCompletedRes{}, err
}
} else {

View File

@ -141,8 +141,7 @@ func CreateReplicationCursor(ctx context.Context, fs string, target zfs.Filesyst
return nil, zfs.ErrBookmarkCloningNotSupported
}
// idempotently create bookmark (guid is encoded in it, hence we'll most likely add a new one
// cleanup the old one afterwards
// idempotently create bookmark (guid is encoded in it)
cursorBookmark, err := zfs.ZFSBookmark(ctx, fs, target, bookmarkname)
if err != nil {
@ -160,57 +159,9 @@ func CreateReplicationCursor(ctx context.Context, fs string, target zfs.Filesyst
}, nil
}
type ReplicationCursor interface {
GetCreateTXG() uint64
}
func DestroyObsoleteReplicationCursors(ctx context.Context, fs string, current ReplicationCursor, jobID JobID) (_ []Abstraction, err error) {
q := ListZFSHoldsAndBookmarksQuery{
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
What: AbstractionTypeSet{
AbstractionReplicationCursorBookmarkV2: true,
},
JobID: &jobID,
CreateTXG: CreateTXGRange{
Since: nil,
Until: &CreateTXGRangeBound{
CreateTXG: current.GetCreateTXG(),
Inclusive: &zfs.NilBool{B: false},
},
},
Concurrency: 1,
}
abs, absErr, err := ListAbstractions(ctx, q)
if err != nil {
return nil, errors.Wrap(err, "list abstractions")
}
if len(absErr) > 0 {
return nil, errors.Wrap(ListAbstractionsErrors(absErr), "list abstractions")
}
var destroyed []Abstraction
var errs []error
for res := range BatchDestroy(ctx, abs) {
log := getLogger(ctx).
WithField("replication_cursor_bookmark", res.Abstraction)
if res.DestroyErr != nil {
errs = append(errs, res.DestroyErr)
log.WithError(err).
Error("cannot destroy obsolete replication cursor bookmark")
} else {
destroyed = append(destroyed, res.Abstraction)
log.Info("destroyed obsolete replication cursor bookmark")
}
}
if len(errs) == 0 {
return destroyed, nil
} else {
return destroyed, errorarray.Wrap(errs, "destroy obsolete replication cursor")
}
}
const (
ReplicationCursorBookmarkNamePrefix = "zrepl_last_received_J_"
)
var lastReceivedHoldTagRE = regexp.MustCompile("^zrepl_last_received_J_(.+)$")

View File

@ -7,7 +7,6 @@ import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/util/errorarray"
"github.com/zrepl/zrepl/zfs"
)
@ -113,131 +112,6 @@ func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID Job
}, nil
}
// idempotently release the step-hold on v if v is a snapshot
// or idempotently destroy the step-bookmark of v if v is a bookmark
//
// note that this operation leaves v itself untouched, unless v is the step-bookmark itself, in which case v is destroyed
//
// returns an instance of *zfs.DatasetDoesNotExist if `v` does not exist
func ReleaseStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) error {
if v.IsSnapshot() {
tag, err := StepHoldTag(jobID)
if err != nil {
return errors.Wrap(err, "step release tag")
}
if err := zfs.ZFSRelease(ctx, tag, v.FullPath(fs)); err != nil {
return errors.Wrap(err, "step release: zfs")
}
return nil
}
if !v.IsBookmark() {
panic(fmt.Sprintf("impl error: expecting version to be a bookmark, got %#v", v))
}
bmname, err := StepBookmarkName(fs, v.Guid, jobID)
if err != nil {
return errors.Wrap(err, "step release: determine bookmark name")
}
// idempotently destroy bookmark
if err := zfs.ZFSDestroyIdempotent(ctx, bmname); err != nil {
return errors.Wrap(err, "step release: bookmark destroy: zfs")
}
return nil
}
// release {step holds, step bookmarks} earlier and including `mostRecent`
func ReleaseStepCummulativeInclusive(ctx context.Context, fs string, since *CreateTXGRangeBound, mostRecent zfs.FilesystemVersion, jobID JobID) error {
q := ListZFSHoldsAndBookmarksQuery{
What: AbstractionTypeSet{
AbstractionStepHold: true,
AbstractionStepBookmark: true,
},
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
JobID: &jobID,
CreateTXG: CreateTXGRange{
Since: since,
Until: &CreateTXGRangeBound{
CreateTXG: mostRecent.CreateTXG,
Inclusive: &zfs.NilBool{B: true},
},
},
Concurrency: 1,
}
abs, absErrs, err := ListAbstractions(ctx, q)
if err != nil {
return errors.Wrap(err, "step release cummulative: list")
}
if len(absErrs) > 0 {
return errors.Wrap(ListAbstractionsErrors(absErrs), "step release cummulative: list")
}
getLogger(ctx).WithField("step_holds_and_bookmarks", fmt.Sprintf("%s", abs)).Debug("releasing step holds and bookmarks")
var errs []error
for res := range BatchDestroy(ctx, abs) {
log := getLogger(ctx).
WithField("step_hold_or_bookmark", res.Abstraction)
if res.DestroyErr != nil {
errs = append(errs, res.DestroyErr)
log.WithError(err).
Error("cannot release step hold or bookmark")
} else {
log.Info("released step hold or bookmark")
}
}
if len(errs) == 0 {
return nil
} else {
return errorarray.Wrap(errs, "step release cummulative: release")
}
}
func TryReleaseStepStaleFS(ctx context.Context, fs string, jobID JobID) {
q := ListZFSHoldsAndBookmarksQuery{
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
JobID: &jobID,
What: AbstractionTypeSet{
AbstractionStepHold: true,
AbstractionStepBookmark: true,
AbstractionReplicationCursorBookmarkV2: true,
},
Concurrency: 1,
}
staleness, err := ListStale(ctx, q)
if _, ok := err.(*ListStaleQueryError); ok {
panic(err)
} else if err != nil {
getLogger(ctx).WithError(err).Error("cannot list stale step holds and bookmarks")
return
}
for _, s := range staleness.Stale {
getLogger(ctx).WithField("stale_step_hold_or_bookmark", s).Info("batch-destroying stale step hold or bookmark")
}
for res := range BatchDestroy(ctx, staleness.Stale) {
if res.DestroyErr != nil {
getLogger(ctx).
WithField("stale_step_hold_or_bookmark", res.Abstraction).
WithError(res.DestroyErr).
Error("cannot destroy stale step-hold or bookmark")
} else {
getLogger(ctx).
WithField("stale_step_hold_or_bookmark", res.Abstraction).
WithError(res.DestroyErr).
Info("destroyed stale step-hold or bookmark")
}
}
}
var _ BookmarkExtractor = StepBookmarkExtractor
func StepBookmarkExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) {

View File

@ -11,7 +11,7 @@ import (
"github.com/zrepl/zrepl/zfs"
)
func ReplicationCursor(ctx *platformtest.Context) {
func CreateReplicationCursor(ctx *platformtest.Context) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
CREATEROOT
@ -54,13 +54,7 @@ func ReplicationCursor(ctx *platformtest.Context) {
assert.Equal(ctx, zfs.ErrBookmarkCloningNotSupported, err)
// ... for target = replication cursor bookmark to be created
cursorOfCursor, err := endpoint.CreateReplicationCursor(ctx, fs, cursorOfSnapIdemp.GetFilesystemVersion(), jobid)
checkCreateCursor(err, cursorOfCursor, cursorOfCursor.GetFilesystemVersion())
destroyed, err := endpoint.DestroyObsoleteReplicationCursors(ctx, fs, &snap, jobid)
if err != nil {
panic(err)
}
assert.Empty(ctx, destroyed)
checkCreateCursor(err, cursorOfCursor, cursorOfSnap.GetFilesystemVersion())
snapProps, err := zfs.ZFSGetFilesystemVersion(ctx, snap.FullPath(fs))
if err != nil {
@ -78,18 +72,4 @@ func ReplicationCursor(ctx *platformtest.Context) {
panic(fmt.Sprintf("guids do not match: %v != %v", bm.Guid, snapProps.Guid))
}
// try moving
cursor1BookmarkName, err := endpoint.ReplicationCursorBookmarkName(fs, snap.Guid, jobid)
require.NoError(ctx, err)
snap2 := fsversion(ctx, fs, "@2 with space")
_, err = endpoint.CreateReplicationCursor(ctx, fs, snap, jobid)
assert.NoError(ctx, err)
destroyed, err = endpoint.DestroyObsoleteReplicationCursors(ctx, fs, &snap2, jobid)
require.NoError(ctx, err)
require.Equal(ctx, 1, len(destroyed))
require.Equal(ctx, endpoint.AbstractionReplicationCursorBookmarkV2, destroyed[0].GetType())
require.Equal(ctx, cursor1BookmarkName, destroyed[0].GetName())
}

View File

@ -17,7 +17,7 @@ var Cases = []Case{
BatchDestroy,
UndestroyableSnapshotParsing,
GetNonexistent,
ReplicationCursor,
CreateReplicationCursor,
IdempotentHold,
IdempotentBookmark,
IdempotentDestroy,