[#316] endpoint / replication protocol: more robust step-holds and replication cursor management

- drop HintMostRecentCommonAncestor rpc call
    - it is wrong to put faith into the active side of the replication to always make that call
      (we might not trust it, ref pull setup)
- clean up step holds + step bookmarks + replication cursor bookmarks on
  send RPC instead
    - this makes it symmetric with Receive RPC
- use a cache (endpoint.sendAbstractionsCache) to avoid the cost of
  listing the on-disk endpoint abstractions state on every step

The "create" methods for endpoint abstractions (CreateReplicationCursor, HoldStep) are now fully
idempotent and return an Abstraction.

Notes about endpoint.sendAbstractionsCache:
- fills lazily from disk state on first `Get` operation
- fill from disk is generally only attempted once
    - unless the `ListAbstractions` fails, in which case the fill from
      disk is retried on next `Get` (the current `Get` will observe a
      subset of the actual on-disk abstractions)
    - the `Invalidate` method is called
- it is a global (zrepl process-wide) cache

fixes #316
This commit is contained in:
Christian Schwarz 2020-05-10 15:06:44 +02:00
parent dce98d50da
commit 292b85b5ef
19 changed files with 513 additions and 494 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"
@ -95,6 +96,7 @@ func Run(ctx context.Context, conf *config.Config) error {
// register global (=non job-local) metrics
zfscmd.RegisterMetrics(prometheus.DefaultRegisterer)
trace.RegisterMetrics(prometheus.DefaultRegisterer)
endpoint.RegisterMetrics(prometheus.DefaultRegisterer)
log.Info("starting daemon")

View File

@ -376,9 +376,21 @@ func (j *ActiveSide) SenderConfig() *endpoint.SenderConfig {
return push.senderConfig
}
// The active side of a replication uses one end (sender or receiver)
// directly by method invocation, without going through a transport that
// provides a client identity.
// However, in order to avoid the need to distinguish between direct-method-invocating
// clients and RPC client, we use an invalid client identity as a sentinel value.
func FakeActiveSideDirectMethodInvocationClientIdentity(jobId endpoint.JobID) string {
return fmt.Sprintf("<local><active><job><client><identity><job=%q>", jobId.String())
}
func (j *ActiveSide) Run(ctx context.Context) {
ctx, endTask := trace.WithTaskAndSpan(ctx, "active-side-job", j.Name())
defer endTask()
ctx = context.WithValue(ctx, endpoint.ClientIdentityKey, FakeActiveSideDirectMethodInvocationClientIdentity(j.name))
log := GetLogger(ctx)
defer log.Info("job exiting")

19
daemon/job/active_test.go Normal file
View File

@ -0,0 +1,19 @@
package job
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/transport"
)
func TestFakeActiveSideDirectMethodInvocationClientIdentityDoesNotPassValidityTest(t *testing.T) {
jobid, err := endpoint.MakeJobID("validjobname")
require.NoError(t, err)
clientIdentity := FakeActiveSideDirectMethodInvocationClientIdentity(jobid)
t.Logf("%v", clientIdentity)
err = transport.ValidateClientIdentity(clientIdentity)
assert.Error(t, err)
}

View File

@ -7,8 +7,8 @@ import (
"fmt"
"io"
"path"
"sync"
"github.com/kr/pretty"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/logging/trace"
@ -117,119 +117,8 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst
}
func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
fsp, err := p.filterCheckFS(r.GetFilesystem())
if err != nil {
return nil, err
}
fs := fsp.ToString()
log := getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", r.GetSenderVersion()))
log.WithField("full_hint", r).Debug("full hint")
if r.GetSenderVersion() == nil {
// no common ancestor found, likely due to failed prior replication attempt
// => release stale step holds to prevent them from accumulating
// (they can accumulate on initial replication because each inital replication step might hold a different `to`)
// => replication cursors cannot accumulate because we always _move_ the replication cursor
log.Debug("releasing all step holds on the filesystem")
TryReleaseStepStaleFS(ctx, fs, p.jobId)
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
// we were hinted a specific common ancestor
mostRecentVersion, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, r.GetSenderVersion())
if err != nil {
msg := "HintMostRecentCommonAncestor rpc with nonexistent most recent version"
log.Warn(msg)
return nil, errors.Wrap(err, msg)
}
// move replication cursor to this position
destroyedCursors, err := MoveReplicationCursor(ctx, fs, mostRecentVersion, p.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
log.Debug("not creating replication cursor from bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, errors.Wrap(err, "cannot set replication cursor to hinted version")
}
// take care of stale step holds
log.WithField("step-holds-cleanup-mode", senderHintMostRecentCommonAncestorStepCleanupMode).
Debug("taking care of possibly stale step holds")
doStepCleanup := false
var stepCleanupSince *CreateTXGRangeBound
switch senderHintMostRecentCommonAncestorStepCleanupMode {
case StepCleanupNoCleanup:
doStepCleanup = false
case StepCleanupRangeSinceUnbounded:
doStepCleanup = true
stepCleanupSince = nil
case StepCleanupRangeSinceReplicationCursor:
doStepCleanup = true
// Use the destroyed replication cursors as indicator how far the previous replication got.
// To be precise: We limit the amount of visisted snapshots to exactly those snapshots
// created since the last successful replication cursor movement (i.e. last successful replication step)
//
// If we crash now, we'll leak the step we are about to release, but the performance gain
// of limiting the amount of snapshots we visit makes up for that.
// Users have the `zrepl holds release-stale` command to cleanup leaked step holds.
for _, destroyed := range destroyedCursors {
if stepCleanupSince == nil {
stepCleanupSince = &CreateTXGRangeBound{
CreateTXG: destroyed.GetCreateTXG(),
Inclusive: &zfs.NilBool{B: true},
}
} else if destroyed.GetCreateTXG() < stepCleanupSince.CreateTXG {
stepCleanupSince.CreateTXG = destroyed.GetCreateTXG()
}
}
default:
panic(senderHintMostRecentCommonAncestorStepCleanupMode)
}
if !doStepCleanup {
log.Info("skipping cleanup of prior invocations' step holds due to environment variable setting")
} else {
if err := ReleaseStepCummulativeInclusive(ctx, fs, stepCleanupSince, mostRecentVersion, p.jobId); err != nil {
return nil, errors.Wrap(err, "cannot cleanup prior invocation's step holds and bookmarks")
} else {
log.Info("step hold cleanup done")
}
}
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
type HintMostRecentCommonAncestorStepCleanupMode struct{ string }
var (
StepCleanupRangeSinceReplicationCursor = HintMostRecentCommonAncestorStepCleanupMode{"range-since-replication-cursor"}
StepCleanupRangeSinceUnbounded = HintMostRecentCommonAncestorStepCleanupMode{"range-since-unbounded"}
StepCleanupNoCleanup = HintMostRecentCommonAncestorStepCleanupMode{"no-cleanup"}
)
func (m HintMostRecentCommonAncestorStepCleanupMode) String() string { return string(m.string) }
func (m *HintMostRecentCommonAncestorStepCleanupMode) Set(s string) error {
switch s {
case StepCleanupRangeSinceReplicationCursor.String():
*m = StepCleanupRangeSinceReplicationCursor
case StepCleanupRangeSinceUnbounded.String():
*m = StepCleanupRangeSinceUnbounded
case StepCleanupNoCleanup.String():
*m = StepCleanupNoCleanup
default:
return fmt.Errorf("unknown step cleanup mode %q", s)
}
return nil
}
var senderHintMostRecentCommonAncestorStepCleanupMode = *envconst.Var("ZREPL_ENDPOINT_SENDER_HINT_MOST_RECENT_STEP_HOLD_CLEANUP_MODE", &StepCleanupRangeSinceReplicationCursor).(*HintMostRecentCommonAncestorStepCleanupMode)
var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10))
var maxConcurrentZFSSend = envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10)
var maxConcurrentZFSSendSemaphore = semaphore.New(maxConcurrentZFSSend)
func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
if fsv == nil {
@ -319,10 +208,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
return res, nil, nil
}
// update replication cursor
// create a replication cursor for `From` (usually an idempotent no-op because SendCompleted already created it before)
var fromReplicationCursor Abstraction
if sendArgs.From != nil {
// For all but the first replication, this should always be a no-op because SendCompleted already moved the cursor
_, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.FromVersion, s.jobId)
fromReplicationCursor, err = CreateReplicationCursor(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) // no shadow
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
// fallthrough
@ -331,9 +221,10 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
}
var fromHold, toHold Abstraction
// make sure `From` doesn't go away in order to make this step resumable
if sendArgs.From != nil {
_, err := HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId)
fromHold, err = HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) // no shadow
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
// fallthrough
@ -342,17 +233,71 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
}
}
// make sure `To` doesn't go away in order to make this step resumable
_, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
toHold, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
if err != nil {
return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
}
// step holds & replication cursor released / moved forward in s.SendCompleted => s.moveCursorAndReleaseSendHolds
// cleanup the mess that _this function_ might have created in prior failed attempts:
//
// In summary, we delete every endpoint ZFS abstraction created on this filesystem for this job id,
// except for the ones we just created above.
//
// This is the most robust approach to avoid leaking (= forgetting to clean up) endpoint ZFS abstractions,
// all under the assumption that there will only ever be one send for a (jobId,fs) combination at any given time.
//
// Note that the SendCompleted rpc can't be relied upon for this purpose:
// - it might be lost due to network errors,
// - or never be sent by a potentially malicious or buggy client,
// - or never be send because the replication step failed at some point
// (potentially leaving a resumable state on the receiver, which is the case where we really do not want to blow away the step holds too soon.)
//
// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
func() {
ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
defer endSpan()
liveAbs := []Abstraction{fromHold, toHold, fromReplicationCursor}
keep := func(a Abstraction) (keep bool) {
keep = false
for _, k := range liveAbs {
keep = keep || AbstractionEquals(a, k)
}
return keep
}
check := func(obsoleteAbs []Abstraction) {
// last line of defense: check that we don't destroy the incremental `from` and `to`
// if we did that, we might be about to blow away the last common filesystem version between sender and receiver
mustLiveVersions := []zfs.FilesystemVersion{sendArgs.ToVersion}
if sendArgs.FromVersion != nil {
mustLiveVersions = append(mustLiveVersions, *sendArgs.FromVersion)
}
for _, staleVersion := range obsoleteAbs {
for _, mustLiveVersion := range mustLiveVersions {
if zfs.FilesystemVersionEqualIdentity(mustLiveVersion, staleVersion.GetFilesystemVersion()) {
panic(fmt.Sprintf("impl error: %q would be destroyed because it is considered stale but it is part of of sendArgs=%s", mustLiveVersion.String(), pretty.Sprint(sendArgs)))
}
}
}
}
sendAbstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, keep, check)
}()
if fromHold != nil {
sendAbstractionsCacheSingleton.Put(fromHold)
}
sendAbstractionsCacheSingleton.Put(toHold)
if fromReplicationCursor != nil {
sendAbstractionsCacheSingleton.Put(fromReplicationCursor)
}
sendStream, err := zfs.ZFSSend(ctx, sendArgs)
if err != nil {
// it's ok to not destroy the abstractions we just created here, a new send attempt will take care of it
return nil, nil, errors.Wrap(err, "zfs send failed")
}
return res, sendStream, nil
}
@ -389,8 +334,7 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
return log
}
log(ctx).Debug("move replication cursor to most recent common version")
destroyedCursors, err := MoveReplicationCursor(ctx, fs, to, p.jobId)
toReplicationCursor, err := CreateReplicationCursor(ctx, fs, to, p.jobId)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
log(ctx).Debug("not setting replication cursor, bookmark cloning not supported")
@ -402,59 +346,17 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
return &pdu.SendCompletedRes{}, err
}
} else {
log(ctx).Info("successfully moved replication cursor")
sendAbstractionsCacheSingleton.Put(toReplicationCursor)
log(ctx).WithField("to_cursor", toReplicationCursor.String()).Info("successfully created `to` replication cursor")
}
// kick off releasing of step holds / bookmarks
// if we fail to release them, don't bother the caller:
// they are merely an implementation detail on the sender for better resumability
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
ctx, endTask := trace.WithTask(ctx, "release-step-hold-to")
defer endTask()
log(ctx).Debug("release step-hold of or step-bookmark on `to`")
err = ReleaseStep(ctx, fs, to, p.jobId)
if err != nil {
log(ctx).WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `to`")
} else {
log(ctx).Info("successfully released step-holds on or destroyed step-bookmark of `to`")
keep := func(a Abstraction) bool {
return AbstractionEquals(a, toReplicationCursor)
}
}()
go func() {
defer wg.Done()
ctx, endTask := trace.WithTask(ctx, "release-step-hold-from")
defer endTask()
if from == nil {
return
}
log(ctx).Debug("release step-hold of or step-bookmark on `from`")
err := ReleaseStep(ctx, fs, *from, p.jobId)
if err != nil {
if dne, ok := err.(*zfs.DatasetDoesNotExist); ok {
// If bookmark cloning is not supported, `from` might be the old replication cursor
// and thus have already been destroyed by MoveReplicationCursor above
// In that case, nonexistence of `from` is not an error, otherwise it is.
for _, c := range destroyedCursors {
if c.GetFullPath() == dne.Path {
log(ctx).Info("`from` was a replication cursor and has already been destroyed")
return
}
}
// fallthrough
}
log(ctx).WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `from`")
} else {
log(ctx).Info("successfully released step-holds on or destroyed step-bookmark of `from`")
}
}()
wg.Wait()
sendAbstractionsCacheSingleton.TryBatchDestroy(ctx, p.jobId, fs, keep, nil)
return &pdu.SendCompletedRes{}, nil
}
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
@ -974,16 +876,6 @@ func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapsho
return doDestroySnapshots(ctx, lp, req.Snapshots)
}
func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
// we don't move last-received-hold as part of this hint
// because that wouldn't give us any benefit wrt resumability.
//
// Other reason: the replication logic that issues this RPC would require refactoring
// to include the receiver's FilesystemVersion in the request)
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
func (p *Receiver) SendCompleted(ctx context.Context, _ *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()

View File

@ -0,0 +1,7 @@
package endpoint
import "github.com/prometheus/client_golang/prometheus"
func RegisterMetrics(r prometheus.Registerer) {
r.MustRegister(sendAbstractionsCacheMetrics.count)
}

View File

@ -0,0 +1,203 @@
package endpoint
import (
"context"
"fmt"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/util/chainlock"
)
var sendAbstractionsCacheMetrics struct {
count prometheus.Gauge
}
func init() {
sendAbstractionsCacheMetrics.count = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "endpoint",
Name: "send_abstractions_cache_entry_count",
Help: "number of send abstractions tracked in the sendAbstractionsCache data structure",
})
}
var sendAbstractionsCacheSingleton = newSendAbstractionsCache()
type sendAbstractionsCacheDidLoadFSState int
const (
sendAbstractionsCacheDidLoadFSStateNo sendAbstractionsCacheDidLoadFSState = iota // 0-value has meaning
sendAbstractionsCacheDidLoadFSStateInProgress
sendAbstractionsCacheDidLoadFSStateDone
)
type sendAbstractionsCache struct {
mtx chainlock.L
abstractions []Abstraction
didLoadFS map[string]sendAbstractionsCacheDidLoadFSState
didLoadFSChanged *sync.Cond
}
func newSendAbstractionsCache() *sendAbstractionsCache {
c := &sendAbstractionsCache{
didLoadFS: make(map[string]sendAbstractionsCacheDidLoadFSState),
}
c.didLoadFSChanged = c.mtx.NewCond()
return c
}
func (s *sendAbstractionsCache) Put(a Abstraction) {
defer s.mtx.Lock().Unlock()
var zeroJobId JobID
if a.GetJobID() == nil {
panic("abstraction must not have nil job id")
} else if *a.GetJobID() == zeroJobId {
panic(fmt.Sprintf("abstraction must not have zero-value job id: %s", a))
}
s.abstractions = append(s.abstractions, a)
sendAbstractionsCacheMetrics.count.Set(float64(len(s.abstractions)))
}
func (s *sendAbstractionsCache) InvalidateFSCache(fs string) {
// FIXME: O(n)
newAbs := make([]Abstraction, 0, len(s.abstractions))
for _, a := range s.abstractions {
if a.GetFS() != fs {
newAbs = append(newAbs, a)
}
}
s.abstractions = newAbs
sendAbstractionsCacheMetrics.count.Set(float64(len(s.abstractions)))
s.didLoadFS[fs] = sendAbstractionsCacheDidLoadFSStateNo
s.didLoadFSChanged.Broadcast()
}
// - logs errors in getting on-disk abstractions
// - only fetches on-disk abstractions once, but every time from the in-memory store
//
// That means that for precise results, all abstractions created by the endpoint must be .Put into this cache.
func (s *sendAbstractionsCache) GetAndDeleteByJobIDAndFS(ctx context.Context, jobID JobID, fs string, keep func(a Abstraction) bool) (ret []Abstraction) {
defer s.mtx.Lock().Unlock()
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
var zeroJobId JobID
if jobID == zeroJobId {
panic("must not pass zero-value job id")
}
if fs == "" {
panic("must not pass zero-value fs")
}
s.tryLoadOnDiskSendAbstractions(ctx, fs)
// FIXME O(n)
var remaining []Abstraction
for _, a := range s.abstractions {
aJobId := *a.GetJobID()
aFS := a.GetFS()
if aJobId == jobID && aFS == fs && !keep(a) {
ret = append(ret, a)
} else {
remaining = append(remaining, a)
}
}
s.abstractions = remaining
sendAbstractionsCacheMetrics.count.Set(float64(len(s.abstractions)))
return ret
}
// caller must hold s.mtx
func (s *sendAbstractionsCache) tryLoadOnDiskSendAbstractions(ctx context.Context, fs string) {
for s.didLoadFS[fs] != sendAbstractionsCacheDidLoadFSStateDone {
if s.didLoadFS[fs] == sendAbstractionsCacheDidLoadFSStateInProgress {
s.didLoadFSChanged.Wait()
continue
}
if s.didLoadFS[fs] != sendAbstractionsCacheDidLoadFSStateNo {
panic(fmt.Sprintf("unreachable: %v", s.didLoadFS[fs]))
}
s.didLoadFS[fs] = sendAbstractionsCacheDidLoadFSStateInProgress
defer s.didLoadFSChanged.Broadcast()
var onDiskAbs []Abstraction
var err error
s.mtx.DropWhile(func() {
onDiskAbs, err = s.tryLoadOnDiskSendAbstractionsImpl(ctx, fs) // no shadow
})
if err != nil {
s.didLoadFS[fs] = sendAbstractionsCacheDidLoadFSStateNo
getLogger(ctx).WithField("fs", fs).WithError(err).Error("cannot list send step abstractions for filesystem")
} else {
s.didLoadFS[fs] = sendAbstractionsCacheDidLoadFSStateDone
s.abstractions = append(s.abstractions, onDiskAbs...)
getLogger(ctx).WithField("fs", fs).WithField("abstractions", onDiskAbs).Debug("loaded step abstractions for filesystem")
}
return
}
}
// caller should _not hold s.mtx
func (s *sendAbstractionsCache) tryLoadOnDiskSendAbstractionsImpl(ctx context.Context, fs string) ([]Abstraction, error) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
q := ListZFSHoldsAndBookmarksQuery{
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
JobID: nil,
What: AbstractionTypeSet{
AbstractionStepHold: true,
AbstractionStepBookmark: true,
AbstractionReplicationCursorBookmarkV2: true,
},
Concurrency: 1,
}
abs, absErrs, err := ListAbstractions(ctx, q)
if err != nil {
return nil, err
}
// safe to ignore absErrs here, this is best-effort cleanup
if len(absErrs) > 0 {
return nil, ListAbstractionsErrors(absErrs)
}
return abs, nil
}
func (s *sendAbstractionsCache) TryBatchDestroy(ctx context.Context, jobId JobID, fs string, keep func(a Abstraction) bool, check func(willDestroy []Abstraction)) {
// no s.mtx, we only use the public interface in this function
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
obsoleteAbs := s.GetAndDeleteByJobIDAndFS(ctx, jobId, fs, keep)
if check != nil {
check(obsoleteAbs)
}
hadErr := false
for res := range BatchDestroy(ctx, obsoleteAbs) {
if res.DestroyErr != nil {
hadErr = true
getLogger(ctx).
WithField("abstraction", res.Abstraction).
WithError(res.DestroyErr).
Error("cannot destroy stale send step abstraction")
} else {
getLogger(ctx).
WithField("abstraction", res.Abstraction).
Info("destroyed stale send step abstraction")
}
}
if hadErr {
s.InvalidateFSCache(fs)
}
}

View File

@ -54,6 +54,30 @@ type Abstraction interface {
json.Marshaler
}
func AbstractionEquals(a, b Abstraction) bool {
if (a != nil) != (b != nil) {
return false
}
if a == nil && b == nil {
return true
}
var aJobId, bJobId JobID
if aJid := a.GetJobID(); aJid != nil {
aJobId = *aJid
}
if bJid := b.GetJobID(); bJid != nil {
bJobId = *bJid
}
return a.GetType() == b.GetType() &&
a.GetFS() == b.GetFS() &&
a.GetName() == b.GetName() &&
a.GetFullPath() == b.GetFullPath() &&
aJobId == bJobId &&
a.GetCreateTXG() == b.GetCreateTXG() &&
zfs.FilesystemVersionEqualIdentity(a.GetFilesystemVersion(), b.GetFilesystemVersion()) &&
a.String() == b.String()
}
func (t AbstractionType) Validate() error {
switch t {
case AbstractionStepBookmark:

View File

@ -118,31 +118,33 @@ func GetReplicationCursors(ctx context.Context, dp *zfs.DatasetPath, jobID JobID
return candidates, nil
}
type ReplicationCursorTarget interface {
IsSnapshot() bool
GetGuid() uint64
GetCreateTXG() uint64
ToSendArgVersion() zfs.ZFSSendArgVersion
}
// `target` is validated before replication cursor is set. if validation fails, the cursor is not moved.
// idempotently create a replication cursor targeting `target`
//
// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS
func MoveReplicationCursor(ctx context.Context, fs string, target ReplicationCursorTarget, jobID JobID) (destroyedCursors []Abstraction, err error) {
if !target.IsSnapshot() {
return nil, zfs.ErrBookmarkCloningNotSupported
}
func CreateReplicationCursor(ctx context.Context, fs string, target zfs.FilesystemVersion, jobID JobID) (a Abstraction, err error) {
bookmarkname, err := ReplicationCursorBookmarkName(fs, target.GetGuid(), jobID)
if err != nil {
return nil, errors.Wrap(err, "determine replication cursor name")
}
if target.IsBookmark() && target.GetName() == bookmarkname {
return &bookmarkBasedAbstraction{
Type: AbstractionReplicationCursorBookmarkV2,
FS: fs,
FilesystemVersion: target,
JobID: jobID,
}, nil
}
if !target.IsSnapshot() {
return nil, zfs.ErrBookmarkCloningNotSupported
}
// idempotently create bookmark (guid is encoded in it, hence we'll most likely add a new one
// cleanup the old one afterwards
err = zfs.ZFSBookmark(ctx, fs, target.ToSendArgVersion(), bookmarkname)
cursorBookmark, err := zfs.ZFSBookmark(ctx, fs, target, bookmarkname)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
return nil, err // TODO go1.13 use wrapping
@ -150,12 +152,12 @@ func MoveReplicationCursor(ctx context.Context, fs string, target ReplicationCur
return nil, errors.Wrapf(err, "cannot create bookmark")
}
destroyedCursors, err = DestroyObsoleteReplicationCursors(ctx, fs, target, jobID)
if err != nil {
return nil, errors.Wrap(err, "destroy obsolete replication cursors")
}
return destroyedCursors, nil
return &bookmarkBasedAbstraction{
Type: AbstractionReplicationCursorBookmarkV2,
FS: fs,
FilesystemVersion: cursorBookmark,
JobID: jobID,
}, nil
}
type ReplicationCursor interface {

View File

@ -93,7 +93,7 @@ func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID Job
return nil, errors.Wrap(err, "create step bookmark: determine bookmark name")
}
// idempotently create bookmark
err = zfs.ZFSBookmark(ctx, fs, v.ToSendArgVersion(), bmname)
stepBookmark, err := zfs.ZFSBookmark(ctx, fs, v, bmname)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
// TODO we could actually try to find a local snapshot that has the requested GUID
@ -108,7 +108,7 @@ func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID Job
return &bookmarkBasedAbstraction{
Type: AbstractionStepBookmark,
FS: fs,
FilesystemVersion: v,
FilesystemVersion: stepBookmark,
JobID: jobID,
}, nil
}
@ -236,7 +236,6 @@ func TryReleaseStepStaleFS(ctx context.Context, fs string, jobID JobID) {
Info("destroyed stale step-hold or bookmark")
}
}
}
var _ BookmarkExtractor = StepBookmarkExtractor

View File

@ -3,6 +3,7 @@ package tests
import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/zrepl/zrepl/platformtest"
"github.com/zrepl/zrepl/zfs"
)
@ -19,22 +20,23 @@ func IdempotentBookmark(ctx *platformtest.Context) {
fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset)
asnap := sendArgVersion(ctx, fs, "@a snap")
anotherSnap := sendArgVersion(ctx, fs, "@another snap")
asnap := fsversion(ctx, fs, "@a snap")
anotherSnap := fsversion(ctx, fs, "@another snap")
err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark")
aBookmark, err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark")
if err != nil {
panic(err)
}
// do it again, should be idempotent
err = zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark")
aBookmarkIdemp, err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark")
if err != nil {
panic(err)
}
assert.Equal(ctx, aBookmark, aBookmarkIdemp)
// should fail for another snapshot
err = zfs.ZFSBookmark(ctx, fs, anotherSnap, "a bookmark")
_, err = zfs.ZFSBookmark(ctx, fs, anotherSnap, "a bookmark")
if err == nil {
panic(err)
}
@ -48,7 +50,7 @@ func IdempotentBookmark(ctx *platformtest.Context) {
}
// do it again, should fail with special error type
err = zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark")
_, err = zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark")
if err == nil {
panic(err)
}

View File

@ -18,8 +18,8 @@ func IdempotentDestroy(ctx *platformtest.Context) {
`)
fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset)
asnap := sendArgVersion(ctx, fs, "@a snap")
err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark")
asnap := fsversion(ctx, fs, "@a snap")
_, err := zfs.ZFSBookmark(ctx, fs, asnap, "a bookmark")
if err != nil {
panic(err)
}

View File

@ -19,9 +19,9 @@ func ReplicationCursor(ctx *platformtest.Context) {
+ "foo bar@1 with space"
R zfs bookmark "${ROOTDS}/foo bar@1 with space" "${ROOTDS}/foo bar#1 with space"
+ "foo bar@2 with space"
R zfs bookmark "${ROOTDS}/foo bar@1 with space" "${ROOTDS}/foo bar#2 with space"
R zfs bookmark "${ROOTDS}/foo bar@2 with space" "${ROOTDS}/foo bar#2 with space"
+ "foo bar@3 with space"
R zfs bookmark "${ROOTDS}/foo bar@1 with space" "${ROOTDS}/foo bar#3 with space"
R zfs bookmark "${ROOTDS}/foo bar@3 with space" "${ROOTDS}/foo bar#3 with space"
`)
jobid := endpoint.MustMakeJobID("zreplplatformtest")
@ -32,9 +32,31 @@ func ReplicationCursor(ctx *platformtest.Context) {
}
fs := ds.ToString()
snap := fsversion(ctx, fs, "@1 with space")
destroyed, err := endpoint.MoveReplicationCursor(ctx, fs, &snap, jobid)
checkCreateCursor := func(createErr error, c endpoint.Abstraction, references zfs.FilesystemVersion) {
assert.NoError(ctx, createErr)
expectName, err := endpoint.ReplicationCursorBookmarkName(fs, references.Guid, jobid)
assert.NoError(ctx, err)
require.Equal(ctx, expectName, c.GetFilesystemVersion().Name)
}
snap := fsversion(ctx, fs, "@1 with space")
book := fsversion(ctx, fs, "#1 with space")
// create first cursor
cursorOfSnap, err := endpoint.CreateReplicationCursor(ctx, fs, snap, jobid)
checkCreateCursor(err, cursorOfSnap, snap)
// check CreateReplicationCursor is idempotent (for snapshot target)
cursorOfSnapIdemp, err := endpoint.CreateReplicationCursor(ctx, fs, snap, jobid)
checkCreateCursor(err, cursorOfSnap, snap)
// ... for target = non-cursor bookmark
_, err = endpoint.CreateReplicationCursor(ctx, fs, book, jobid)
assert.Equal(ctx, zfs.ErrBookmarkCloningNotSupported, err)
// ... for target = replication cursor bookmark to be created
cursorOfCursor, err := endpoint.CreateReplicationCursor(ctx, fs, cursorOfSnapIdemp.GetFilesystemVersion(), jobid)
checkCreateCursor(err, cursorOfCursor, cursorOfCursor.GetFilesystemVersion())
destroyed, err := endpoint.DestroyObsoleteReplicationCursors(ctx, fs, &snap, jobid)
if err != nil {
panic(err)
}
@ -61,7 +83,11 @@ func ReplicationCursor(ctx *platformtest.Context) {
require.NoError(ctx, err)
snap2 := fsversion(ctx, fs, "@2 with space")
destroyed, err = endpoint.MoveReplicationCursor(ctx, fs, &snap2, jobid)
_, err = endpoint.CreateReplicationCursor(ctx, fs, snap, jobid)
assert.NoError(ctx, err)
destroyed, err = endpoint.DestroyObsoleteReplicationCursors(ctx, fs, &snap2, jobid)
require.NoError(ctx, err)
require.Equal(ctx, 1, len(destroyed))
require.Equal(ctx, endpoint.AbstractionReplicationCursorBookmarkV2, destroyed[0].GetType())

View File

@ -46,7 +46,7 @@ func (x Tri) String() string {
return proto.EnumName(Tri_name, int32(x))
}
func (Tri) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{0}
return fileDescriptor_pdu_483c6918b7b3d747, []int{0}
}
type FilesystemVersion_VersionType int32
@ -69,7 +69,7 @@ func (x FilesystemVersion_VersionType) String() string {
return proto.EnumName(FilesystemVersion_VersionType_name, int32(x))
}
func (FilesystemVersion_VersionType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{5, 0}
return fileDescriptor_pdu_483c6918b7b3d747, []int{5, 0}
}
type ListFilesystemReq struct {
@ -82,7 +82,7 @@ func (m *ListFilesystemReq) Reset() { *m = ListFilesystemReq{} }
func (m *ListFilesystemReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemReq) ProtoMessage() {}
func (*ListFilesystemReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{0}
return fileDescriptor_pdu_483c6918b7b3d747, []int{0}
}
func (m *ListFilesystemReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemReq.Unmarshal(m, b)
@ -113,7 +113,7 @@ func (m *ListFilesystemRes) Reset() { *m = ListFilesystemRes{} }
func (m *ListFilesystemRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemRes) ProtoMessage() {}
func (*ListFilesystemRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{1}
return fileDescriptor_pdu_483c6918b7b3d747, []int{1}
}
func (m *ListFilesystemRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemRes.Unmarshal(m, b)
@ -154,7 +154,7 @@ func (m *Filesystem) Reset() { *m = Filesystem{} }
func (m *Filesystem) String() string { return proto.CompactTextString(m) }
func (*Filesystem) ProtoMessage() {}
func (*Filesystem) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{2}
return fileDescriptor_pdu_483c6918b7b3d747, []int{2}
}
func (m *Filesystem) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Filesystem.Unmarshal(m, b)
@ -213,7 +213,7 @@ func (m *ListFilesystemVersionsReq) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsReq) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsReq) ProtoMessage() {}
func (*ListFilesystemVersionsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{3}
return fileDescriptor_pdu_483c6918b7b3d747, []int{3}
}
func (m *ListFilesystemVersionsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsReq.Unmarshal(m, b)
@ -251,7 +251,7 @@ func (m *ListFilesystemVersionsRes) Reset() { *m = ListFilesystemVersion
func (m *ListFilesystemVersionsRes) String() string { return proto.CompactTextString(m) }
func (*ListFilesystemVersionsRes) ProtoMessage() {}
func (*ListFilesystemVersionsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{4}
return fileDescriptor_pdu_483c6918b7b3d747, []int{4}
}
func (m *ListFilesystemVersionsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListFilesystemVersionsRes.Unmarshal(m, b)
@ -293,7 +293,7 @@ func (m *FilesystemVersion) Reset() { *m = FilesystemVersion{} }
func (m *FilesystemVersion) String() string { return proto.CompactTextString(m) }
func (*FilesystemVersion) ProtoMessage() {}
func (*FilesystemVersion) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{5}
return fileDescriptor_pdu_483c6918b7b3d747, []int{5}
}
func (m *FilesystemVersion) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FilesystemVersion.Unmarshal(m, b)
@ -371,7 +371,7 @@ func (m *SendReq) Reset() { *m = SendReq{} }
func (m *SendReq) String() string { return proto.CompactTextString(m) }
func (*SendReq) ProtoMessage() {}
func (*SendReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{6}
return fileDescriptor_pdu_483c6918b7b3d747, []int{6}
}
func (m *SendReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendReq.Unmarshal(m, b)
@ -445,7 +445,7 @@ func (m *Property) Reset() { *m = Property{} }
func (m *Property) String() string { return proto.CompactTextString(m) }
func (*Property) ProtoMessage() {}
func (*Property) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{7}
return fileDescriptor_pdu_483c6918b7b3d747, []int{7}
}
func (m *Property) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Property.Unmarshal(m, b)
@ -496,7 +496,7 @@ func (m *SendRes) Reset() { *m = SendRes{} }
func (m *SendRes) String() string { return proto.CompactTextString(m) }
func (*SendRes) ProtoMessage() {}
func (*SendRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{8}
return fileDescriptor_pdu_483c6918b7b3d747, []int{8}
}
func (m *SendRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendRes.Unmarshal(m, b)
@ -548,7 +548,7 @@ func (m *SendCompletedReq) Reset() { *m = SendCompletedReq{} }
func (m *SendCompletedReq) String() string { return proto.CompactTextString(m) }
func (*SendCompletedReq) ProtoMessage() {}
func (*SendCompletedReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{9}
return fileDescriptor_pdu_483c6918b7b3d747, []int{9}
}
func (m *SendCompletedReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendCompletedReq.Unmarshal(m, b)
@ -585,7 +585,7 @@ func (m *SendCompletedRes) Reset() { *m = SendCompletedRes{} }
func (m *SendCompletedRes) String() string { return proto.CompactTextString(m) }
func (*SendCompletedRes) ProtoMessage() {}
func (*SendCompletedRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{10}
return fileDescriptor_pdu_483c6918b7b3d747, []int{10}
}
func (m *SendCompletedRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SendCompletedRes.Unmarshal(m, b)
@ -620,7 +620,7 @@ func (m *ReceiveReq) Reset() { *m = ReceiveReq{} }
func (m *ReceiveReq) String() string { return proto.CompactTextString(m) }
func (*ReceiveReq) ProtoMessage() {}
func (*ReceiveReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{11}
return fileDescriptor_pdu_483c6918b7b3d747, []int{11}
}
func (m *ReceiveReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveReq.Unmarshal(m, b)
@ -671,7 +671,7 @@ func (m *ReceiveRes) Reset() { *m = ReceiveRes{} }
func (m *ReceiveRes) String() string { return proto.CompactTextString(m) }
func (*ReceiveRes) ProtoMessage() {}
func (*ReceiveRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{12}
return fileDescriptor_pdu_483c6918b7b3d747, []int{12}
}
func (m *ReceiveRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReceiveRes.Unmarshal(m, b)
@ -704,7 +704,7 @@ func (m *DestroySnapshotsReq) Reset() { *m = DestroySnapshotsReq{} }
func (m *DestroySnapshotsReq) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsReq) ProtoMessage() {}
func (*DestroySnapshotsReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{13}
return fileDescriptor_pdu_483c6918b7b3d747, []int{13}
}
func (m *DestroySnapshotsReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsReq.Unmarshal(m, b)
@ -750,7 +750,7 @@ func (m *DestroySnapshotRes) Reset() { *m = DestroySnapshotRes{} }
func (m *DestroySnapshotRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotRes) ProtoMessage() {}
func (*DestroySnapshotRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{14}
return fileDescriptor_pdu_483c6918b7b3d747, []int{14}
}
func (m *DestroySnapshotRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotRes.Unmarshal(m, b)
@ -795,7 +795,7 @@ func (m *DestroySnapshotsRes) Reset() { *m = DestroySnapshotsRes{} }
func (m *DestroySnapshotsRes) String() string { return proto.CompactTextString(m) }
func (*DestroySnapshotsRes) ProtoMessage() {}
func (*DestroySnapshotsRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{15}
return fileDescriptor_pdu_483c6918b7b3d747, []int{15}
}
func (m *DestroySnapshotsRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DestroySnapshotsRes.Unmarshal(m, b)
@ -833,7 +833,7 @@ func (m *ReplicationCursorReq) Reset() { *m = ReplicationCursorReq{} }
func (m *ReplicationCursorReq) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorReq) ProtoMessage() {}
func (*ReplicationCursorReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{16}
return fileDescriptor_pdu_483c6918b7b3d747, []int{16}
}
func (m *ReplicationCursorReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorReq.Unmarshal(m, b)
@ -874,7 +874,7 @@ func (m *ReplicationCursorRes) Reset() { *m = ReplicationCursorRes{} }
func (m *ReplicationCursorRes) String() string { return proto.CompactTextString(m) }
func (*ReplicationCursorRes) ProtoMessage() {}
func (*ReplicationCursorRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{17}
return fileDescriptor_pdu_483c6918b7b3d747, []int{17}
}
func (m *ReplicationCursorRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReplicationCursorRes.Unmarshal(m, b)
@ -1010,7 +1010,7 @@ func (m *PingReq) Reset() { *m = PingReq{} }
func (m *PingReq) String() string { return proto.CompactTextString(m) }
func (*PingReq) ProtoMessage() {}
func (*PingReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{18}
return fileDescriptor_pdu_483c6918b7b3d747, []int{18}
}
func (m *PingReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PingReq.Unmarshal(m, b)
@ -1049,7 +1049,7 @@ func (m *PingRes) Reset() { *m = PingRes{} }
func (m *PingRes) String() string { return proto.CompactTextString(m) }
func (*PingRes) ProtoMessage() {}
func (*PingRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{19}
return fileDescriptor_pdu_483c6918b7b3d747, []int{19}
}
func (m *PingRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PingRes.Unmarshal(m, b)
@ -1076,91 +1076,6 @@ func (m *PingRes) GetEcho() string {
return ""
}
type HintMostRecentCommonAncestorReq struct {
Filesystem string `protobuf:"bytes,1,opt,name=Filesystem,proto3" json:"Filesystem,omitempty"`
// A copy of the FilesystemVersion on the sending side that the replication
// algorithm identified as a shared most recent common version between sending
// and receiving side.
//
// If nil, this is an indication that the replication algorithm could not
// find a common ancestor between the two sides.
// NOTE: nilness does not mean that replication never happened - there could
// as well be a replication conflict. thus, dont' jump to conclusions too
// rapidly here.
SenderVersion *FilesystemVersion `protobuf:"bytes,2,opt,name=SenderVersion,proto3" json:"SenderVersion,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *HintMostRecentCommonAncestorReq) Reset() { *m = HintMostRecentCommonAncestorReq{} }
func (m *HintMostRecentCommonAncestorReq) String() string { return proto.CompactTextString(m) }
func (*HintMostRecentCommonAncestorReq) ProtoMessage() {}
func (*HintMostRecentCommonAncestorReq) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{20}
}
func (m *HintMostRecentCommonAncestorReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HintMostRecentCommonAncestorReq.Unmarshal(m, b)
}
func (m *HintMostRecentCommonAncestorReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HintMostRecentCommonAncestorReq.Marshal(b, m, deterministic)
}
func (dst *HintMostRecentCommonAncestorReq) XXX_Merge(src proto.Message) {
xxx_messageInfo_HintMostRecentCommonAncestorReq.Merge(dst, src)
}
func (m *HintMostRecentCommonAncestorReq) XXX_Size() int {
return xxx_messageInfo_HintMostRecentCommonAncestorReq.Size(m)
}
func (m *HintMostRecentCommonAncestorReq) XXX_DiscardUnknown() {
xxx_messageInfo_HintMostRecentCommonAncestorReq.DiscardUnknown(m)
}
var xxx_messageInfo_HintMostRecentCommonAncestorReq proto.InternalMessageInfo
func (m *HintMostRecentCommonAncestorReq) GetFilesystem() string {
if m != nil {
return m.Filesystem
}
return ""
}
func (m *HintMostRecentCommonAncestorReq) GetSenderVersion() *FilesystemVersion {
if m != nil {
return m.SenderVersion
}
return nil
}
type HintMostRecentCommonAncestorRes struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *HintMostRecentCommonAncestorRes) Reset() { *m = HintMostRecentCommonAncestorRes{} }
func (m *HintMostRecentCommonAncestorRes) String() string { return proto.CompactTextString(m) }
func (*HintMostRecentCommonAncestorRes) ProtoMessage() {}
func (*HintMostRecentCommonAncestorRes) Descriptor() ([]byte, []int) {
return fileDescriptor_pdu_e59763dc61674a79, []int{21}
}
func (m *HintMostRecentCommonAncestorRes) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HintMostRecentCommonAncestorRes.Unmarshal(m, b)
}
func (m *HintMostRecentCommonAncestorRes) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HintMostRecentCommonAncestorRes.Marshal(b, m, deterministic)
}
func (dst *HintMostRecentCommonAncestorRes) XXX_Merge(src proto.Message) {
xxx_messageInfo_HintMostRecentCommonAncestorRes.Merge(dst, src)
}
func (m *HintMostRecentCommonAncestorRes) XXX_Size() int {
return xxx_messageInfo_HintMostRecentCommonAncestorRes.Size(m)
}
func (m *HintMostRecentCommonAncestorRes) XXX_DiscardUnknown() {
xxx_messageInfo_HintMostRecentCommonAncestorRes.DiscardUnknown(m)
}
var xxx_messageInfo_HintMostRecentCommonAncestorRes proto.InternalMessageInfo
func init() {
proto.RegisterType((*ListFilesystemReq)(nil), "ListFilesystemReq")
proto.RegisterType((*ListFilesystemRes)(nil), "ListFilesystemRes")
@ -1182,8 +1097,6 @@ func init() {
proto.RegisterType((*ReplicationCursorRes)(nil), "ReplicationCursorRes")
proto.RegisterType((*PingReq)(nil), "PingReq")
proto.RegisterType((*PingRes)(nil), "PingRes")
proto.RegisterType((*HintMostRecentCommonAncestorReq)(nil), "HintMostRecentCommonAncestorReq")
proto.RegisterType((*HintMostRecentCommonAncestorRes)(nil), "HintMostRecentCommonAncestorRes")
proto.RegisterEnum("Tri", Tri_name, Tri_value)
proto.RegisterEnum("FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value)
}
@ -1206,7 +1119,6 @@ type ReplicationClient interface {
DestroySnapshots(ctx context.Context, in *DestroySnapshotsReq, opts ...grpc.CallOption) (*DestroySnapshotsRes, error)
ReplicationCursor(ctx context.Context, in *ReplicationCursorReq, opts ...grpc.CallOption) (*ReplicationCursorRes, error)
SendCompleted(ctx context.Context, in *SendCompletedReq, opts ...grpc.CallOption) (*SendCompletedRes, error)
HintMostRecentCommonAncestor(ctx context.Context, in *HintMostRecentCommonAncestorReq, opts ...grpc.CallOption) (*HintMostRecentCommonAncestorRes, error)
}
type replicationClient struct {
@ -1271,15 +1183,6 @@ func (c *replicationClient) SendCompleted(ctx context.Context, in *SendCompleted
return out, nil
}
func (c *replicationClient) HintMostRecentCommonAncestor(ctx context.Context, in *HintMostRecentCommonAncestorReq, opts ...grpc.CallOption) (*HintMostRecentCommonAncestorRes, error) {
out := new(HintMostRecentCommonAncestorRes)
err := c.cc.Invoke(ctx, "/Replication/HintMostRecentCommonAncestor", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ReplicationServer is the server API for Replication service.
type ReplicationServer interface {
Ping(context.Context, *PingReq) (*PingRes, error)
@ -1288,7 +1191,6 @@ type ReplicationServer interface {
DestroySnapshots(context.Context, *DestroySnapshotsReq) (*DestroySnapshotsRes, error)
ReplicationCursor(context.Context, *ReplicationCursorReq) (*ReplicationCursorRes, error)
SendCompleted(context.Context, *SendCompletedReq) (*SendCompletedRes, error)
HintMostRecentCommonAncestor(context.Context, *HintMostRecentCommonAncestorReq) (*HintMostRecentCommonAncestorRes, error)
}
func RegisterReplicationServer(s *grpc.Server, srv ReplicationServer) {
@ -1403,24 +1305,6 @@ func _Replication_SendCompleted_Handler(srv interface{}, ctx context.Context, de
return interceptor(ctx, in, info, handler)
}
func _Replication_HintMostRecentCommonAncestor_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HintMostRecentCommonAncestorReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ReplicationServer).HintMostRecentCommonAncestor(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Replication/HintMostRecentCommonAncestor",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ReplicationServer).HintMostRecentCommonAncestor(ctx, req.(*HintMostRecentCommonAncestorReq))
}
return interceptor(ctx, in, info, handler)
}
var _Replication_serviceDesc = grpc.ServiceDesc{
ServiceName: "Replication",
HandlerType: (*ReplicationServer)(nil),
@ -1449,73 +1333,66 @@ var _Replication_serviceDesc = grpc.ServiceDesc{
MethodName: "SendCompleted",
Handler: _Replication_SendCompleted_Handler,
},
{
MethodName: "HintMostRecentCommonAncestor",
Handler: _Replication_HintMostRecentCommonAncestor_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pdu.proto",
}
func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_e59763dc61674a79) }
func init() { proto.RegisterFile("pdu.proto", fileDescriptor_pdu_483c6918b7b3d747) }
var fileDescriptor_pdu_e59763dc61674a79 = []byte{
// 892 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0xdf, 0x6f, 0xdb, 0x36,
0x10, 0x8e, 0x6c, 0x39, 0x91, 0xcf, 0xe9, 0xea, 0x5c, 0xb2, 0x42, 0x13, 0xba, 0xce, 0xe3, 0x86,
0xc1, 0x0d, 0x30, 0x61, 0xc8, 0x7e, 0x60, 0xc3, 0x80, 0x02, 0x8d, 0x93, 0x34, 0xc5, 0xd6, 0xce,
0x60, 0xbc, 0x62, 0xe8, 0x9b, 0x6a, 0x1f, 0x12, 0x21, 0xb2, 0xa8, 0x90, 0xf4, 0x50, 0x6f, 0x7b,
0xda, 0xe3, 0xfe, 0xbd, 0xe5, 0x0f, 0x2a, 0x44, 0x4b, 0xb6, 0x6c, 0xc9, 0x89, 0x9f, 0xcc, 0xfb,
0x78, 0x14, 0xef, 0xbe, 0xfb, 0xee, 0x68, 0x68, 0x26, 0xa3, 0x89, 0x9f, 0x48, 0xa1, 0x05, 0xdb,
0x87, 0xbd, 0x5f, 0x43, 0xa5, 0xcf, 0xc2, 0x88, 0xd4, 0x54, 0x69, 0x1a, 0x73, 0xba, 0x61, 0xc7,
0x65, 0x50, 0xe1, 0xd7, 0xd0, 0x5a, 0x00, 0xca, 0xb5, 0x3a, 0xf5, 0x6e, 0xeb, 0xa8, 0xe5, 0x17,
0x9c, 0x8a, 0xfb, 0xec, 0x3f, 0x0b, 0x60, 0x61, 0x23, 0x82, 0xdd, 0x0f, 0xf4, 0x95, 0x6b, 0x75,
0xac, 0x6e, 0x93, 0x9b, 0x35, 0x76, 0xa0, 0xc5, 0x49, 0x4d, 0xc6, 0x34, 0x10, 0xd7, 0x14, 0xbb,
0x35, 0xb3, 0x55, 0x84, 0xf0, 0x4b, 0x78, 0xf0, 0x52, 0xf5, 0xa3, 0x60, 0x48, 0x57, 0x22, 0x1a,
0x91, 0x74, 0xeb, 0x1d, 0xab, 0xeb, 0xf0, 0x65, 0x30, 0xfd, 0xce, 0x4b, 0x75, 0x1a, 0x0f, 0xe5,
0x34, 0xd1, 0x34, 0x72, 0x6d, 0xe3, 0x53, 0x84, 0xd8, 0xcf, 0xf0, 0xc9, 0x72, 0x42, 0x6f, 0x48,
0xaa, 0x50, 0xc4, 0x8a, 0xd3, 0x0d, 0x3e, 0x29, 0x06, 0x9a, 0x05, 0x58, 0x40, 0xd8, 0x2f, 0xeb,
0x0f, 0x2b, 0xf4, 0xc1, 0xc9, 0xcd, 0x8c, 0x12, 0xf4, 0x4b, 0x9e, 0x7c, 0xee, 0xc3, 0x6e, 0x2d,
0xd8, 0x2b, 0xed, 0xe3, 0x11, 0xd8, 0x83, 0x69, 0x42, 0xe6, 0xf2, 0x8f, 0x8e, 0x9e, 0x94, 0xbf,
0xe0, 0x67, 0xbf, 0xa9, 0x17, 0x37, 0xbe, 0x29, 0xa3, 0xaf, 0x83, 0x31, 0x65, 0xb4, 0x99, 0x75,
0x8a, 0xbd, 0x98, 0x84, 0x23, 0x43, 0x93, 0xcd, 0xcd, 0x1a, 0x1f, 0x43, 0xb3, 0x27, 0x29, 0xd0,
0x34, 0xf8, 0xe3, 0x85, 0xe1, 0xc6, 0xe6, 0x0b, 0x00, 0x3d, 0x70, 0x8c, 0x11, 0x8a, 0xd8, 0x6d,
0x98, 0x2f, 0xcd, 0x6d, 0xf6, 0x14, 0x5a, 0x85, 0x6b, 0x71, 0x17, 0x9c, 0x8b, 0x38, 0x48, 0xd4,
0x95, 0xd0, 0xed, 0xad, 0xd4, 0x3a, 0x16, 0xe2, 0x7a, 0x1c, 0xc8, 0xeb, 0xb6, 0xc5, 0xfe, 0xb7,
0x60, 0xe7, 0x82, 0xe2, 0xd1, 0x06, 0x7c, 0xe2, 0x57, 0x60, 0x9f, 0x49, 0x31, 0x36, 0x81, 0x57,
0xd3, 0x65, 0xf6, 0x91, 0x41, 0x6d, 0x20, 0x4c, 0x2a, 0xd5, 0x5e, 0xb5, 0x81, 0x58, 0x95, 0x90,
0x5d, 0x96, 0x10, 0x83, 0xe6, 0x42, 0x1a, 0x0d, 0xc3, 0xaf, 0xed, 0x0f, 0x64, 0xc8, 0x17, 0x30,
0x3e, 0x82, 0xed, 0x13, 0x39, 0xe5, 0x93, 0xd8, 0xdd, 0x36, 0xda, 0xc9, 0x2c, 0xf6, 0x1d, 0x38,
0x7d, 0x29, 0x12, 0x92, 0x7a, 0x3a, 0xa7, 0xdb, 0x2a, 0xd0, 0x7d, 0x00, 0x8d, 0x37, 0x41, 0x34,
0xc9, 0x6b, 0x30, 0x33, 0xd8, 0xbf, 0x73, 0x2e, 0x14, 0x76, 0xe1, 0xe1, 0xef, 0x8a, 0x46, 0xab,
0x32, 0x77, 0xf8, 0x2a, 0x8c, 0x0c, 0x76, 0x4f, 0xdf, 0x27, 0x34, 0xd4, 0x34, 0xba, 0x08, 0xff,
0x22, 0x93, 0x77, 0x9d, 0x2f, 0x61, 0xf8, 0x14, 0x20, 0x8b, 0x27, 0x24, 0xe5, 0xda, 0x46, 0x6e,
0x4d, 0x3f, 0x0f, 0x91, 0x17, 0x36, 0xd9, 0x33, 0x68, 0xa7, 0x31, 0xf4, 0xc4, 0x38, 0x89, 0x48,
0x93, 0x29, 0xcc, 0x21, 0xb4, 0x7e, 0x93, 0xe1, 0x65, 0x18, 0x07, 0x11, 0xa7, 0x9b, 0x8c, 0x7f,
0xc7, 0xcf, 0xea, 0xc6, 0x8b, 0x9b, 0x0c, 0x4b, 0xe7, 0x15, 0xfb, 0x07, 0x80, 0xd3, 0x90, 0xc2,
0x3f, 0x69, 0x93, 0x32, 0xcf, 0xca, 0x57, 0xbb, 0xb3, 0x7c, 0x87, 0xd0, 0xee, 0x45, 0x14, 0xc8,
0x22, 0x3f, 0xb3, 0x16, 0x2f, 0xe1, 0x6c, 0xb7, 0x70, 0xbb, 0x62, 0x97, 0xb0, 0x7f, 0x42, 0x4a,
0x4b, 0x31, 0xcd, 0x35, 0xb9, 0x49, 0x2f, 0xe3, 0x37, 0xd0, 0x9c, 0xfb, 0xbb, 0xb5, 0xb5, 0xfd,
0xba, 0x70, 0x62, 0x6f, 0x01, 0x57, 0x2e, 0xca, 0xda, 0x3e, 0x37, 0xcd, 0x2d, 0x6b, 0xda, 0x3e,
0xf7, 0x49, 0x95, 0x72, 0x2a, 0xa5, 0x90, 0xb9, 0x52, 0x8c, 0xc1, 0x4e, 0xaa, 0x92, 0x48, 0x27,
0xed, 0x4e, 0x9a, 0x78, 0xa4, 0xf3, 0x91, 0xb2, 0xef, 0x97, 0x43, 0xe0, 0xb9, 0x0f, 0xfb, 0x01,
0x0e, 0x38, 0x25, 0x51, 0x38, 0x34, 0x5d, 0xdb, 0x9b, 0x48, 0x25, 0xe4, 0x26, 0x73, 0x6d, 0x50,
0x79, 0x4e, 0xe1, 0x41, 0x36, 0x44, 0xd2, 0x13, 0xf6, 0xf9, 0xd6, 0x7c, 0x8c, 0x38, 0xaf, 0x85,
0xa6, 0xf7, 0xa1, 0xd2, 0x33, 0x09, 0x9f, 0x6f, 0xf1, 0x39, 0x72, 0xec, 0xc0, 0xf6, 0x2c, 0x1c,
0xf6, 0x05, 0xec, 0xf4, 0xc3, 0xf8, 0x32, 0x0d, 0xc0, 0x85, 0x9d, 0x57, 0xa4, 0x54, 0x70, 0x99,
0x77, 0x4d, 0x6e, 0xb2, 0x4f, 0x73, 0x27, 0x95, 0xf6, 0xd5, 0xe9, 0xf0, 0x4a, 0xe4, 0x7d, 0x95,
0xae, 0xd9, 0xdf, 0xf0, 0xd9, 0x79, 0x18, 0xeb, 0x57, 0x42, 0xe9, 0xb4, 0xe4, 0xb1, 0xee, 0x89,
0xf1, 0x58, 0xc4, 0xcf, 0xe3, 0x21, 0x29, 0xbd, 0x51, 0x72, 0xf8, 0x23, 0x3c, 0x48, 0xf5, 0x4b,
0x32, 0xab, 0xc5, 0x1d, 0x42, 0x5c, 0x76, 0x64, 0x9f, 0xdf, 0x77, 0xb9, 0x3a, 0xec, 0x42, 0x7d,
0x20, 0xc3, 0x74, 0x04, 0x9e, 0x88, 0x58, 0xf7, 0x02, 0x49, 0xed, 0x2d, 0x6c, 0x42, 0xe3, 0x2c,
0x88, 0x14, 0xb5, 0x2d, 0x74, 0xc0, 0x1e, 0xc8, 0x09, 0xb5, 0x6b, 0x47, 0xb7, 0xf5, 0x74, 0x40,
0xcd, 0x49, 0x46, 0x0f, 0xec, 0x34, 0x71, 0x74, 0xfc, 0x8c, 0x24, 0x2f, 0x5f, 0x29, 0xfc, 0x09,
0x1e, 0x2e, 0xbf, 0x33, 0x0a, 0xd1, 0x2f, 0x3d, 0xce, 0x5e, 0x19, 0x53, 0xd8, 0x87, 0x47, 0xd5,
0x4f, 0x14, 0x7a, 0xfe, 0xda, 0x87, 0xcf, 0x5b, 0xbf, 0xa7, 0xf0, 0x19, 0xb4, 0x57, 0xa5, 0x89,
0x07, 0x7e, 0x45, 0xcb, 0x79, 0x55, 0xa8, 0xc2, 0xe7, 0xb0, 0x57, 0x12, 0x17, 0x7e, 0xec, 0x57,
0x09, 0xd5, 0xab, 0x84, 0x15, 0x7e, 0x3f, 0x2b, 0xe1, 0x7c, 0x04, 0xe1, 0x9e, 0xbf, 0x3a, 0xd2,
0xbc, 0x12, 0xa4, 0xf0, 0x1d, 0x3c, 0xbe, 0xab, 0x7e, 0xd8, 0xf1, 0xef, 0xd1, 0x96, 0x77, 0x9f,
0x87, 0x3a, 0x6e, 0xbc, 0xad, 0x27, 0xa3, 0xc9, 0xbb, 0x6d, 0xf3, 0x1f, 0xea, 0xdb, 0x0f, 0x01,
0x00, 0x00, 0xff, 0xff, 0xad, 0x4e, 0x98, 0x29, 0x50, 0x09, 0x00, 0x00,
var fileDescriptor_pdu_483c6918b7b3d747 = []byte{
// 833 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x56, 0x5f, 0x6f, 0xe3, 0x44,
0x10, 0xaf, 0x13, 0xa7, 0x75, 0x26, 0x3d, 0x2e, 0x9d, 0x96, 0x93, 0xb1, 0xe0, 0x54, 0x2d, 0x08,
0xe5, 0x2a, 0x61, 0xa1, 0xf2, 0x47, 0x42, 0x48, 0x27, 0xd1, 0xb4, 0xbd, 0x3b, 0x01, 0x47, 0xb4,
0x35, 0x27, 0x74, 0x6f, 0x26, 0x19, 0xb5, 0x56, 0x1d, 0xaf, 0xbb, 0xe3, 0xa0, 0x0b, 0xe2, 0x89,
0x47, 0xbe, 0x1e, 0x7c, 0x10, 0x3e, 0x02, 0xf2, 0xc6, 0x4e, 0x9c, 0xd8, 0x41, 0x79, 0xca, 0xce,
0x6f, 0x66, 0x77, 0x67, 0x7f, 0xf3, 0x9b, 0x71, 0xa0, 0x9b, 0x4e, 0x66, 0x7e, 0xaa, 0x55, 0xa6,
0xc4, 0x31, 0x1c, 0xfd, 0x10, 0x71, 0x76, 0x1d, 0xc5, 0xc4, 0x73, 0xce, 0x68, 0x2a, 0xe9, 0x41,
0x5c, 0xd4, 0x41, 0xc6, 0xcf, 0xa0, 0xb7, 0x02, 0xd8, 0xb5, 0x4e, 0xdb, 0x83, 0xde, 0x79, 0xcf,
0xaf, 0x04, 0x55, 0xfd, 0xe2, 0x2f, 0x0b, 0x60, 0x65, 0x23, 0x82, 0x3d, 0x0a, 0xb3, 0x3b, 0xd7,
0x3a, 0xb5, 0x06, 0x5d, 0x69, 0xd6, 0x78, 0x0a, 0x3d, 0x49, 0x3c, 0x9b, 0x52, 0xa0, 0xee, 0x29,
0x71, 0x5b, 0xc6, 0x55, 0x85, 0xf0, 0x13, 0x78, 0xf4, 0x8a, 0x47, 0x71, 0x38, 0xa6, 0x3b, 0x15,
0x4f, 0x48, 0xbb, 0xed, 0x53, 0x6b, 0xe0, 0xc8, 0x75, 0x30, 0x3f, 0xe7, 0x15, 0x5f, 0x25, 0x63,
0x3d, 0x4f, 0x33, 0x9a, 0xb8, 0xb6, 0x89, 0xa9, 0x42, 0xe2, 0x5b, 0xf8, 0x60, 0xfd, 0x41, 0x6f,
0x48, 0x73, 0xa4, 0x12, 0x96, 0xf4, 0x80, 0x4f, 0xab, 0x89, 0x16, 0x09, 0x56, 0x10, 0xf1, 0xfd,
0xf6, 0xcd, 0x8c, 0x3e, 0x38, 0xa5, 0x59, 0x50, 0x82, 0x7e, 0x2d, 0x52, 0x2e, 0x63, 0xc4, 0x3f,
0x16, 0x1c, 0xd5, 0xfc, 0x78, 0x0e, 0x76, 0x30, 0x4f, 0xc9, 0x5c, 0xfe, 0xde, 0xf9, 0xd3, 0xfa,
0x09, 0x7e, 0xf1, 0x9b, 0x47, 0x49, 0x13, 0x9b, 0x33, 0xfa, 0x3a, 0x9c, 0x52, 0x41, 0x9b, 0x59,
0xe7, 0xd8, 0x8b, 0x59, 0x34, 0x31, 0x34, 0xd9, 0xd2, 0xac, 0xf1, 0x43, 0xe8, 0x0e, 0x35, 0x85,
0x19, 0x05, 0xbf, 0xbc, 0x30, 0xdc, 0xd8, 0x72, 0x05, 0xa0, 0x07, 0x8e, 0x31, 0x22, 0x95, 0xb8,
0x1d, 0x73, 0xd2, 0xd2, 0x16, 0xcf, 0xa0, 0x57, 0xb9, 0x16, 0x0f, 0xc1, 0xb9, 0x49, 0xc2, 0x94,
0xef, 0x54, 0xd6, 0xdf, 0xcb, 0xad, 0x0b, 0xa5, 0xee, 0xa7, 0xa1, 0xbe, 0xef, 0x5b, 0xe2, 0x6f,
0x0b, 0x0e, 0x6e, 0x28, 0x99, 0xec, 0xc0, 0x27, 0x7e, 0x0a, 0xf6, 0xb5, 0x56, 0x53, 0x93, 0x78,
0x33, 0x5d, 0xc6, 0x8f, 0x02, 0x5a, 0x81, 0x32, 0x4f, 0x69, 0x8e, 0x6a, 0x05, 0x6a, 0x53, 0x42,
0x76, 0x5d, 0x42, 0x02, 0xba, 0x2b, 0x69, 0x74, 0x0c, 0xbf, 0xb6, 0x1f, 0xe8, 0x48, 0xae, 0x60,
0x7c, 0x02, 0xfb, 0x97, 0x7a, 0x2e, 0x67, 0x89, 0xbb, 0x6f, 0xb4, 0x53, 0x58, 0xe2, 0x4b, 0x70,
0x46, 0x5a, 0xa5, 0xa4, 0xb3, 0xf9, 0x92, 0x6e, 0xab, 0x42, 0xf7, 0x09, 0x74, 0xde, 0x84, 0xf1,
0xac, 0xac, 0xc1, 0xc2, 0x10, 0x7f, 0x2e, 0xb9, 0x60, 0x1c, 0xc0, 0xe3, 0x9f, 0x99, 0x26, 0x9b,
0x32, 0x77, 0xe4, 0x26, 0x8c, 0x02, 0x0e, 0xaf, 0xde, 0xa5, 0x34, 0xce, 0x68, 0x72, 0x13, 0xfd,
0x4e, 0xe6, 0xdd, 0x6d, 0xb9, 0x86, 0xe1, 0x33, 0x80, 0x22, 0x9f, 0x88, 0xd8, 0xb5, 0x8d, 0xdc,
0xba, 0x7e, 0x99, 0xa2, 0xac, 0x38, 0xc5, 0x73, 0xe8, 0xe7, 0x39, 0x0c, 0xd5, 0x34, 0x8d, 0x29,
0x23, 0x53, 0x98, 0x33, 0xe8, 0xfd, 0xa4, 0xa3, 0xdb, 0x28, 0x09, 0x63, 0x49, 0x0f, 0x05, 0xff,
0x8e, 0x5f, 0xd4, 0x4d, 0x56, 0x9d, 0x02, 0x6b, 0xfb, 0x59, 0xfc, 0x01, 0x20, 0x69, 0x4c, 0xd1,
0x6f, 0xb4, 0x4b, 0x99, 0x17, 0xe5, 0x6b, 0xfd, 0x6f, 0xf9, 0xce, 0xa0, 0x3f, 0x8c, 0x29, 0xd4,
0x55, 0x7e, 0x16, 0x2d, 0x5e, 0xc3, 0xc5, 0x61, 0xe5, 0x76, 0x16, 0xb7, 0x70, 0x7c, 0x49, 0x9c,
0x69, 0x35, 0x2f, 0x35, 0xb9, 0x4b, 0x2f, 0xe3, 0xe7, 0xd0, 0x5d, 0xc6, 0xbb, 0xad, 0xad, 0xfd,
0xba, 0x0a, 0x12, 0x6f, 0x01, 0x37, 0x2e, 0x2a, 0xda, 0xbe, 0x34, 0xcd, 0x2d, 0x5b, 0xda, 0xbe,
0x8c, 0xc9, 0x95, 0x72, 0xa5, 0xb5, 0xd2, 0xa5, 0x52, 0x8c, 0x21, 0x2e, 0x9b, 0x1e, 0x91, 0x4f,
0xda, 0x83, 0xfc, 0xe1, 0x71, 0x56, 0x8e, 0x94, 0x63, 0xbf, 0x9e, 0x82, 0x2c, 0x63, 0xc4, 0xd7,
0x70, 0x22, 0x29, 0x8d, 0xa3, 0xb1, 0xe9, 0xda, 0xe1, 0x4c, 0xb3, 0xd2, 0xbb, 0xcc, 0xb5, 0xa0,
0x71, 0x1f, 0xe3, 0x49, 0x31, 0x44, 0xf2, 0x1d, 0xf6, 0xcb, 0xbd, 0xe5, 0x18, 0x71, 0x5e, 0xab,
0x8c, 0xde, 0x45, 0x9c, 0x2d, 0x24, 0xfc, 0x72, 0x4f, 0x2e, 0x91, 0x0b, 0x07, 0xf6, 0x17, 0xe9,
0x88, 0x8f, 0xe1, 0x60, 0x14, 0x25, 0xb7, 0x79, 0x02, 0x2e, 0x1c, 0xfc, 0x48, 0xcc, 0xe1, 0x6d,
0xd9, 0x35, 0xa5, 0x29, 0x3e, 0x2a, 0x83, 0x38, 0xef, 0xab, 0xab, 0xf1, 0x9d, 0x2a, 0xfb, 0x2a,
0x5f, 0x9f, 0x0d, 0xa0, 0x1d, 0xe8, 0x28, 0x1f, 0x31, 0x97, 0x2a, 0xc9, 0x86, 0xa1, 0xa6, 0xfe,
0x1e, 0x76, 0xa1, 0x73, 0x1d, 0xc6, 0x4c, 0x7d, 0x0b, 0x1d, 0xb0, 0x03, 0x3d, 0xa3, 0x7e, 0xeb,
0xfc, 0xdf, 0x56, 0x3e, 0x00, 0x96, 0x8f, 0x40, 0x0f, 0xec, 0xfc, 0x60, 0x74, 0xfc, 0x22, 0x09,
0xaf, 0x5c, 0x31, 0x7e, 0x03, 0x8f, 0xd7, 0xe7, 0x38, 0x23, 0xfa, 0xb5, 0x8f, 0x9f, 0x57, 0xc7,
0x18, 0x47, 0xf0, 0xa4, 0xf9, 0x13, 0x80, 0x9e, 0xbf, 0xf5, 0xc3, 0xe2, 0x6d, 0xf7, 0x31, 0x3e,
0x87, 0xfe, 0x66, 0xe9, 0xf1, 0xc4, 0x6f, 0x90, 0xb4, 0xd7, 0x84, 0x32, 0x7e, 0x07, 0x47, 0xb5,
0xe2, 0xe1, 0xfb, 0x7e, 0x93, 0x10, 0xbc, 0x46, 0x98, 0xf1, 0x2b, 0x78, 0xb4, 0xd6, 0xe2, 0x78,
0xe4, 0x6f, 0x8e, 0x0c, 0xaf, 0x06, 0xf1, 0x45, 0xe7, 0x6d, 0x3b, 0x9d, 0xcc, 0x7e, 0xdd, 0x37,
0xff, 0x1f, 0xbe, 0xf8, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x27, 0x95, 0xc1, 0x78, 0x4c, 0x08, 0x00,
0x00,
}

View File

@ -9,7 +9,6 @@ service Replication {
rpc DestroySnapshots(DestroySnapshotsReq) returns (DestroySnapshotsRes);
rpc ReplicationCursor(ReplicationCursorReq) returns (ReplicationCursorRes);
rpc SendCompleted(SendCompletedReq) returns (SendCompletedRes);
rpc HintMostRecentCommonAncestor(HintMostRecentCommonAncestorReq) returns (HintMostRecentCommonAncestorRes);
// for Send and Recv, see package rpc
}
@ -126,19 +125,3 @@ message PingRes {
// Echo must be PingReq.Message
string Echo = 1;
}
message HintMostRecentCommonAncestorReq {
string Filesystem = 1;
// A copy of the FilesystemVersion on the sending side that the replication
// algorithm identified as a shared most recent common version between sending
// and receiving side.
//
// If nil, this is an indication that the replication algorithm could not
// find a common ancestor between the two sides.
// NOTE: nilness does not mean that replication never happened - there could
// as well be a replication conflict. thus, dont' jump to conclusions too
// rapidly here.
FilesystemVersion SenderVersion = 2;
}
message HintMostRecentCommonAncestorRes {}

View File

@ -33,7 +33,6 @@ type Endpoint interface {
ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
WaitForConnectivity(ctx context.Context) error
HintMostRecentCommonAncestor(context.Context, *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error)
}
type Sender interface {
@ -359,43 +358,6 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
log(ctx).WithField("token", resumeToken).Debug("decode resume token")
}
// give both sides a hint about how far prior replication attempts got
// This serves as a cummulative variant of SendCompleted and can be useful
// for example to release stale holds from an earlier (interrupted) replication.
// TODO FIXME: enqueue this as a replication step instead of doing it here during planning
// then again, the step should run regardless of planning success
// so maybe a separate phase before PLANNING, then?
path, conflict := IncrementalPath(rfsvs, sfsvs)
var sender_mrca *pdu.FilesystemVersion
if conflict == nil && len(path) > 0 {
sender_mrca = path[0] // shadow
}
// yes, sender_mrca may be nil, indicating that we do not have an mrca
{
var wg sync.WaitGroup
doHint := func(ep Endpoint, name string) {
defer wg.Done()
ctx, endTask := trace.WithTask(ctx, "hint-mrca-"+name)
defer endTask()
log := log(ctx).WithField("to_side", name).
WithField("sender_mrca", sender_mrca.String())
log.Debug("hint most recent common ancestor")
hint := &pdu.HintMostRecentCommonAncestorReq{
Filesystem: fs.Path,
SenderVersion: sender_mrca,
}
_, err := ep.HintMostRecentCommonAncestor(ctx, hint)
if err != nil {
log.WithError(err).Error("error hinting most recent common ancestor")
}
}
wg.Add(2)
go doHint(fs.sender, "sender")
go doHint(fs.receiver, "receiver")
wg.Wait()
}
var steps []*Step
// build the list of replication steps
//

View File

@ -142,13 +142,6 @@ func (c *Client) SendCompleted(ctx context.Context, in *pdu.SendCompletedReq) (*
return c.controlClient.SendCompleted(ctx, in)
}
func (c *Client) HintMostRecentCommonAncestor(ctx context.Context, in *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.HintMostRecentCommonAncestor")
defer endSpan()
return c.controlClient.HintMostRecentCommonAncestor(ctx, in)
}
func (c *Client) WaitForConnectivity(ctx context.Context) error {
ctx, endSpan := trace.WithSpan(ctx, "rpc.client.WaitForConnectivity")
defer endSpan()

View File

@ -152,7 +152,7 @@ func (m *HandshakeMessage) DecodeReader(r io.Reader, maxLen int) error {
func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) *HandshakeError {
// current protocol version is hardcoded here
return DoHandshakeVersion(conn, deadline, 3)
return DoHandshakeVersion(conn, deadline, 4)
}
const HandshakeMessageMaxLen = 16 * 4096

View File

@ -116,6 +116,13 @@ func (v FilesystemVersion) RelName() string {
}
func (v FilesystemVersion) String() string { return v.RelName() }
// Only takes into account those attributes of FilesystemVersion that
// are immutable over time in ZFS.
func FilesystemVersionEqualIdentity(a, b FilesystemVersion) bool {
// .Name is mutable
return a.Guid == b.Guid && a.CreateTXG == b.CreateTXG && a.Creation == b.Creation
}
func (v FilesystemVersion) ToAbsPath(p *DatasetPath) string {
var b bytes.Buffer
b.WriteString(p.ToString())

View File

@ -1595,22 +1595,32 @@ var ErrBookmarkCloningNotSupported = fmt.Errorf("bookmark cloning feature is not
//
// does not destroy an existing bookmark, returns
//
func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark string) (err error) {
func ZFSBookmark(ctx context.Context, fs string, v FilesystemVersion, bookmark string) (bm FilesystemVersion, err error) {
bm = FilesystemVersion{
Type: Bookmark,
Name: bookmark,
UserRefs: OptionUint64{Valid: false},
// bookmarks have the same createtxg, guid and creation as their origin
CreateTXG: v.CreateTXG,
Guid: v.Guid,
Creation: v.Creation,
}
promTimer := prometheus.NewTimer(prom.ZFSBookmarkDuration.WithLabelValues(fs))
defer promTimer.ObserveDuration()
if !v.IsSnapshot() {
return ErrBookmarkCloningNotSupported // TODO This is work in progress: https://github.com/zfsonlinux/zfs/pull/9571
return bm, ErrBookmarkCloningNotSupported // TODO This is work in progress: https://github.com/zfsonlinux/zfs/pull/9571
}
snapname := v.FullPath(fs)
if err := EntityNamecheck(snapname, EntityTypeSnapshot); err != nil {
return err
return bm, err
}
bookmarkname := fmt.Sprintf("%s#%s", fs, bookmark)
if err := EntityNamecheck(bookmarkname, EntityTypeBookmark); err != nil {
return err
return bm, err
}
debug("bookmark: %q %q", snapname, bookmarkname)
@ -1619,27 +1629,27 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s
stdio, err := cmd.CombinedOutput()
if err != nil {
if ddne := tryDatasetDoesNotExist(snapname, stdio); ddne != nil {
return ddne
return bm, ddne
} else if zfsBookmarkExistsRegex.Match(stdio) {
// check if this was idempotent
bookGuid, err := ZFSGetGUID(ctx, fs, "#"+bookmark)
if err != nil {
return errors.Wrap(err, "bookmark idempotency check") // guid error expressive enough
return bm, errors.Wrap(err, "bookmark idempotency check") // guid error expressive enough
}
if v.GUID == bookGuid {
debug("bookmark: %q %q was idempotent: {snap,book}guid %d == %d", snapname, bookmarkname, v.GUID, bookGuid)
return nil
if v.Guid == bookGuid {
debug("bookmark: %q %q was idempotent: {snap,book}guid %d == %d", snapname, bookmarkname, v.Guid, bookGuid)
return bm, nil
}
return &BookmarkExists{
fs: fs, bookmarkOrigin: v, bookmark: bookmark,
return bm, &BookmarkExists{
fs: fs, bookmarkOrigin: v.ToSendArgVersion(), bookmark: bookmark,
zfsMsg: string(stdio),
bookGuid: bookGuid,
}
} else {
return &ZFSError{
return bm, &ZFSError{
Stderr: stdio,
WaitErr: err,
}
@ -1647,8 +1657,7 @@ func ZFSBookmark(ctx context.Context, fs string, v ZFSSendArgVersion, bookmark s
}
return nil
return bm, nil
}
func ZFSRollback(ctx context.Context, fs *DatasetPath, snapshot FilesystemVersion, rollbackArgs ...string) (err error) {