diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index b643e12..ee72a08 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -342,7 +342,7 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p msg := "cannot move replication cursor, keeping hold on `to` until successful" log(ctx).WithError(err).Error(msg) err = errors.Wrap(err, msg) - // it is correct to not release the hold if we can't move the cursor! + // it is correct to not destroy from and to step holds if we can't move the cursor! return &pdu.SendCompletedRes{}, err } } else { diff --git a/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go b/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go index 6c86ec4..81870c6 100644 --- a/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go +++ b/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go @@ -141,8 +141,7 @@ func CreateReplicationCursor(ctx context.Context, fs string, target zfs.Filesyst return nil, zfs.ErrBookmarkCloningNotSupported } - // idempotently create bookmark (guid is encoded in it, hence we'll most likely add a new one - // cleanup the old one afterwards + // idempotently create bookmark (guid is encoded in it) cursorBookmark, err := zfs.ZFSBookmark(ctx, fs, target, bookmarkname) if err != nil { @@ -160,57 +159,9 @@ func CreateReplicationCursor(ctx context.Context, fs string, target zfs.Filesyst }, nil } -type ReplicationCursor interface { - GetCreateTXG() uint64 -} - -func DestroyObsoleteReplicationCursors(ctx context.Context, fs string, current ReplicationCursor, jobID JobID) (_ []Abstraction, err error) { - - q := ListZFSHoldsAndBookmarksQuery{ - FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{ - FS: &fs, - }, - What: AbstractionTypeSet{ - AbstractionReplicationCursorBookmarkV2: true, - }, - JobID: &jobID, - CreateTXG: CreateTXGRange{ - Since: nil, - Until: &CreateTXGRangeBound{ - CreateTXG: current.GetCreateTXG(), - Inclusive: &zfs.NilBool{B: false}, - }, - }, - Concurrency: 1, - } - abs, absErr, err := ListAbstractions(ctx, q) - if err != nil { - return nil, errors.Wrap(err, "list abstractions") - } - if len(absErr) > 0 { - return nil, errors.Wrap(ListAbstractionsErrors(absErr), "list abstractions") - } - - var destroyed []Abstraction - var errs []error - for res := range BatchDestroy(ctx, abs) { - log := getLogger(ctx). - WithField("replication_cursor_bookmark", res.Abstraction) - if res.DestroyErr != nil { - errs = append(errs, res.DestroyErr) - log.WithError(err). - Error("cannot destroy obsolete replication cursor bookmark") - } else { - destroyed = append(destroyed, res.Abstraction) - log.Info("destroyed obsolete replication cursor bookmark") - } - } - if len(errs) == 0 { - return destroyed, nil - } else { - return destroyed, errorarray.Wrap(errs, "destroy obsolete replication cursor") - } -} +const ( + ReplicationCursorBookmarkNamePrefix = "zrepl_last_received_J_" +) var lastReceivedHoldTagRE = regexp.MustCompile("^zrepl_last_received_J_(.+)$") diff --git a/endpoint/endpoint_zfs_abstraction_step.go b/endpoint/endpoint_zfs_abstraction_step.go index 5fc91b0..78f7b78 100644 --- a/endpoint/endpoint_zfs_abstraction_step.go +++ b/endpoint/endpoint_zfs_abstraction_step.go @@ -7,7 +7,6 @@ import ( "github.com/pkg/errors" - "github.com/zrepl/zrepl/util/errorarray" "github.com/zrepl/zrepl/zfs" ) @@ -113,131 +112,6 @@ func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID Job }, nil } -// idempotently release the step-hold on v if v is a snapshot -// or idempotently destroy the step-bookmark of v if v is a bookmark -// -// note that this operation leaves v itself untouched, unless v is the step-bookmark itself, in which case v is destroyed -// -// returns an instance of *zfs.DatasetDoesNotExist if `v` does not exist -func ReleaseStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) error { - - if v.IsSnapshot() { - tag, err := StepHoldTag(jobID) - if err != nil { - return errors.Wrap(err, "step release tag") - } - - if err := zfs.ZFSRelease(ctx, tag, v.FullPath(fs)); err != nil { - return errors.Wrap(err, "step release: zfs") - } - - return nil - } - if !v.IsBookmark() { - panic(fmt.Sprintf("impl error: expecting version to be a bookmark, got %#v", v)) - } - - bmname, err := StepBookmarkName(fs, v.Guid, jobID) - if err != nil { - return errors.Wrap(err, "step release: determine bookmark name") - } - // idempotently destroy bookmark - - if err := zfs.ZFSDestroyIdempotent(ctx, bmname); err != nil { - return errors.Wrap(err, "step release: bookmark destroy: zfs") - } - - return nil -} - -// release {step holds, step bookmarks} earlier and including `mostRecent` -func ReleaseStepCummulativeInclusive(ctx context.Context, fs string, since *CreateTXGRangeBound, mostRecent zfs.FilesystemVersion, jobID JobID) error { - q := ListZFSHoldsAndBookmarksQuery{ - What: AbstractionTypeSet{ - AbstractionStepHold: true, - AbstractionStepBookmark: true, - }, - FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{ - FS: &fs, - }, - JobID: &jobID, - CreateTXG: CreateTXGRange{ - Since: since, - Until: &CreateTXGRangeBound{ - CreateTXG: mostRecent.CreateTXG, - Inclusive: &zfs.NilBool{B: true}, - }, - }, - Concurrency: 1, - } - abs, absErrs, err := ListAbstractions(ctx, q) - if err != nil { - return errors.Wrap(err, "step release cummulative: list") - } - if len(absErrs) > 0 { - return errors.Wrap(ListAbstractionsErrors(absErrs), "step release cummulative: list") - } - - getLogger(ctx).WithField("step_holds_and_bookmarks", fmt.Sprintf("%s", abs)).Debug("releasing step holds and bookmarks") - - var errs []error - for res := range BatchDestroy(ctx, abs) { - log := getLogger(ctx). - WithField("step_hold_or_bookmark", res.Abstraction) - if res.DestroyErr != nil { - errs = append(errs, res.DestroyErr) - log.WithError(err). - Error("cannot release step hold or bookmark") - } else { - log.Info("released step hold or bookmark") - } - } - if len(errs) == 0 { - return nil - } else { - return errorarray.Wrap(errs, "step release cummulative: release") - } -} - -func TryReleaseStepStaleFS(ctx context.Context, fs string, jobID JobID) { - - q := ListZFSHoldsAndBookmarksQuery{ - FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{ - FS: &fs, - }, - JobID: &jobID, - What: AbstractionTypeSet{ - AbstractionStepHold: true, - AbstractionStepBookmark: true, - AbstractionReplicationCursorBookmarkV2: true, - }, - Concurrency: 1, - } - staleness, err := ListStale(ctx, q) - if _, ok := err.(*ListStaleQueryError); ok { - panic(err) - } else if err != nil { - getLogger(ctx).WithError(err).Error("cannot list stale step holds and bookmarks") - return - } - for _, s := range staleness.Stale { - getLogger(ctx).WithField("stale_step_hold_or_bookmark", s).Info("batch-destroying stale step hold or bookmark") - } - for res := range BatchDestroy(ctx, staleness.Stale) { - if res.DestroyErr != nil { - getLogger(ctx). - WithField("stale_step_hold_or_bookmark", res.Abstraction). - WithError(res.DestroyErr). - Error("cannot destroy stale step-hold or bookmark") - } else { - getLogger(ctx). - WithField("stale_step_hold_or_bookmark", res.Abstraction). - WithError(res.DestroyErr). - Info("destroyed stale step-hold or bookmark") - } - } -} - var _ BookmarkExtractor = StepBookmarkExtractor func StepBookmarkExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) { diff --git a/platformtest/tests/replicationCursor.go b/platformtest/tests/replicationCursor.go index 33ef81d..72e3e5c 100644 --- a/platformtest/tests/replicationCursor.go +++ b/platformtest/tests/replicationCursor.go @@ -11,7 +11,7 @@ import ( "github.com/zrepl/zrepl/zfs" ) -func ReplicationCursor(ctx *platformtest.Context) { +func CreateReplicationCursor(ctx *platformtest.Context) { platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` CREATEROOT @@ -54,13 +54,7 @@ func ReplicationCursor(ctx *platformtest.Context) { assert.Equal(ctx, zfs.ErrBookmarkCloningNotSupported, err) // ... for target = replication cursor bookmark to be created cursorOfCursor, err := endpoint.CreateReplicationCursor(ctx, fs, cursorOfSnapIdemp.GetFilesystemVersion(), jobid) - checkCreateCursor(err, cursorOfCursor, cursorOfCursor.GetFilesystemVersion()) - - destroyed, err := endpoint.DestroyObsoleteReplicationCursors(ctx, fs, &snap, jobid) - if err != nil { - panic(err) - } - assert.Empty(ctx, destroyed) + checkCreateCursor(err, cursorOfCursor, cursorOfSnap.GetFilesystemVersion()) snapProps, err := zfs.ZFSGetFilesystemVersion(ctx, snap.FullPath(fs)) if err != nil { @@ -78,18 +72,4 @@ func ReplicationCursor(ctx *platformtest.Context) { panic(fmt.Sprintf("guids do not match: %v != %v", bm.Guid, snapProps.Guid)) } - // try moving - cursor1BookmarkName, err := endpoint.ReplicationCursorBookmarkName(fs, snap.Guid, jobid) - require.NoError(ctx, err) - - snap2 := fsversion(ctx, fs, "@2 with space") - - _, err = endpoint.CreateReplicationCursor(ctx, fs, snap, jobid) - assert.NoError(ctx, err) - destroyed, err = endpoint.DestroyObsoleteReplicationCursors(ctx, fs, &snap2, jobid) - - require.NoError(ctx, err) - require.Equal(ctx, 1, len(destroyed)) - require.Equal(ctx, endpoint.AbstractionReplicationCursorBookmarkV2, destroyed[0].GetType()) - require.Equal(ctx, cursor1BookmarkName, destroyed[0].GetName()) } diff --git a/platformtest/tests/tests.go b/platformtest/tests/tests.go index 1aee976..89c7098 100644 --- a/platformtest/tests/tests.go +++ b/platformtest/tests/tests.go @@ -17,7 +17,7 @@ var Cases = []Case{ BatchDestroy, UndestroyableSnapshotParsing, GetNonexistent, - ReplicationCursor, + CreateReplicationCursor, IdempotentHold, IdempotentBookmark, IdempotentDestroy,