From 292b85b5ef17cebf089d7343137c13362fb31c03 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 10 May 2020 15:06:44 +0200 Subject: [PATCH] [#316] endpoint / replication protocol: more robust step-holds and replication cursor management - drop HintMostRecentCommonAncestor rpc call - it is wrong to put faith into the active side of the replication to always make that call (we might not trust it, ref pull setup) - clean up step holds + step bookmarks + replication cursor bookmarks on send RPC instead - this makes it symmetric with Receive RPC - use a cache (endpoint.sendAbstractionsCache) to avoid the cost of listing the on-disk endpoint abstractions state on every step The "create" methods for endpoint abstractions (CreateReplicationCursor, HoldStep) are now fully idempotent and return an Abstraction. Notes about endpoint.sendAbstractionsCache: - fills lazily from disk state on first `Get` operation - fill from disk is generally only attempted once - unless the `ListAbstractions` fails, in which case the fill from disk is retried on next `Get` (the current `Get` will observe a subset of the actual on-disk abstractions) - the `Invalidate` method is called - it is a global (zrepl process-wide) cache fixes #316 --- daemon/daemon.go | 2 + daemon/job/active.go | 12 + daemon/job/active_test.go | 19 ++ endpoint/endpoint.go | 252 +++++----------- endpoint/endpoint_metrics.go | 7 + endpoint/endpoint_send_abstractions_cache.go | 203 +++++++++++++ endpoint/endpoint_zfs_abstraction.go | 24 ++ ...straction_cursor_and_last_received_hold.go | 42 +-- endpoint/endpoint_zfs_abstraction_step.go | 5 +- platformtest/tests/idempotentBookmark.go | 14 +- platformtest/tests/idempotentDestroy.go | 4 +- platformtest/tests/replicationCursor.go | 36 ++- replication/logic/pdu/pdu.pb.go | 279 +++++------------- replication/logic/pdu/pdu.proto | 17 -- replication/logic/replication_logic.go | 38 --- rpc/rpc_client.go | 7 - rpc/versionhandshake/versionhandshake.go | 2 +- zfs/versions.go | 7 + zfs/zfs.go | 37 ++- 19 files changed, 513 insertions(+), 494 deletions(-) create mode 100644 daemon/job/active_test.go create mode 100644 endpoint/endpoint_metrics.go create mode 100644 endpoint/endpoint_send_abstractions_cache.go diff --git a/daemon/daemon.go b/daemon/daemon.go index 94ed24a..824272d 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -13,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/daemon/logging/trace" + "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" @@ -95,6 +96,7 @@ func Run(ctx context.Context, conf *config.Config) error { // register global (=non job-local) metrics zfscmd.RegisterMetrics(prometheus.DefaultRegisterer) trace.RegisterMetrics(prometheus.DefaultRegisterer) + endpoint.RegisterMetrics(prometheus.DefaultRegisterer) log.Info("starting daemon") diff --git a/daemon/job/active.go b/daemon/job/active.go index 46f1d65..a65a8a6 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -376,9 +376,21 @@ func (j *ActiveSide) SenderConfig() *endpoint.SenderConfig { return push.senderConfig } +// The active side of a replication uses one end (sender or receiver) +// directly by method invocation, without going through a transport that +// provides a client identity. +// However, in order to avoid the need to distinguish between direct-method-invocating +// clients and RPC client, we use an invalid client identity as a sentinel value. +func FakeActiveSideDirectMethodInvocationClientIdentity(jobId endpoint.JobID) string { + return fmt.Sprintf("", jobId.String()) +} + func (j *ActiveSide) Run(ctx context.Context) { ctx, endTask := trace.WithTaskAndSpan(ctx, "active-side-job", j.Name()) defer endTask() + + ctx = context.WithValue(ctx, endpoint.ClientIdentityKey, FakeActiveSideDirectMethodInvocationClientIdentity(j.name)) + log := GetLogger(ctx) defer log.Info("job exiting") diff --git a/daemon/job/active_test.go b/daemon/job/active_test.go new file mode 100644 index 0000000..59d9b53 --- /dev/null +++ b/daemon/job/active_test.go @@ -0,0 +1,19 @@ +package job + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/zrepl/zrepl/endpoint" + "github.com/zrepl/zrepl/transport" +) + +func TestFakeActiveSideDirectMethodInvocationClientIdentityDoesNotPassValidityTest(t *testing.T) { + jobid, err := endpoint.MakeJobID("validjobname") + require.NoError(t, err) + clientIdentity := FakeActiveSideDirectMethodInvocationClientIdentity(jobid) + t.Logf("%v", clientIdentity) + err = transport.ValidateClientIdentity(clientIdentity) + assert.Error(t, err) +} diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index 35279c9..b643e12 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -7,8 +7,8 @@ import ( "fmt" "io" "path" - "sync" + "github.com/kr/pretty" "github.com/pkg/errors" "github.com/zrepl/zrepl/daemon/logging/trace" @@ -117,119 +117,8 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst } -func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) { - defer trace.WithSpanFromStackUpdateCtx(&ctx)() - - fsp, err := p.filterCheckFS(r.GetFilesystem()) - if err != nil { - return nil, err - } - fs := fsp.ToString() - - log := getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", r.GetSenderVersion())) - - log.WithField("full_hint", r).Debug("full hint") - - if r.GetSenderVersion() == nil { - // no common ancestor found, likely due to failed prior replication attempt - // => release stale step holds to prevent them from accumulating - // (they can accumulate on initial replication because each inital replication step might hold a different `to`) - // => replication cursors cannot accumulate because we always _move_ the replication cursor - log.Debug("releasing all step holds on the filesystem") - TryReleaseStepStaleFS(ctx, fs, p.jobId) - return &pdu.HintMostRecentCommonAncestorRes{}, nil - } - - // we were hinted a specific common ancestor - - mostRecentVersion, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, r.GetSenderVersion()) - if err != nil { - msg := "HintMostRecentCommonAncestor rpc with nonexistent most recent version" - log.Warn(msg) - return nil, errors.Wrap(err, msg) - } - - // move replication cursor to this position - destroyedCursors, err := MoveReplicationCursor(ctx, fs, mostRecentVersion, p.jobId) - if err == zfs.ErrBookmarkCloningNotSupported { - log.Debug("not creating replication cursor from bookmark because ZFS does not support it") - // fallthrough - } else if err != nil { - return nil, errors.Wrap(err, "cannot set replication cursor to hinted version") - } - - // take care of stale step holds - log.WithField("step-holds-cleanup-mode", senderHintMostRecentCommonAncestorStepCleanupMode). - Debug("taking care of possibly stale step holds") - doStepCleanup := false - var stepCleanupSince *CreateTXGRangeBound - switch senderHintMostRecentCommonAncestorStepCleanupMode { - case StepCleanupNoCleanup: - doStepCleanup = false - case StepCleanupRangeSinceUnbounded: - doStepCleanup = true - stepCleanupSince = nil - case StepCleanupRangeSinceReplicationCursor: - doStepCleanup = true - // Use the destroyed replication cursors as indicator how far the previous replication got. - // To be precise: We limit the amount of visisted snapshots to exactly those snapshots - // created since the last successful replication cursor movement (i.e. last successful replication step) - // - // If we crash now, we'll leak the step we are about to release, but the performance gain - // of limiting the amount of snapshots we visit makes up for that. - // Users have the `zrepl holds release-stale` command to cleanup leaked step holds. - for _, destroyed := range destroyedCursors { - if stepCleanupSince == nil { - stepCleanupSince = &CreateTXGRangeBound{ - CreateTXG: destroyed.GetCreateTXG(), - Inclusive: &zfs.NilBool{B: true}, - } - } else if destroyed.GetCreateTXG() < stepCleanupSince.CreateTXG { - stepCleanupSince.CreateTXG = destroyed.GetCreateTXG() - } - } - default: - panic(senderHintMostRecentCommonAncestorStepCleanupMode) - } - if !doStepCleanup { - log.Info("skipping cleanup of prior invocations' step holds due to environment variable setting") - } else { - if err := ReleaseStepCummulativeInclusive(ctx, fs, stepCleanupSince, mostRecentVersion, p.jobId); err != nil { - return nil, errors.Wrap(err, "cannot cleanup prior invocation's step holds and bookmarks") - } else { - log.Info("step hold cleanup done") - } - } - - return &pdu.HintMostRecentCommonAncestorRes{}, nil -} - -type HintMostRecentCommonAncestorStepCleanupMode struct{ string } - -var ( - StepCleanupRangeSinceReplicationCursor = HintMostRecentCommonAncestorStepCleanupMode{"range-since-replication-cursor"} - StepCleanupRangeSinceUnbounded = HintMostRecentCommonAncestorStepCleanupMode{"range-since-unbounded"} - StepCleanupNoCleanup = HintMostRecentCommonAncestorStepCleanupMode{"no-cleanup"} -) - -func (m HintMostRecentCommonAncestorStepCleanupMode) String() string { return string(m.string) } -func (m *HintMostRecentCommonAncestorStepCleanupMode) Set(s string) error { - switch s { - case StepCleanupRangeSinceReplicationCursor.String(): - *m = StepCleanupRangeSinceReplicationCursor - case StepCleanupRangeSinceUnbounded.String(): - *m = StepCleanupRangeSinceUnbounded - case StepCleanupNoCleanup.String(): - *m = StepCleanupNoCleanup - default: - return fmt.Errorf("unknown step cleanup mode %q", s) - } - return nil -} - -var senderHintMostRecentCommonAncestorStepCleanupMode = *envconst.Var("ZREPL_ENDPOINT_SENDER_HINT_MOST_RECENT_STEP_HOLD_CLEANUP_MODE", &StepCleanupRangeSinceReplicationCursor).(*HintMostRecentCommonAncestorStepCleanupMode) - -var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10)) +var maxConcurrentZFSSend = envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10) +var maxConcurrentZFSSendSemaphore = semaphore.New(maxConcurrentZFSSend) func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion { if fsv == nil { @@ -319,10 +208,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea return res, nil, nil } - // update replication cursor + // create a replication cursor for `From` (usually an idempotent no-op because SendCompleted already created it before) + var fromReplicationCursor Abstraction if sendArgs.From != nil { // For all but the first replication, this should always be a no-op because SendCompleted already moved the cursor - _, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.FromVersion, s.jobId) + fromReplicationCursor, err = CreateReplicationCursor(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) // no shadow if err == zfs.ErrBookmarkCloningNotSupported { getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it") // fallthrough @@ -331,9 +221,10 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea } } + var fromHold, toHold Abstraction // make sure `From` doesn't go away in order to make this step resumable if sendArgs.From != nil { - _, err := HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) + fromHold, err = HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) // no shadow if err == zfs.ErrBookmarkCloningNotSupported { getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it") // fallthrough @@ -342,17 +233,71 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea } } // make sure `To` doesn't go away in order to make this step resumable - _, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId) + toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId) if err != nil { return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion) } - // step holds & replication cursor released / moved forward in s.SendCompleted => s.moveCursorAndReleaseSendHolds + // cleanup the mess that _this function_ might have created in prior failed attempts: + // + // In summary, we delete every endpoint ZFS abstraction created on this filesystem for this job id, + // except for the ones we just created above. + // + // This is the most robust approach to avoid leaking (= forgetting to clean up) endpoint ZFS abstractions, + // all under the assumption that there will only ever be one send for a (jobId,fs) combination at any given time. + // + // Note that the SendCompleted rpc can't be relied upon for this purpose: + // - it might be lost due to network errors, + // - or never be sent by a potentially malicious or buggy client, + // - or never be send because the replication step failed at some point + // (potentially leaving a resumable state on the receiver, which is the case where we really do not want to blow away the step holds too soon.) + // + // Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep, + // will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup. + func() { + ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions") + defer endSpan() + + liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor} + keep := func(a Abstraction) (keep bool) { + keep = false + for _, k := range liveAbs { + keep = keep || AbstractionEquals(a, k) + } + return keep + } + check := func(obsoleteAbs []Abstraction) { + // last line of defense: check that we don't destroy the incremental `from` and `to` + // if we did that, we might be about to blow away the last common filesystem version between sender and receiver + mustLiveVersions := []zfs.FilesystemVersion{sendArgs.ToVersion} + if sendArgs.FromVersion != nil { + mustLiveVersions = append(mustLiveVersions, *sendArgs.FromVersion) + } + for _, staleVersion := range obsoleteAbs { + for _, mustLiveVersion := range mustLiveVersions { + if zfs.FilesystemVersionEqualIdentity(mustLiveVersion, staleVersion.GetFilesystemVersion()) { + panic(fmt.Sprintf("impl error: %q would be destroyed because it is considered stale but it is part of of sendArgs=%s", mustLiveVersion.String(), pretty.Sprint(sendArgs))) + } + } + } + } + sendAbstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, keep, check) + }() + + if fromHold != nil { + sendAbstractionsCacheSingleton.Put(fromHold) + } + sendAbstractionsCacheSingleton.Put(toHold) + if fromReplicationCursor != nil { + sendAbstractionsCacheSingleton.Put(fromReplicationCursor) + } sendStream, err := zfs.ZFSSend(ctx, sendArgs) if err != nil { + // it's ok to not destroy the abstractions we just created here, a new send attempt will take care of it return nil, nil, errors.Wrap(err, "zfs send failed") } + return res, sendStream, nil } @@ -389,8 +334,7 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p return log } - log(ctx).Debug("move replication cursor to most recent common version") - destroyedCursors, err := MoveReplicationCursor(ctx, fs, to, p.jobId) + toReplicationCursor, err := CreateReplicationCursor(ctx, fs, to, p.jobId) if err != nil { if err == zfs.ErrBookmarkCloningNotSupported { log(ctx).Debug("not setting replication cursor, bookmark cloning not supported") @@ -402,59 +346,17 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p return &pdu.SendCompletedRes{}, err } } else { - log(ctx).Info("successfully moved replication cursor") + sendAbstractionsCacheSingleton.Put(toReplicationCursor) + log(ctx).WithField("to_cursor", toReplicationCursor.String()).Info("successfully created `to` replication cursor") } - // kick off releasing of step holds / bookmarks - // if we fail to release them, don't bother the caller: - // they are merely an implementation detail on the sender for better resumability - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - ctx, endTask := trace.WithTask(ctx, "release-step-hold-to") - defer endTask() - - log(ctx).Debug("release step-hold of or step-bookmark on `to`") - err = ReleaseStep(ctx, fs, to, p.jobId) - if err != nil { - log(ctx).WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `to`") - } else { - log(ctx).Info("successfully released step-holds on or destroyed step-bookmark of `to`") - } - - }() - go func() { - defer wg.Done() - ctx, endTask := trace.WithTask(ctx, "release-step-hold-from") - defer endTask() - - if from == nil { - return - } - log(ctx).Debug("release step-hold of or step-bookmark on `from`") - err := ReleaseStep(ctx, fs, *from, p.jobId) - if err != nil { - if dne, ok := err.(*zfs.DatasetDoesNotExist); ok { - // If bookmark cloning is not supported, `from` might be the old replication cursor - // and thus have already been destroyed by MoveReplicationCursor above - // In that case, nonexistence of `from` is not an error, otherwise it is. - for _, c := range destroyedCursors { - if c.GetFullPath() == dne.Path { - log(ctx).Info("`from` was a replication cursor and has already been destroyed") - return - } - } - // fallthrough - } - log(ctx).WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `from`") - } else { - log(ctx).Info("successfully released step-holds on or destroyed step-bookmark of `from`") - } - }() - wg.Wait() + keep := func(a Abstraction) bool { + return AbstractionEquals(a, toReplicationCursor) + } + sendAbstractionsCacheSingleton.TryBatchDestroy(ctx, p.jobId, fs, keep, nil) return &pdu.SendCompletedRes{}, nil + } func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) { @@ -974,16 +876,6 @@ func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapsho return doDestroySnapshots(ctx, lp, req.Snapshots) } -func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) { - defer trace.WithSpanFromStackUpdateCtx(&ctx)() - // we don't move last-received-hold as part of this hint - // because that wouldn't give us any benefit wrt resumability. - // - // Other reason: the replication logic that issues this RPC would require refactoring - // to include the receiver's FilesystemVersion in the request) - return &pdu.HintMostRecentCommonAncestorRes{}, nil -} - func (p *Receiver) SendCompleted(ctx context.Context, _ *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) { defer trace.WithSpanFromStackUpdateCtx(&ctx)() diff --git a/endpoint/endpoint_metrics.go b/endpoint/endpoint_metrics.go new file mode 100644 index 0000000..15471fd --- /dev/null +++ b/endpoint/endpoint_metrics.go @@ -0,0 +1,7 @@ +package endpoint + +import "github.com/prometheus/client_golang/prometheus" + +func RegisterMetrics(r prometheus.Registerer) { + r.MustRegister(sendAbstractionsCacheMetrics.count) +} diff --git a/endpoint/endpoint_send_abstractions_cache.go b/endpoint/endpoint_send_abstractions_cache.go new file mode 100644 index 0000000..beb8e40 --- /dev/null +++ b/endpoint/endpoint_send_abstractions_cache.go @@ -0,0 +1,203 @@ +package endpoint + +import ( + "context" + "fmt" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/daemon/logging/trace" + "github.com/zrepl/zrepl/util/chainlock" +) + +var sendAbstractionsCacheMetrics struct { + count prometheus.Gauge +} + +func init() { + sendAbstractionsCacheMetrics.count = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "zrepl", + Subsystem: "endpoint", + Name: "send_abstractions_cache_entry_count", + Help: "number of send abstractions tracked in the sendAbstractionsCache data structure", + }) +} + +var sendAbstractionsCacheSingleton = newSendAbstractionsCache() + +type sendAbstractionsCacheDidLoadFSState int + +const ( + sendAbstractionsCacheDidLoadFSStateNo sendAbstractionsCacheDidLoadFSState = iota // 0-value has meaning + sendAbstractionsCacheDidLoadFSStateInProgress + sendAbstractionsCacheDidLoadFSStateDone +) + +type sendAbstractionsCache struct { + mtx chainlock.L + abstractions []Abstraction + didLoadFS map[string]sendAbstractionsCacheDidLoadFSState + didLoadFSChanged *sync.Cond +} + +func newSendAbstractionsCache() *sendAbstractionsCache { + c := &sendAbstractionsCache{ + didLoadFS: make(map[string]sendAbstractionsCacheDidLoadFSState), + } + c.didLoadFSChanged = c.mtx.NewCond() + return c +} + +func (s *sendAbstractionsCache) Put(a Abstraction) { + defer s.mtx.Lock().Unlock() + + var zeroJobId JobID + if a.GetJobID() == nil { + panic("abstraction must not have nil job id") + } else if *a.GetJobID() == zeroJobId { + panic(fmt.Sprintf("abstraction must not have zero-value job id: %s", a)) + } + + s.abstractions = append(s.abstractions, a) + sendAbstractionsCacheMetrics.count.Set(float64(len(s.abstractions))) +} + +func (s *sendAbstractionsCache) InvalidateFSCache(fs string) { + // FIXME: O(n) + newAbs := make([]Abstraction, 0, len(s.abstractions)) + for _, a := range s.abstractions { + if a.GetFS() != fs { + newAbs = append(newAbs, a) + } + } + s.abstractions = newAbs + sendAbstractionsCacheMetrics.count.Set(float64(len(s.abstractions))) + + s.didLoadFS[fs] = sendAbstractionsCacheDidLoadFSStateNo + s.didLoadFSChanged.Broadcast() + +} + +// - logs errors in getting on-disk abstractions +// - only fetches on-disk abstractions once, but every time from the in-memory store +// +// That means that for precise results, all abstractions created by the endpoint must be .Put into this cache. +func (s *sendAbstractionsCache) GetAndDeleteByJobIDAndFS(ctx context.Context, jobID JobID, fs string, keep func(a Abstraction) bool) (ret []Abstraction) { + defer s.mtx.Lock().Unlock() + defer trace.WithSpanFromStackUpdateCtx(&ctx)() + var zeroJobId JobID + if jobID == zeroJobId { + panic("must not pass zero-value job id") + } + if fs == "" { + panic("must not pass zero-value fs") + } + + s.tryLoadOnDiskSendAbstractions(ctx, fs) + + // FIXME O(n) + var remaining []Abstraction + for _, a := range s.abstractions { + aJobId := *a.GetJobID() + aFS := a.GetFS() + if aJobId == jobID && aFS == fs && !keep(a) { + ret = append(ret, a) + } else { + remaining = append(remaining, a) + } + } + s.abstractions = remaining + sendAbstractionsCacheMetrics.count.Set(float64(len(s.abstractions))) + + return ret +} + +// caller must hold s.mtx +func (s *sendAbstractionsCache) tryLoadOnDiskSendAbstractions(ctx context.Context, fs string) { + for s.didLoadFS[fs] != sendAbstractionsCacheDidLoadFSStateDone { + if s.didLoadFS[fs] == sendAbstractionsCacheDidLoadFSStateInProgress { + s.didLoadFSChanged.Wait() + continue + } + if s.didLoadFS[fs] != sendAbstractionsCacheDidLoadFSStateNo { + panic(fmt.Sprintf("unreachable: %v", s.didLoadFS[fs])) + } + + s.didLoadFS[fs] = sendAbstractionsCacheDidLoadFSStateInProgress + defer s.didLoadFSChanged.Broadcast() + + var onDiskAbs []Abstraction + var err error + s.mtx.DropWhile(func() { + onDiskAbs, err = s.tryLoadOnDiskSendAbstractionsImpl(ctx, fs) // no shadow + }) + + if err != nil { + s.didLoadFS[fs] = sendAbstractionsCacheDidLoadFSStateNo + getLogger(ctx).WithField("fs", fs).WithError(err).Error("cannot list send step abstractions for filesystem") + } else { + s.didLoadFS[fs] = sendAbstractionsCacheDidLoadFSStateDone + s.abstractions = append(s.abstractions, onDiskAbs...) + getLogger(ctx).WithField("fs", fs).WithField("abstractions", onDiskAbs).Debug("loaded step abstractions for filesystem") + } + return + } +} + +// caller should _not hold s.mtx +func (s *sendAbstractionsCache) tryLoadOnDiskSendAbstractionsImpl(ctx context.Context, fs string) ([]Abstraction, error) { + defer trace.WithSpanFromStackUpdateCtx(&ctx)() + + q := ListZFSHoldsAndBookmarksQuery{ + FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{ + FS: &fs, + }, + JobID: nil, + What: AbstractionTypeSet{ + AbstractionStepHold: true, + AbstractionStepBookmark: true, + AbstractionReplicationCursorBookmarkV2: true, + }, + Concurrency: 1, + } + abs, absErrs, err := ListAbstractions(ctx, q) + if err != nil { + return nil, err + } + // safe to ignore absErrs here, this is best-effort cleanup + if len(absErrs) > 0 { + return nil, ListAbstractionsErrors(absErrs) + } + return abs, nil +} + +func (s *sendAbstractionsCache) TryBatchDestroy(ctx context.Context, jobId JobID, fs string, keep func(a Abstraction) bool, check func(willDestroy []Abstraction)) { + // no s.mtx, we only use the public interface in this function + + defer trace.WithSpanFromStackUpdateCtx(&ctx)() + + obsoleteAbs := s.GetAndDeleteByJobIDAndFS(ctx, jobId, fs, keep) + + if check != nil { + check(obsoleteAbs) + } + + hadErr := false + for res := range BatchDestroy(ctx, obsoleteAbs) { + if res.DestroyErr != nil { + hadErr = true + getLogger(ctx). + WithField("abstraction", res.Abstraction). + WithError(res.DestroyErr). + Error("cannot destroy stale send step abstraction") + } else { + getLogger(ctx). + WithField("abstraction", res.Abstraction). + Info("destroyed stale send step abstraction") + } + } + if hadErr { + s.InvalidateFSCache(fs) + } + +} diff --git a/endpoint/endpoint_zfs_abstraction.go b/endpoint/endpoint_zfs_abstraction.go index e157e86..7e4bb5a 100644 --- a/endpoint/endpoint_zfs_abstraction.go +++ b/endpoint/endpoint_zfs_abstraction.go @@ -54,6 +54,30 @@ type Abstraction interface { json.Marshaler } +func AbstractionEquals(a, b Abstraction) bool { + if (a != nil) != (b != nil) { + return false + } + if a == nil && b == nil { + return true + } + var aJobId, bJobId JobID + if aJid := a.GetJobID(); aJid != nil { + aJobId = *aJid + } + if bJid := b.GetJobID(); bJid != nil { + bJobId = *bJid + } + return a.GetType() == b.GetType() && + a.GetFS() == b.GetFS() && + a.GetName() == b.GetName() && + a.GetFullPath() == b.GetFullPath() && + aJobId == bJobId && + a.GetCreateTXG() == b.GetCreateTXG() && + zfs.FilesystemVersionEqualIdentity(a.GetFilesystemVersion(), b.GetFilesystemVersion()) && + a.String() == b.String() +} + func (t AbstractionType) Validate() error { switch t { case AbstractionStepBookmark: diff --git a/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go b/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go index a998abf..6c86ec4 100644 --- a/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go +++ b/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go @@ -118,31 +118,33 @@ func GetReplicationCursors(ctx context.Context, dp *zfs.DatasetPath, jobID JobID return candidates, nil } -type ReplicationCursorTarget interface { - IsSnapshot() bool - GetGuid() uint64 - GetCreateTXG() uint64 - ToSendArgVersion() zfs.ZFSSendArgVersion -} - -// `target` is validated before replication cursor is set. if validation fails, the cursor is not moved. +// idempotently create a replication cursor targeting `target` // // returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS -func MoveReplicationCursor(ctx context.Context, fs string, target ReplicationCursorTarget, jobID JobID) (destroyedCursors []Abstraction, err error) { - - if !target.IsSnapshot() { - return nil, zfs.ErrBookmarkCloningNotSupported - } +func CreateReplicationCursor(ctx context.Context, fs string, target zfs.FilesystemVersion, jobID JobID) (a Abstraction, err error) { bookmarkname, err := ReplicationCursorBookmarkName(fs, target.GetGuid(), jobID) if err != nil { return nil, errors.Wrap(err, "determine replication cursor name") } + if target.IsBookmark() && target.GetName() == bookmarkname { + return &bookmarkBasedAbstraction{ + Type: AbstractionReplicationCursorBookmarkV2, + FS: fs, + FilesystemVersion: target, + JobID: jobID, + }, nil + } + + if !target.IsSnapshot() { + return nil, zfs.ErrBookmarkCloningNotSupported + } + // idempotently create bookmark (guid is encoded in it, hence we'll most likely add a new one // cleanup the old one afterwards - err = zfs.ZFSBookmark(ctx, fs, target.ToSendArgVersion(), bookmarkname) + cursorBookmark, err := zfs.ZFSBookmark(ctx, fs, target, bookmarkname) if err != nil { if err == zfs.ErrBookmarkCloningNotSupported { return nil, err // TODO go1.13 use wrapping @@ -150,12 +152,12 @@ func MoveReplicationCursor(ctx context.Context, fs string, target ReplicationCur return nil, errors.Wrapf(err, "cannot create bookmark") } - destroyedCursors, err = DestroyObsoleteReplicationCursors(ctx, fs, target, jobID) - if err != nil { - return nil, errors.Wrap(err, "destroy obsolete replication cursors") - } - - return destroyedCursors, nil + return &bookmarkBasedAbstraction{ + Type: AbstractionReplicationCursorBookmarkV2, + FS: fs, + FilesystemVersion: cursorBookmark, + JobID: jobID, + }, nil } type ReplicationCursor interface { diff --git a/endpoint/endpoint_zfs_abstraction_step.go b/endpoint/endpoint_zfs_abstraction_step.go index ff46897..5fc91b0 100644 --- a/endpoint/endpoint_zfs_abstraction_step.go +++ b/endpoint/endpoint_zfs_abstraction_step.go @@ -93,7 +93,7 @@ func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID Job return nil, errors.Wrap(err, "create step bookmark: determine bookmark name") } // idempotently create bookmark - err = zfs.ZFSBookmark(ctx, fs, v.ToSendArgVersion(), bmname) + stepBookmark, err := zfs.ZFSBookmark(ctx, fs, v, bmname) if err != nil { if err == zfs.ErrBookmarkCloningNotSupported { // TODO we could actually try to find a local snapshot that has the requested GUID @@ -108,7 +108,7 @@ func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID Job return &bookmarkBasedAbstraction{ Type: AbstractionStepBookmark, FS: fs, - FilesystemVersion: v, + FilesystemVersion: stepBookmark, JobID: jobID, }, nil } @@ -236,7 +236,6 @@ func TryReleaseStepStaleFS(ctx context.Context, fs string, jobID JobID) { Info("destroyed stale step-hold or bookmark") } } - } var _ BookmarkExtractor = StepBookmarkExtractor diff --git a/platformtest/tests/idempotentBookmark.go b/platformtest/tests/idempotentBookmark.go index d3426c7..94c6e0f 100644 --- a/platformtest/tests/idempotentBookmark.go +++ b/platformtest/tests/idempotentBookmark.go @@ -3,6 +3,7 @@ package tests import ( "fmt" + "github.com/stretchr/testify/assert" "github.com/zrepl/zrepl/platformtest" "github.com/zrepl/zrepl/zfs" ) @@ -19,22 +20,23 @@ func IdempotentBookmark(ctx *platformtest.Context) { fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset) - asnap := sendArgVersion(ctx, fs, "@a snap") - anotherSnap := sendArgVersion(ctx, fs, "@another snap") + asnap := fsversion(ctx, fs, "@a snap") + anotherSnap := fsversion(ctx, fs, "@another snap") - err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark") + aBookmark, err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark") if err != nil { panic(err) } // do it again, should be idempotent - err = zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark") + aBookmarkIdemp, err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark") if err != nil { panic(err) } + assert.Equal(ctx, aBookmark, aBookmarkIdemp) // should fail for another snapshot - err = zfs.ZFSBookmark(ctx, fs, anotherSnap, "a bookmark") + _, err = zfs.ZFSBookmark(ctx, fs, anotherSnap, "a bookmark") if err == nil { panic(err) } @@ -48,7 +50,7 @@ func IdempotentBookmark(ctx *platformtest.Context) { } // do it again, should fail with special error type - err = zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark") + _, err = zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark") if err == nil { panic(err) } diff --git a/platformtest/tests/idempotentDestroy.go b/platformtest/tests/idempotentDestroy.go index 53de8b1..4e5ad20 100644 --- a/platformtest/tests/idempotentDestroy.go +++ b/platformtest/tests/idempotentDestroy.go @@ -18,8 +18,8 @@ func IdempotentDestroy(ctx *platformtest.Context) { `) fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset) - asnap := sendArgVersion(ctx, fs, "@a snap") - err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark") + asnap := fsversion(ctx, fs, "@a snap") + _, err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark") if err != nil { panic(err) } diff --git a/platformtest/tests/replicationCursor.go b/platformtest/tests/replicationCursor.go index 8c51c01..33ef81d 100644 --- a/platformtest/tests/replicationCursor.go +++ b/platformtest/tests/replicationCursor.go @@ -19,9 +19,9 @@ func ReplicationCursor(ctx *platformtest.Context) { + "foo bar@1 with space" R zfs bookmark "${ROOTDS}/foo bar@1 with space" "${ROOTDS}/foo bar#1 with space" + "foo bar@2 with space" - R zfs bookmark "${ROOTDS}/foo bar@1 with space" "${ROOTDS}/foo bar#2 with space" + R zfs bookmark "${ROOTDS}/foo bar@2 with space" "${ROOTDS}/foo bar#2 with space" + "foo bar@3 with space" - R zfs bookmark "${ROOTDS}/foo bar@1 with space" "${ROOTDS}/foo bar#3 with space" + R zfs bookmark "${ROOTDS}/foo bar@3 with space" "${ROOTDS}/foo bar#3 with space" `) jobid := endpoint.MustMakeJobID("zreplplatformtest") @@ -32,9 +32,31 @@ func ReplicationCursor(ctx *platformtest.Context) { } fs := ds.ToString() - snap := fsversion(ctx, fs, "@1 with space") - destroyed, err := endpoint.MoveReplicationCursor(ctx, fs, &snap, jobid) + checkCreateCursor := func(createErr error, c endpoint.Abstraction, references zfs.FilesystemVersion) { + assert.NoError(ctx, createErr) + expectName, err := endpoint.ReplicationCursorBookmarkName(fs, references.Guid, jobid) + assert.NoError(ctx, err) + require.Equal(ctx, expectName, c.GetFilesystemVersion().Name) + } + + snap := fsversion(ctx, fs, "@1 with space") + book := fsversion(ctx, fs, "#1 with space") + + // create first cursor + cursorOfSnap, err := endpoint.CreateReplicationCursor(ctx, fs, snap, jobid) + checkCreateCursor(err, cursorOfSnap, snap) + // check CreateReplicationCursor is idempotent (for snapshot target) + cursorOfSnapIdemp, err := endpoint.CreateReplicationCursor(ctx, fs, snap, jobid) + checkCreateCursor(err, cursorOfSnap, snap) + // ... for target = non-cursor bookmark + _, err = endpoint.CreateReplicationCursor(ctx, fs, book, jobid) + assert.Equal(ctx, zfs.ErrBookmarkCloningNotSupported, err) + // ... for target = replication cursor bookmark to be created + cursorOfCursor, err := endpoint.CreateReplicationCursor(ctx, fs, cursorOfSnapIdemp.GetFilesystemVersion(), jobid) + checkCreateCursor(err, cursorOfCursor, cursorOfCursor.GetFilesystemVersion()) + + destroyed, err := endpoint.DestroyObsoleteReplicationCursors(ctx, fs, &snap, jobid) if err != nil { panic(err) } @@ -61,7 +83,11 @@ func ReplicationCursor(ctx *platformtest.Context) { require.NoError(ctx, err) snap2 := fsversion(ctx, fs, "@2 with space") - destroyed, err = endpoint.MoveReplicationCursor(ctx, fs, &snap2, jobid) + + _, err = endpoint.CreateReplicationCursor(ctx, fs, snap, jobid) + assert.NoError(ctx, err) + destroyed, err = endpoint.DestroyObsoleteReplicationCursors(ctx, fs, &snap2, jobid) + require.NoError(ctx, err) require.Equal(ctx, 1, len(destroyed)) require.Equal(ctx, endpoint.AbstractionReplicationCursorBookmarkV2, destroyed[0].GetType()) diff --git a/replication/logic/pdu/pdu.pb.go b/replication/logic/pdu/pdu.pb.go index d10331f..cdb85bc 100644 --- a/replication/logic/pdu/pdu.pb.go +++ b/replication/logic/pdu/pdu.pb.go @@ -46,7 +46,7 @@ func (x Tri) String() string { return proto.EnumName(Tri_name, int32(x)) } func (Tri) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{0} + return fileDescriptor_pdu_483c6918b7b3d747, []int{0} } type FilesystemVersion_VersionType int32 @@ -69,7 +69,7 @@ func (x FilesystemVersion_VersionType) String() string { return proto.EnumName(FilesystemVersion_VersionType_name, int32(x)) } func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{5, 0} + return fileDescriptor_pdu_483c6918b7b3d747, []int{5, 0} } type ListFilesystemReq struct { @@ -82,7 +82,7 @@ func (m *ListFilesystemReq) Reset() { *m = ListFilesystemReq{} } func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) } func (*ListFilesystemReq) ProtoMessage() {} func (*ListFilesystemReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{0} + return fileDescriptor_pdu_483c6918b7b3d747, []int{0} } func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b) @@ -113,7 +113,7 @@ func (m *ListFilesystemRes) Reset() { *m = ListFilesystemRes{} } func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) } func (*ListFilesystemRes) ProtoMessage() {} func (*ListFilesystemRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{1} + return fileDescriptor_pdu_483c6918b7b3d747, []int{1} } func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b) @@ -154,7 +154,7 @@ func (m *Filesystem) Reset() { *m = Filesystem{} } func (m *Filesystem) String() string { return proto.CompactTextString(m) } func (*Filesystem) ProtoMessage() {} func (*Filesystem) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{2} + return fileDescriptor_pdu_483c6918b7b3d747, []int{2} } func (m *Filesystem) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Filesystem.Unmarshal(m, b) @@ -213,7 +213,7 @@ func (m *ListFilesystemVersionsReq) Reset() { *m = ListFilesystemVersion func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) } func (*ListFilesystemVersionsReq) ProtoMessage() {} func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{3} + return fileDescriptor_pdu_483c6918b7b3d747, []int{3} } func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b) @@ -251,7 +251,7 @@ func (m *ListFilesystemVersionsRes) Reset() { *m = ListFilesystemVersion func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) } func (*ListFilesystemVersionsRes) ProtoMessage() {} func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{4} + return fileDescriptor_pdu_483c6918b7b3d747, []int{4} } func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b) @@ -293,7 +293,7 @@ func (m *FilesystemVersion) Reset() { *m = FilesystemVersion{} } func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) } func (*FilesystemVersion) ProtoMessage() {} func (*FilesystemVersion) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{5} + return fileDescriptor_pdu_483c6918b7b3d747, []int{5} } func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b) @@ -371,7 +371,7 @@ func (m *SendReq) Reset() { *m = SendReq{} } func (m *SendReq) String() string { return proto.CompactTextString(m) } func (*SendReq) ProtoMessage() {} func (*SendReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{6} + return fileDescriptor_pdu_483c6918b7b3d747, []int{6} } func (m *SendReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendReq.Unmarshal(m, b) @@ -445,7 +445,7 @@ func (m *Property) Reset() { *m = Property{} } func (m *Property) String() string { return proto.CompactTextString(m) } func (*Property) ProtoMessage() {} func (*Property) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{7} + return fileDescriptor_pdu_483c6918b7b3d747, []int{7} } func (m *Property) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Property.Unmarshal(m, b) @@ -496,7 +496,7 @@ func (m *SendRes) Reset() { *m = SendRes{} } func (m *SendRes) String() string { return proto.CompactTextString(m) } func (*SendRes) ProtoMessage() {} func (*SendRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{8} + return fileDescriptor_pdu_483c6918b7b3d747, []int{8} } func (m *SendRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendRes.Unmarshal(m, b) @@ -548,7 +548,7 @@ func (m *SendCompletedReq) Reset() { *m = SendCompletedReq{} } func (m *SendCompletedReq) String() string { return proto.CompactTextString(m) } func (*SendCompletedReq) ProtoMessage() {} func (*SendCompletedReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{9} + return fileDescriptor_pdu_483c6918b7b3d747, []int{9} } func (m *SendCompletedReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendCompletedReq.Unmarshal(m, b) @@ -585,7 +585,7 @@ func (m *SendCompletedRes) Reset() { *m = SendCompletedRes{} } func (m *SendCompletedRes) String() string { return proto.CompactTextString(m) } func (*SendCompletedRes) ProtoMessage() {} func (*SendCompletedRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{10} + return fileDescriptor_pdu_483c6918b7b3d747, []int{10} } func (m *SendCompletedRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SendCompletedRes.Unmarshal(m, b) @@ -620,7 +620,7 @@ func (m *ReceiveReq) Reset() { *m = ReceiveReq{} } func (m *ReceiveReq) String() string { return proto.CompactTextString(m) } func (*ReceiveReq) ProtoMessage() {} func (*ReceiveReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{11} + return fileDescriptor_pdu_483c6918b7b3d747, []int{11} } func (m *ReceiveReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReceiveReq.Unmarshal(m, b) @@ -671,7 +671,7 @@ func (m *ReceiveRes) Reset() { *m = ReceiveRes{} } func (m *ReceiveRes) String() string { return proto.CompactTextString(m) } func (*ReceiveRes) ProtoMessage() {} func (*ReceiveRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{12} + return fileDescriptor_pdu_483c6918b7b3d747, []int{12} } func (m *ReceiveRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReceiveRes.Unmarshal(m, b) @@ -704,7 +704,7 @@ func (m *DestroySnapshotsReq) Reset() { *m = DestroySnapshotsReq{} } func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotsReq) ProtoMessage() {} func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{13} + return fileDescriptor_pdu_483c6918b7b3d747, []int{13} } func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b) @@ -750,7 +750,7 @@ func (m *DestroySnapshotRes) Reset() { *m = DestroySnapshotRes{} } func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotRes) ProtoMessage() {} func (*DestroySnapshotRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{14} + return fileDescriptor_pdu_483c6918b7b3d747, []int{14} } func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b) @@ -795,7 +795,7 @@ func (m *DestroySnapshotsRes) Reset() { *m = DestroySnapshotsRes{} } func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) } func (*DestroySnapshotsRes) ProtoMessage() {} func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{15} + return fileDescriptor_pdu_483c6918b7b3d747, []int{15} } func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b) @@ -833,7 +833,7 @@ func (m *ReplicationCursorReq) Reset() { *m = ReplicationCursorReq{} } func (m *ReplicationCursorReq) String() string { return proto.CompactTextString(m) } func (*ReplicationCursorReq) ProtoMessage() {} func (*ReplicationCursorReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{16} + return fileDescriptor_pdu_483c6918b7b3d747, []int{16} } func (m *ReplicationCursorReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReplicationCursorReq.Unmarshal(m, b) @@ -874,7 +874,7 @@ func (m *ReplicationCursorRes) Reset() { *m = ReplicationCursorRes{} } func (m *ReplicationCursorRes) String() string { return proto.CompactTextString(m) } func (*ReplicationCursorRes) ProtoMessage() {} func (*ReplicationCursorRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{17} + return fileDescriptor_pdu_483c6918b7b3d747, []int{17} } func (m *ReplicationCursorRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ReplicationCursorRes.Unmarshal(m, b) @@ -1010,7 +1010,7 @@ func (m *PingReq) Reset() { *m = PingReq{} } func (m *PingReq) String() string { return proto.CompactTextString(m) } func (*PingReq) ProtoMessage() {} func (*PingReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{18} + return fileDescriptor_pdu_483c6918b7b3d747, []int{18} } func (m *PingReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PingReq.Unmarshal(m, b) @@ -1049,7 +1049,7 @@ func (m *PingRes) Reset() { *m = PingRes{} } func (m *PingRes) String() string { return proto.CompactTextString(m) } func (*PingRes) ProtoMessage() {} func (*PingRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{19} + return fileDescriptor_pdu_483c6918b7b3d747, []int{19} } func (m *PingRes) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PingRes.Unmarshal(m, b) @@ -1076,91 +1076,6 @@ func (m *PingRes) GetEcho() string { return "" } -type HintMostRecentCommonAncestorReq struct { - Filesystem string `protobuf:"bytes,1,opt,name=Filesystem,proto3" json:"Filesystem,omitempty"` - // A copy of the FilesystemVersion on the sending side that the replication - // algorithm identified as a shared most recent common version between sending - // and receiving side. - // - // If nil, this is an indication that the replication algorithm could not - // find a common ancestor between the two sides. - // NOTE: nilness does not mean that replication never happened - there could - // as well be a replication conflict. thus, dont' jump to conclusions too - // rapidly here. - SenderVersion *FilesystemVersion `protobuf:"bytes,2,opt,name=SenderVersion,proto3" json:"SenderVersion,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *HintMostRecentCommonAncestorReq) Reset() { *m = HintMostRecentCommonAncestorReq{} } -func (m *HintMostRecentCommonAncestorReq) String() string { return proto.CompactTextString(m) } -func (*HintMostRecentCommonAncestorReq) ProtoMessage() {} -func (*HintMostRecentCommonAncestorReq) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{20} -} -func (m *HintMostRecentCommonAncestorReq) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_HintMostRecentCommonAncestorReq.Unmarshal(m, b) -} -func (m *HintMostRecentCommonAncestorReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_HintMostRecentCommonAncestorReq.Marshal(b, m, deterministic) -} -func (dst *HintMostRecentCommonAncestorReq) XXX_Merge(src proto.Message) { - xxx_messageInfo_HintMostRecentCommonAncestorReq.Merge(dst, src) -} -func (m *HintMostRecentCommonAncestorReq) XXX_Size() int { - return xxx_messageInfo_HintMostRecentCommonAncestorReq.Size(m) -} -func (m *HintMostRecentCommonAncestorReq) XXX_DiscardUnknown() { - xxx_messageInfo_HintMostRecentCommonAncestorReq.DiscardUnknown(m) -} - -var xxx_messageInfo_HintMostRecentCommonAncestorReq proto.InternalMessageInfo - -func (m *HintMostRecentCommonAncestorReq) GetFilesystem() string { - if m != nil { - return m.Filesystem - } - return "" -} - -func (m *HintMostRecentCommonAncestorReq) GetSenderVersion() *FilesystemVersion { - if m != nil { - return m.SenderVersion - } - return nil -} - -type HintMostRecentCommonAncestorRes struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *HintMostRecentCommonAncestorRes) Reset() { *m = HintMostRecentCommonAncestorRes{} } -func (m *HintMostRecentCommonAncestorRes) String() string { return proto.CompactTextString(m) } -func (*HintMostRecentCommonAncestorRes) ProtoMessage() {} -func (*HintMostRecentCommonAncestorRes) Descriptor() ([]byte, []int) { - return fileDescriptor_pdu_e59763dc61674a79, []int{21} -} -func (m *HintMostRecentCommonAncestorRes) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_HintMostRecentCommonAncestorRes.Unmarshal(m, b) -} -func (m *HintMostRecentCommonAncestorRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_HintMostRecentCommonAncestorRes.Marshal(b, m, deterministic) -} -func (dst *HintMostRecentCommonAncestorRes) XXX_Merge(src proto.Message) { - xxx_messageInfo_HintMostRecentCommonAncestorRes.Merge(dst, src) -} -func (m *HintMostRecentCommonAncestorRes) XXX_Size() int { - return xxx_messageInfo_HintMostRecentCommonAncestorRes.Size(m) -} -func (m *HintMostRecentCommonAncestorRes) XXX_DiscardUnknown() { - xxx_messageInfo_HintMostRecentCommonAncestorRes.DiscardUnknown(m) -} - -var xxx_messageInfo_HintMostRecentCommonAncestorRes proto.InternalMessageInfo - func init() { proto.RegisterType((*ListFilesystemReq)(nil), "ListFilesystemReq") proto.RegisterType((*ListFilesystemRes)(nil), "ListFilesystemRes") @@ -1182,8 +1097,6 @@ func init() { proto.RegisterType((*ReplicationCursorRes)(nil), "ReplicationCursorRes") proto.RegisterType((*PingReq)(nil), "PingReq") proto.RegisterType((*PingRes)(nil), "PingRes") - proto.RegisterType((*HintMostRecentCommonAncestorReq)(nil), "HintMostRecentCommonAncestorReq") - proto.RegisterType((*HintMostRecentCommonAncestorRes)(nil), "HintMostRecentCommonAncestorRes") proto.RegisterEnum("Tri", Tri_name, Tri_value) proto.RegisterEnum("FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value) } @@ -1206,7 +1119,6 @@ type ReplicationClient interface { DestroySnapshots(ctx context.Context, in *DestroySnapshotsReq, opts ...grpc.CallOption) (*DestroySnapshotsRes, error) ReplicationCursor(ctx context.Context, in *ReplicationCursorReq, opts ...grpc.CallOption) (*ReplicationCursorRes, error) SendCompleted(ctx context.Context, in *SendCompletedReq, opts ...grpc.CallOption) (*SendCompletedRes, error) - HintMostRecentCommonAncestor(ctx context.Context, in *HintMostRecentCommonAncestorReq, opts ...grpc.CallOption) (*HintMostRecentCommonAncestorRes, error) } type replicationClient struct { @@ -1271,15 +1183,6 @@ func (c *replicationClient) SendCompleted(ctx context.Context, in *SendCompleted return out, nil } -func (c *replicationClient) HintMostRecentCommonAncestor(ctx context.Context, in *HintMostRecentCommonAncestorReq, opts ...grpc.CallOption) (*HintMostRecentCommonAncestorRes, error) { - out := new(HintMostRecentCommonAncestorRes) - err := c.cc.Invoke(ctx, "/Replication/HintMostRecentCommonAncestor", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // ReplicationServer is the server API for Replication service. type ReplicationServer interface { Ping(context.Context, *PingReq) (*PingRes, error) @@ -1288,7 +1191,6 @@ type ReplicationServer interface { DestroySnapshots(context.Context, *DestroySnapshotsReq) (*DestroySnapshotsRes, error) ReplicationCursor(context.Context, *ReplicationCursorReq) (*ReplicationCursorRes, error) SendCompleted(context.Context, *SendCompletedReq) (*SendCompletedRes, error) - HintMostRecentCommonAncestor(context.Context, *HintMostRecentCommonAncestorReq) (*HintMostRecentCommonAncestorRes, error) } func RegisterReplicationServer(s *grpc.Server, srv ReplicationServer) { @@ -1403,24 +1305,6 @@ func _Replication_SendCompleted_Handler(srv interface{}, ctx context.Context, de return interceptor(ctx, in, info, handler) } -func _Replication_HintMostRecentCommonAncestor_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(HintMostRecentCommonAncestorReq) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ReplicationServer).HintMostRecentCommonAncestor(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/Replication/HintMostRecentCommonAncestor", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ReplicationServer).HintMostRecentCommonAncestor(ctx, req.(*HintMostRecentCommonAncestorReq)) - } - return interceptor(ctx, in, info, handler) -} - var _Replication_serviceDesc = grpc.ServiceDesc{ ServiceName: "Replication", HandlerType: (*ReplicationServer)(nil), @@ -1449,73 +1333,66 @@ var _Replication_serviceDesc = grpc.ServiceDesc{ MethodName: "SendCompleted", Handler: _Replication_SendCompleted_Handler, }, - { - MethodName: "HintMostRecentCommonAncestor", - Handler: _Replication_HintMostRecentCommonAncestor_Handler, - }, }, Streams: []grpc.StreamDesc{}, Metadata: "pdu.proto", } -func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_e59763dc61674a79) } +func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_483c6918b7b3d747) } -var fileDescriptor_pdu_e59763dc61674a79 = []byte{ - // 892 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0xdf, 0x6f, 0xdb, 0x36, - 0x10, 0x8e, 0x6c, 0x39, 0x91, 0xcf, 0xe9, 0xea, 0x5c, 0xb2, 0x42, 0x13, 0xba, 0xce, 0xe3, 0x86, - 0xc1, 0x0d, 0x30, 0x61, 0xc8, 0x7e, 0x60, 0xc3, 0x80, 0x02, 0x8d, 0x93, 0x34, 0xc5, 0xd6, 0xce, - 0x60, 0xbc, 0x62, 0xe8, 0x9b, 0x6a, 0x1f, 0x12, 0x21, 0xb2, 0xa8, 0x90, 0xf4, 0x50, 0x6f, 0x7b, - 0xda, 0xe3, 0xfe, 0xbd, 0xe5, 0x0f, 0x2a, 0x44, 0x4b, 0xb6, 0x6c, 0xc9, 0x89, 0x9f, 0xcc, 0xfb, - 0x78, 0x14, 0xef, 0xbe, 0xfb, 0xee, 0x68, 0x68, 0x26, 0xa3, 0x89, 0x9f, 0x48, 0xa1, 0x05, 0xdb, - 0x87, 0xbd, 0x5f, 0x43, 0xa5, 0xcf, 0xc2, 0x88, 0xd4, 0x54, 0x69, 0x1a, 0x73, 0xba, 0x61, 0xc7, - 0x65, 0x50, 0xe1, 0xd7, 0xd0, 0x5a, 0x00, 0xca, 0xb5, 0x3a, 0xf5, 0x6e, 0xeb, 0xa8, 0xe5, 0x17, - 0x9c, 0x8a, 0xfb, 0xec, 0x3f, 0x0b, 0x60, 0x61, 0x23, 0x82, 0xdd, 0x0f, 0xf4, 0x95, 0x6b, 0x75, - 0xac, 0x6e, 0x93, 0x9b, 0x35, 0x76, 0xa0, 0xc5, 0x49, 0x4d, 0xc6, 0x34, 0x10, 0xd7, 0x14, 0xbb, - 0x35, 0xb3, 0x55, 0x84, 0xf0, 0x4b, 0x78, 0xf0, 0x52, 0xf5, 0xa3, 0x60, 0x48, 0x57, 0x22, 0x1a, - 0x91, 0x74, 0xeb, 0x1d, 0xab, 0xeb, 0xf0, 0x65, 0x30, 0xfd, 0xce, 0x4b, 0x75, 0x1a, 0x0f, 0xe5, - 0x34, 0xd1, 0x34, 0x72, 0x6d, 0xe3, 0x53, 0x84, 0xd8, 0xcf, 0xf0, 0xc9, 0x72, 0x42, 0x6f, 0x48, - 0xaa, 0x50, 0xc4, 0x8a, 0xd3, 0x0d, 0x3e, 0x29, 0x06, 0x9a, 0x05, 0x58, 0x40, 0xd8, 0x2f, 0xeb, - 0x0f, 0x2b, 0xf4, 0xc1, 0xc9, 0xcd, 0x8c, 0x12, 0xf4, 0x4b, 0x9e, 0x7c, 0xee, 0xc3, 0x6e, 0x2d, - 0xd8, 0x2b, 0xed, 0xe3, 0x11, 0xd8, 0x83, 0x69, 0x42, 0xe6, 0xf2, 0x8f, 0x8e, 0x9e, 0x94, 0xbf, - 0xe0, 0x67, 0xbf, 0xa9, 0x17, 0x37, 0xbe, 0x29, 0xa3, 0xaf, 0x83, 0x31, 0x65, 0xb4, 0x99, 0x75, - 0x8a, 0xbd, 0x98, 0x84, 0x23, 0x43, 0x93, 0xcd, 0xcd, 0x1a, 0x1f, 0x43, 0xb3, 0x27, 0x29, 0xd0, - 0x34, 0xf8, 0xe3, 0x85, 0xe1, 0xc6, 0xe6, 0x0b, 0x00, 0x3d, 0x70, 0x8c, 0x11, 0x8a, 0xd8, 0x6d, - 0x98, 0x2f, 0xcd, 0x6d, 0xf6, 0x14, 0x5a, 0x85, 0x6b, 0x71, 0x17, 0x9c, 0x8b, 0x38, 0x48, 0xd4, - 0x95, 0xd0, 0xed, 0xad, 0xd4, 0x3a, 0x16, 0xe2, 0x7a, 0x1c, 0xc8, 0xeb, 0xb6, 0xc5, 0xfe, 0xb7, - 0x60, 0xe7, 0x82, 0xe2, 0xd1, 0x06, 0x7c, 0xe2, 0x57, 0x60, 0x9f, 0x49, 0x31, 0x36, 0x81, 0x57, - 0xd3, 0x65, 0xf6, 0x91, 0x41, 0x6d, 0x20, 0x4c, 0x2a, 0xd5, 0x5e, 0xb5, 0x81, 0x58, 0x95, 0x90, - 0x5d, 0x96, 0x10, 0x83, 0xe6, 0x42, 0x1a, 0x0d, 0xc3, 0xaf, 0xed, 0x0f, 0x64, 0xc8, 0x17, 0x30, - 0x3e, 0x82, 0xed, 0x13, 0x39, 0xe5, 0x93, 0xd8, 0xdd, 0x36, 0xda, 0xc9, 0x2c, 0xf6, 0x1d, 0x38, - 0x7d, 0x29, 0x12, 0x92, 0x7a, 0x3a, 0xa7, 0xdb, 0x2a, 0xd0, 0x7d, 0x00, 0x8d, 0x37, 0x41, 0x34, - 0xc9, 0x6b, 0x30, 0x33, 0xd8, 0xbf, 0x73, 0x2e, 0x14, 0x76, 0xe1, 0xe1, 0xef, 0x8a, 0x46, 0xab, - 0x32, 0x77, 0xf8, 0x2a, 0x8c, 0x0c, 0x76, 0x4f, 0xdf, 0x27, 0x34, 0xd4, 0x34, 0xba, 0x08, 0xff, - 0x22, 0x93, 0x77, 0x9d, 0x2f, 0x61, 0xf8, 0x14, 0x20, 0x8b, 0x27, 0x24, 0xe5, 0xda, 0x46, 0x6e, - 0x4d, 0x3f, 0x0f, 0x91, 0x17, 0x36, 0xd9, 0x33, 0x68, 0xa7, 0x31, 0xf4, 0xc4, 0x38, 0x89, 0x48, - 0x93, 0x29, 0xcc, 0x21, 0xb4, 0x7e, 0x93, 0xe1, 0x65, 0x18, 0x07, 0x11, 0xa7, 0x9b, 0x8c, 0x7f, - 0xc7, 0xcf, 0xea, 0xc6, 0x8b, 0x9b, 0x0c, 0x4b, 0xe7, 0x15, 0xfb, 0x07, 0x80, 0xd3, 0x90, 0xc2, - 0x3f, 0x69, 0x93, 0x32, 0xcf, 0xca, 0x57, 0xbb, 0xb3, 0x7c, 0x87, 0xd0, 0xee, 0x45, 0x14, 0xc8, - 0x22, 0x3f, 0xb3, 0x16, 0x2f, 0xe1, 0x6c, 0xb7, 0x70, 0xbb, 0x62, 0x97, 0xb0, 0x7f, 0x42, 0x4a, - 0x4b, 0x31, 0xcd, 0x35, 0xb9, 0x49, 0x2f, 0xe3, 0x37, 0xd0, 0x9c, 0xfb, 0xbb, 0xb5, 0xb5, 0xfd, - 0xba, 0x70, 0x62, 0x6f, 0x01, 0x57, 0x2e, 0xca, 0xda, 0x3e, 0x37, 0xcd, 0x2d, 0x6b, 0xda, 0x3e, - 0xf7, 0x49, 0x95, 0x72, 0x2a, 0xa5, 0x90, 0xb9, 0x52, 0x8c, 0xc1, 0x4e, 0xaa, 0x92, 0x48, 0x27, - 0xed, 0x4e, 0x9a, 0x78, 0xa4, 0xf3, 0x91, 0xb2, 0xef, 0x97, 0x43, 0xe0, 0xb9, 0x0f, 0xfb, 0x01, - 0x0e, 0x38, 0x25, 0x51, 0x38, 0x34, 0x5d, 0xdb, 0x9b, 0x48, 0x25, 0xe4, 0x26, 0x73, 0x6d, 0x50, - 0x79, 0x4e, 0xe1, 0x41, 0x36, 0x44, 0xd2, 0x13, 0xf6, 0xf9, 0xd6, 0x7c, 0x8c, 0x38, 0xaf, 0x85, - 0xa6, 0xf7, 0xa1, 0xd2, 0x33, 0x09, 0x9f, 0x6f, 0xf1, 0x39, 0x72, 0xec, 0xc0, 0xf6, 0x2c, 0x1c, - 0xf6, 0x05, 0xec, 0xf4, 0xc3, 0xf8, 0x32, 0x0d, 0xc0, 0x85, 0x9d, 0x57, 0xa4, 0x54, 0x70, 0x99, - 0x77, 0x4d, 0x6e, 0xb2, 0x4f, 0x73, 0x27, 0x95, 0xf6, 0xd5, 0xe9, 0xf0, 0x4a, 0xe4, 0x7d, 0x95, - 0xae, 0xd9, 0xdf, 0xf0, 0xd9, 0x79, 0x18, 0xeb, 0x57, 0x42, 0xe9, 0xb4, 0xe4, 0xb1, 0xee, 0x89, - 0xf1, 0x58, 0xc4, 0xcf, 0xe3, 0x21, 0x29, 0xbd, 0x51, 0x72, 0xf8, 0x23, 0x3c, 0x48, 0xf5, 0x4b, - 0x32, 0xab, 0xc5, 0x1d, 0x42, 0x5c, 0x76, 0x64, 0x9f, 0xdf, 0x77, 0xb9, 0x3a, 0xec, 0x42, 0x7d, - 0x20, 0xc3, 0x74, 0x04, 0x9e, 0x88, 0x58, 0xf7, 0x02, 0x49, 0xed, 0x2d, 0x6c, 0x42, 0xe3, 0x2c, - 0x88, 0x14, 0xb5, 0x2d, 0x74, 0xc0, 0x1e, 0xc8, 0x09, 0xb5, 0x6b, 0x47, 0xb7, 0xf5, 0x74, 0x40, - 0xcd, 0x49, 0x46, 0x0f, 0xec, 0x34, 0x71, 0x74, 0xfc, 0x8c, 0x24, 0x2f, 0x5f, 0x29, 0xfc, 0x09, - 0x1e, 0x2e, 0xbf, 0x33, 0x0a, 0xd1, 0x2f, 0x3d, 0xce, 0x5e, 0x19, 0x53, 0xd8, 0x87, 0x47, 0xd5, - 0x4f, 0x14, 0x7a, 0xfe, 0xda, 0x87, 0xcf, 0x5b, 0xbf, 0xa7, 0xf0, 0x19, 0xb4, 0x57, 0xa5, 0x89, - 0x07, 0x7e, 0x45, 0xcb, 0x79, 0x55, 0xa8, 0xc2, 0xe7, 0xb0, 0x57, 0x12, 0x17, 0x7e, 0xec, 0x57, - 0x09, 0xd5, 0xab, 0x84, 0x15, 0x7e, 0x3f, 0x2b, 0xe1, 0x7c, 0x04, 0xe1, 0x9e, 0xbf, 0x3a, 0xd2, - 0xbc, 0x12, 0xa4, 0xf0, 0x1d, 0x3c, 0xbe, 0xab, 0x7e, 0xd8, 0xf1, 0xef, 0xd1, 0x96, 0x77, 0x9f, - 0x87, 0x3a, 0x6e, 0xbc, 0xad, 0x27, 0xa3, 0xc9, 0xbb, 0x6d, 0xf3, 0x1f, 0xea, 0xdb, 0x0f, 0x01, - 0x00, 0x00, 0xff, 0xff, 0xad, 0x4e, 0x98, 0x29, 0x50, 0x09, 0x00, 0x00, +var fileDescriptor_pdu_483c6918b7b3d747 = []byte{ + // 833 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0x5f, 0x6f, 0xe3, 0x44, + 0x10, 0xaf, 0x13, 0xa7, 0x75, 0x26, 0x3d, 0x2e, 0x9d, 0x96, 0x93, 0xb1, 0xe0, 0x54, 0x2d, 0x08, + 0xe5, 0x2a, 0x61, 0xa1, 0xf2, 0x47, 0x42, 0x48, 0x27, 0xd1, 0xb4, 0xbd, 0x3b, 0x01, 0x47, 0xb4, + 0x35, 0x27, 0x74, 0x6f, 0x26, 0x19, 0xb5, 0x56, 0x1d, 0xaf, 0xbb, 0xe3, 0xa0, 0x0b, 0xe2, 0x89, + 0x47, 0xbe, 0x1e, 0x7c, 0x10, 0x3e, 0x02, 0xf2, 0xc6, 0x4e, 0x9c, 0xd8, 0x41, 0x79, 0xca, 0xce, + 0x6f, 0x66, 0x77, 0x67, 0x7f, 0xf3, 0x9b, 0x71, 0xa0, 0x9b, 0x4e, 0x66, 0x7e, 0xaa, 0x55, 0xa6, + 0xc4, 0x31, 0x1c, 0xfd, 0x10, 0x71, 0x76, 0x1d, 0xc5, 0xc4, 0x73, 0xce, 0x68, 0x2a, 0xe9, 0x41, + 0x5c, 0xd4, 0x41, 0xc6, 0xcf, 0xa0, 0xb7, 0x02, 0xd8, 0xb5, 0x4e, 0xdb, 0x83, 0xde, 0x79, 0xcf, + 0xaf, 0x04, 0x55, 0xfd, 0xe2, 0x2f, 0x0b, 0x60, 0x65, 0x23, 0x82, 0x3d, 0x0a, 0xb3, 0x3b, 0xd7, + 0x3a, 0xb5, 0x06, 0x5d, 0x69, 0xd6, 0x78, 0x0a, 0x3d, 0x49, 0x3c, 0x9b, 0x52, 0xa0, 0xee, 0x29, + 0x71, 0x5b, 0xc6, 0x55, 0x85, 0xf0, 0x13, 0x78, 0xf4, 0x8a, 0x47, 0x71, 0x38, 0xa6, 0x3b, 0x15, + 0x4f, 0x48, 0xbb, 0xed, 0x53, 0x6b, 0xe0, 0xc8, 0x75, 0x30, 0x3f, 0xe7, 0x15, 0x5f, 0x25, 0x63, + 0x3d, 0x4f, 0x33, 0x9a, 0xb8, 0xb6, 0x89, 0xa9, 0x42, 0xe2, 0x5b, 0xf8, 0x60, 0xfd, 0x41, 0x6f, + 0x48, 0x73, 0xa4, 0x12, 0x96, 0xf4, 0x80, 0x4f, 0xab, 0x89, 0x16, 0x09, 0x56, 0x10, 0xf1, 0xfd, + 0xf6, 0xcd, 0x8c, 0x3e, 0x38, 0xa5, 0x59, 0x50, 0x82, 0x7e, 0x2d, 0x52, 0x2e, 0x63, 0xc4, 0x3f, + 0x16, 0x1c, 0xd5, 0xfc, 0x78, 0x0e, 0x76, 0x30, 0x4f, 0xc9, 0x5c, 0xfe, 0xde, 0xf9, 0xd3, 0xfa, + 0x09, 0x7e, 0xf1, 0x9b, 0x47, 0x49, 0x13, 0x9b, 0x33, 0xfa, 0x3a, 0x9c, 0x52, 0x41, 0x9b, 0x59, + 0xe7, 0xd8, 0x8b, 0x59, 0x34, 0x31, 0x34, 0xd9, 0xd2, 0xac, 0xf1, 0x43, 0xe8, 0x0e, 0x35, 0x85, + 0x19, 0x05, 0xbf, 0xbc, 0x30, 0xdc, 0xd8, 0x72, 0x05, 0xa0, 0x07, 0x8e, 0x31, 0x22, 0x95, 0xb8, + 0x1d, 0x73, 0xd2, 0xd2, 0x16, 0xcf, 0xa0, 0x57, 0xb9, 0x16, 0x0f, 0xc1, 0xb9, 0x49, 0xc2, 0x94, + 0xef, 0x54, 0xd6, 0xdf, 0xcb, 0xad, 0x0b, 0xa5, 0xee, 0xa7, 0xa1, 0xbe, 0xef, 0x5b, 0xe2, 0x6f, + 0x0b, 0x0e, 0x6e, 0x28, 0x99, 0xec, 0xc0, 0x27, 0x7e, 0x0a, 0xf6, 0xb5, 0x56, 0x53, 0x93, 0x78, + 0x33, 0x5d, 0xc6, 0x8f, 0x02, 0x5a, 0x81, 0x32, 0x4f, 0x69, 0x8e, 0x6a, 0x05, 0x6a, 0x53, 0x42, + 0x76, 0x5d, 0x42, 0x02, 0xba, 0x2b, 0x69, 0x74, 0x0c, 0xbf, 0xb6, 0x1f, 0xe8, 0x48, 0xae, 0x60, + 0x7c, 0x02, 0xfb, 0x97, 0x7a, 0x2e, 0x67, 0x89, 0xbb, 0x6f, 0xb4, 0x53, 0x58, 0xe2, 0x4b, 0x70, + 0x46, 0x5a, 0xa5, 0xa4, 0xb3, 0xf9, 0x92, 0x6e, 0xab, 0x42, 0xf7, 0x09, 0x74, 0xde, 0x84, 0xf1, + 0xac, 0xac, 0xc1, 0xc2, 0x10, 0x7f, 0x2e, 0xb9, 0x60, 0x1c, 0xc0, 0xe3, 0x9f, 0x99, 0x26, 0x9b, + 0x32, 0x77, 0xe4, 0x26, 0x8c, 0x02, 0x0e, 0xaf, 0xde, 0xa5, 0x34, 0xce, 0x68, 0x72, 0x13, 0xfd, + 0x4e, 0xe6, 0xdd, 0x6d, 0xb9, 0x86, 0xe1, 0x33, 0x80, 0x22, 0x9f, 0x88, 0xd8, 0xb5, 0x8d, 0xdc, + 0xba, 0x7e, 0x99, 0xa2, 0xac, 0x38, 0xc5, 0x73, 0xe8, 0xe7, 0x39, 0x0c, 0xd5, 0x34, 0x8d, 0x29, + 0x23, 0x53, 0x98, 0x33, 0xe8, 0xfd, 0xa4, 0xa3, 0xdb, 0x28, 0x09, 0x63, 0x49, 0x0f, 0x05, 0xff, + 0x8e, 0x5f, 0xd4, 0x4d, 0x56, 0x9d, 0x02, 0x6b, 0xfb, 0x59, 0xfc, 0x01, 0x20, 0x69, 0x4c, 0xd1, + 0x6f, 0xb4, 0x4b, 0x99, 0x17, 0xe5, 0x6b, 0xfd, 0x6f, 0xf9, 0xce, 0xa0, 0x3f, 0x8c, 0x29, 0xd4, + 0x55, 0x7e, 0x16, 0x2d, 0x5e, 0xc3, 0xc5, 0x61, 0xe5, 0x76, 0x16, 0xb7, 0x70, 0x7c, 0x49, 0x9c, + 0x69, 0x35, 0x2f, 0x35, 0xb9, 0x4b, 0x2f, 0xe3, 0xe7, 0xd0, 0x5d, 0xc6, 0xbb, 0xad, 0xad, 0xfd, + 0xba, 0x0a, 0x12, 0x6f, 0x01, 0x37, 0x2e, 0x2a, 0xda, 0xbe, 0x34, 0xcd, 0x2d, 0x5b, 0xda, 0xbe, + 0x8c, 0xc9, 0x95, 0x72, 0xa5, 0xb5, 0xd2, 0xa5, 0x52, 0x8c, 0x21, 0x2e, 0x9b, 0x1e, 0x91, 0x4f, + 0xda, 0x83, 0xfc, 0xe1, 0x71, 0x56, 0x8e, 0x94, 0x63, 0xbf, 0x9e, 0x82, 0x2c, 0x63, 0xc4, 0xd7, + 0x70, 0x22, 0x29, 0x8d, 0xa3, 0xb1, 0xe9, 0xda, 0xe1, 0x4c, 0xb3, 0xd2, 0xbb, 0xcc, 0xb5, 0xa0, + 0x71, 0x1f, 0xe3, 0x49, 0x31, 0x44, 0xf2, 0x1d, 0xf6, 0xcb, 0xbd, 0xe5, 0x18, 0x71, 0x5e, 0xab, + 0x8c, 0xde, 0x45, 0x9c, 0x2d, 0x24, 0xfc, 0x72, 0x4f, 0x2e, 0x91, 0x0b, 0x07, 0xf6, 0x17, 0xe9, + 0x88, 0x8f, 0xe1, 0x60, 0x14, 0x25, 0xb7, 0x79, 0x02, 0x2e, 0x1c, 0xfc, 0x48, 0xcc, 0xe1, 0x6d, + 0xd9, 0x35, 0xa5, 0x29, 0x3e, 0x2a, 0x83, 0x38, 0xef, 0xab, 0xab, 0xf1, 0x9d, 0x2a, 0xfb, 0x2a, + 0x5f, 0x9f, 0x0d, 0xa0, 0x1d, 0xe8, 0x28, 0x1f, 0x31, 0x97, 0x2a, 0xc9, 0x86, 0xa1, 0xa6, 0xfe, + 0x1e, 0x76, 0xa1, 0x73, 0x1d, 0xc6, 0x4c, 0x7d, 0x0b, 0x1d, 0xb0, 0x03, 0x3d, 0xa3, 0x7e, 0xeb, + 0xfc, 0xdf, 0x56, 0x3e, 0x00, 0x96, 0x8f, 0x40, 0x0f, 0xec, 0xfc, 0x60, 0x74, 0xfc, 0x22, 0x09, + 0xaf, 0x5c, 0x31, 0x7e, 0x03, 0x8f, 0xd7, 0xe7, 0x38, 0x23, 0xfa, 0xb5, 0x8f, 0x9f, 0x57, 0xc7, + 0x18, 0x47, 0xf0, 0xa4, 0xf9, 0x13, 0x80, 0x9e, 0xbf, 0xf5, 0xc3, 0xe2, 0x6d, 0xf7, 0x31, 0x3e, + 0x87, 0xfe, 0x66, 0xe9, 0xf1, 0xc4, 0x6f, 0x90, 0xb4, 0xd7, 0x84, 0x32, 0x7e, 0x07, 0x47, 0xb5, + 0xe2, 0xe1, 0xfb, 0x7e, 0x93, 0x10, 0xbc, 0x46, 0x98, 0xf1, 0x2b, 0x78, 0xb4, 0xd6, 0xe2, 0x78, + 0xe4, 0x6f, 0x8e, 0x0c, 0xaf, 0x06, 0xf1, 0x45, 0xe7, 0x6d, 0x3b, 0x9d, 0xcc, 0x7e, 0xdd, 0x37, + 0xff, 0x1f, 0xbe, 0xf8, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x27, 0x95, 0xc1, 0x78, 0x4c, 0x08, 0x00, + 0x00, } diff --git a/replication/logic/pdu/pdu.proto b/replication/logic/pdu/pdu.proto index 7acb47f..983504e 100644 --- a/replication/logic/pdu/pdu.proto +++ b/replication/logic/pdu/pdu.proto @@ -9,7 +9,6 @@ service Replication { rpc DestroySnapshots(DestroySnapshotsReq) returns (DestroySnapshotsRes); rpc ReplicationCursor(ReplicationCursorReq) returns (ReplicationCursorRes); rpc SendCompleted(SendCompletedReq) returns (SendCompletedRes); - rpc HintMostRecentCommonAncestor(HintMostRecentCommonAncestorReq) returns (HintMostRecentCommonAncestorRes); // for Send and Recv, see package rpc } @@ -126,19 +125,3 @@ message PingRes { // Echo must be PingReq.Message string Echo = 1; } - -message HintMostRecentCommonAncestorReq { - string Filesystem = 1; - - // A copy of the FilesystemVersion on the sending side that the replication - // algorithm identified as a shared most recent common version between sending - // and receiving side. - // - // If nil, this is an indication that the replication algorithm could not - // find a common ancestor between the two sides. - // NOTE: nilness does not mean that replication never happened - there could - // as well be a replication conflict. thus, dont' jump to conclusions too - // rapidly here. - FilesystemVersion SenderVersion = 2; -} -message HintMostRecentCommonAncestorRes {} \ No newline at end of file diff --git a/replication/logic/replication_logic.go b/replication/logic/replication_logic.go index 3b2548a..3d05a10 100644 --- a/replication/logic/replication_logic.go +++ b/replication/logic/replication_logic.go @@ -33,7 +33,6 @@ type Endpoint interface { ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) WaitForConnectivity(ctx context.Context) error - HintMostRecentCommonAncestor(context.Context, *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) } type Sender interface { @@ -359,43 +358,6 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) { log(ctx).WithField("token", resumeToken).Debug("decode resume token") } - // give both sides a hint about how far prior replication attempts got - // This serves as a cummulative variant of SendCompleted and can be useful - // for example to release stale holds from an earlier (interrupted) replication. - // TODO FIXME: enqueue this as a replication step instead of doing it here during planning - // then again, the step should run regardless of planning success - // so maybe a separate phase before PLANNING, then? - path, conflict := IncrementalPath(rfsvs, sfsvs) - var sender_mrca *pdu.FilesystemVersion - if conflict == nil && len(path) > 0 { - sender_mrca = path[0] // shadow - } - // yes, sender_mrca may be nil, indicating that we do not have an mrca - { - var wg sync.WaitGroup - doHint := func(ep Endpoint, name string) { - defer wg.Done() - ctx, endTask := trace.WithTask(ctx, "hint-mrca-"+name) - defer endTask() - - log := log(ctx).WithField("to_side", name). - WithField("sender_mrca", sender_mrca.String()) - log.Debug("hint most recent common ancestor") - hint := &pdu.HintMostRecentCommonAncestorReq{ - Filesystem: fs.Path, - SenderVersion: sender_mrca, - } - _, err := ep.HintMostRecentCommonAncestor(ctx, hint) - if err != nil { - log.WithError(err).Error("error hinting most recent common ancestor") - } - } - wg.Add(2) - go doHint(fs.sender, "sender") - go doHint(fs.receiver, "receiver") - wg.Wait() - } - var steps []*Step // build the list of replication steps // diff --git a/rpc/rpc_client.go b/rpc/rpc_client.go index e3b76d4..af8cbfd 100644 --- a/rpc/rpc_client.go +++ b/rpc/rpc_client.go @@ -142,13 +142,6 @@ func (c *Client) SendCompleted(ctx context.Context, in *pdu.SendCompletedReq) (* return c.controlClient.SendCompleted(ctx, in) } -func (c *Client) HintMostRecentCommonAncestor(ctx context.Context, in *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) { - ctx, endSpan := trace.WithSpan(ctx, "rpc.client.HintMostRecentCommonAncestor") - defer endSpan() - - return c.controlClient.HintMostRecentCommonAncestor(ctx, in) -} - func (c *Client) WaitForConnectivity(ctx context.Context) error { ctx, endSpan := trace.WithSpan(ctx, "rpc.client.WaitForConnectivity") defer endSpan() diff --git a/rpc/versionhandshake/versionhandshake.go b/rpc/versionhandshake/versionhandshake.go index ee51d80..222881d 100644 --- a/rpc/versionhandshake/versionhandshake.go +++ b/rpc/versionhandshake/versionhandshake.go @@ -152,7 +152,7 @@ func (m *HandshakeMessage) DecodeReader(r io.Reader, maxLen int) error { func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) *HandshakeError { // current protocol version is hardcoded here - return DoHandshakeVersion(conn, deadline, 3) + return DoHandshakeVersion(conn, deadline, 4) } const HandshakeMessageMaxLen = 16 * 4096 diff --git a/zfs/versions.go b/zfs/versions.go index bcd5cec..20955e0 100644 --- a/zfs/versions.go +++ b/zfs/versions.go @@ -116,6 +116,13 @@ func (v FilesystemVersion) RelName() string { } func (v FilesystemVersion) String() string { return v.RelName() } +// Only takes into account those attributes of FilesystemVersion that +// are immutable over time in ZFS. +func FilesystemVersionEqualIdentity(a, b FilesystemVersion) bool { + // .Name is mutable + return a.Guid == b.Guid && a.CreateTXG == b.CreateTXG && a.Creation == b.Creation +} + func (v FilesystemVersion) ToAbsPath(p *DatasetPath) string { var b bytes.Buffer b.WriteString(p.ToString()) diff --git a/zfs/zfs.go b/zfs/zfs.go index d2297be..be7dcea 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -1595,22 +1595,32 @@ var ErrBookmarkCloningNotSupported = fmt.Errorf("bookmark cloning feature is not // // does not destroy an existing bookmark, returns // -func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark string) (err error) { +func ZFSBookmark(ctx context.Context, fs string, v FilesystemVersion, bookmark string) (bm FilesystemVersion, err error) { + + bm = FilesystemVersion{ + Type: Bookmark, + Name: bookmark, + UserRefs: OptionUint64{Valid: false}, + // bookmarks have the same createtxg, guid and creation as their origin + CreateTXG: v.CreateTXG, + Guid: v.Guid, + Creation: v.Creation, + } promTimer := prometheus.NewTimer(prom.ZFSBookmarkDuration.WithLabelValues(fs)) defer promTimer.ObserveDuration() if !v.IsSnapshot() { - return ErrBookmarkCloningNotSupported // TODO This is work in progress: https://github.com/zfsonlinux/zfs/pull/9571 + return bm, ErrBookmarkCloningNotSupported // TODO This is work in progress: https://github.com/zfsonlinux/zfs/pull/9571 } snapname := v.FullPath(fs) if err := EntityNamecheck(snapname, EntityTypeSnapshot); err != nil { - return err + return bm, err } bookmarkname := fmt.Sprintf("%s#%s", fs, bookmark) if err := EntityNamecheck(bookmarkname, EntityTypeBookmark); err != nil { - return err + return bm, err } debug("bookmark: %q %q", snapname, bookmarkname) @@ -1619,27 +1629,27 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s stdio, err := cmd.CombinedOutput() if err != nil { if ddne := tryDatasetDoesNotExist(snapname, stdio); ddne != nil { - return ddne + return bm, ddne } else if zfsBookmarkExistsRegex.Match(stdio) { // check if this was idempotent bookGuid, err := ZFSGetGUID(ctx, fs, "#"+bookmark) if err != nil { - return errors.Wrap(err, "bookmark idempotency check") // guid error expressive enough + return bm, errors.Wrap(err, "bookmark idempotency check") // guid error expressive enough } - if v.GUID == bookGuid { - debug("bookmark: %q %q was idempotent: {snap,book}guid %d == %d", snapname, bookmarkname, v.GUID, bookGuid) - return nil + if v.Guid == bookGuid { + debug("bookmark: %q %q was idempotent: {snap,book}guid %d == %d", snapname, bookmarkname, v.Guid, bookGuid) + return bm, nil } - return &BookmarkExists{ - fs: fs, bookmarkOrigin: v, bookmark: bookmark, + return bm, &BookmarkExists{ + fs: fs, bookmarkOrigin: v.ToSendArgVersion(), bookmark: bookmark, zfsMsg: string(stdio), bookGuid: bookGuid, } } else { - return &ZFSError{ + return bm, &ZFSError{ Stderr: stdio, WaitErr: err, } @@ -1647,8 +1657,7 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s } - return nil - + return bm, nil } func ZFSRollback(ctx context.Context, fs *DatasetPath, snapshot FilesystemVersion, rollbackArgs ...string) (err error) {