mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-27 19:04:28 +01:00
bbdc6f5465
Before this PR, we would panic in the `check` phase of `endpoint.Send()`'s `TryBatchDestroy` call in the following cases: the current protection strategy does NOT produce a tentative replication cursor AND

* `FromVersion` is a tentative cursor bookmark
* `FromVersion` is a snapshot, and there exists a tentative cursor bookmark for that snapshot
* `FromVersion` is a bookmark != tentative cursor bookmark, but there exists a tentative cursor bookmark for the same snapshot as the `FromVersion` bookmark

In those cases, the `check` concluded that we would delete `FromVersion`. It came to that conclusion because the tentative cursor isn't part of `obsoleteAbs` if the protection strategy doesn't produce a tentative replication cursor.

The scenarios above can happen if the user changes the protection strategy from "with tentative cursor" to one "without tentative replication cursor", while there is a tentative replication cursor on disk. The workaround was to rename the tentative cursor.

In all cases above, `TryBatchDestroy` would have destroyed the tentative cursor. In case 1, that would fail the `Send` step and potentially break replication if the cursor is the last common bookmark. The `check` conclusion was correct. In cases 2 and 3, deleting the tentative cursor would have been fine because `FromVersion` was a different entity than the tentative cursor. So, destroying the tentative cursor would be the right call.

The solution in this PR is as follows:

* add the `FromVersion` to the `liveAbs` set of live abstractions
* rewrite the `check` closure to use the full dataset path (`fullpath`) to identify the concrete ZFS object instead of the `zfs.FilesystemVersionEqualIdentity`, which is only identified by matching GUID.
* Holds have no dataset path and are not the `FromVersion` in any case, so disregard them.

fixes #666
221 lines
7.7 KiB
Go
221 lines
7.7 KiB
Go
package endpoint
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/zrepl/zrepl/replication/logic/pdu"
|
|
"github.com/zrepl/zrepl/zfs"
|
|
)
|
|
|
|
type ReplicationGuaranteeOptions struct {
|
|
Initial ReplicationGuaranteeKind
|
|
Incremental ReplicationGuaranteeKind
|
|
}
|
|
|
|
func replicationGuaranteeOptionsFromPDU(in *pdu.ReplicationConfigProtection) (o ReplicationGuaranteeOptions, _ error) {
|
|
if in == nil {
|
|
return o, errors.New("pdu.ReplicationConfigProtection must not be nil")
|
|
}
|
|
initial, err := replicationGuaranteeKindFromPDU(in.GetInitial())
|
|
if err != nil {
|
|
return o, errors.Wrap(err, "pdu.ReplicationConfigProtection: field Initial")
|
|
}
|
|
incremental, err := replicationGuaranteeKindFromPDU(in.GetIncremental())
|
|
if err != nil {
|
|
return o, errors.Wrap(err, "pdu.ReplicationConfigProtection: field Incremental")
|
|
}
|
|
o = ReplicationGuaranteeOptions{
|
|
Initial: initial,
|
|
Incremental: incremental,
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
func replicationGuaranteeKindFromPDU(in pdu.ReplicationGuaranteeKind) (k ReplicationGuaranteeKind, _ error) {
|
|
switch in {
|
|
case pdu.ReplicationGuaranteeKind_GuaranteeNothing:
|
|
return ReplicationGuaranteeKindNone, nil
|
|
case pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication:
|
|
return ReplicationGuaranteeKindIncremental, nil
|
|
case pdu.ReplicationGuaranteeKind_GuaranteeResumability:
|
|
return ReplicationGuaranteeKindResumability, nil
|
|
|
|
case pdu.ReplicationGuaranteeKind_GuaranteeInvalid:
|
|
fallthrough
|
|
default:
|
|
return k, errors.Errorf("%q", in.String())
|
|
}
|
|
}
|
|
|
|
func (o ReplicationGuaranteeOptions) Strategy(incremental bool) ReplicationGuaranteeStrategy {
|
|
g := o.Initial
|
|
if incremental {
|
|
g = o.Incremental
|
|
}
|
|
return ReplicationGuaranteeFromKind(g)
|
|
}
|
|
|
|
//go:generate enumer -type=ReplicationGuaranteeKind -json -transform=snake -trimprefix=ReplicationGuaranteeKind

// ReplicationGuaranteeKind enumerates the available replication guarantees.
// Each kind maps to one ReplicationGuaranteeStrategy implementation,
// see ReplicationGuaranteeFromKind.
type ReplicationGuaranteeKind int

// The kinds are distinct powers of two (1 << iota); the zero value is
// not a valid kind.
const (
	ReplicationGuaranteeKindResumability ReplicationGuaranteeKind = 1 << iota // 1
	ReplicationGuaranteeKindIncremental                                       // 2
	ReplicationGuaranteeKindNone                                              // 4
)
|
|
|
|
type ReplicationGuaranteeStrategy interface {
|
|
Kind() ReplicationGuaranteeKind
|
|
SenderPreSend(ctx context.Context, jid JobID, sendArgs *zfs.ZFSSendArgsValidated) (keep []Abstraction, err error)
|
|
ReceiverPostRecv(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error)
|
|
SenderPostRecvConfirmed(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error)
|
|
}
|
|
|
|
func ReplicationGuaranteeFromKind(k ReplicationGuaranteeKind) ReplicationGuaranteeStrategy {
|
|
switch k {
|
|
case ReplicationGuaranteeKindNone:
|
|
return ReplicationGuaranteeNone{}
|
|
case ReplicationGuaranteeKindIncremental:
|
|
return ReplicationGuaranteeIncremental{}
|
|
case ReplicationGuaranteeKindResumability:
|
|
return ReplicationGuaranteeResumability{}
|
|
default:
|
|
panic(fmt.Sprintf("unreachable: %q %T", k, k))
|
|
}
|
|
}
|
|
|
|
type ReplicationGuaranteeNone struct{}
|
|
|
|
func (g ReplicationGuaranteeNone) String() string { return "none" }
|
|
|
|
func (g ReplicationGuaranteeNone) Kind() ReplicationGuaranteeKind {
|
|
return ReplicationGuaranteeKindNone
|
|
}
|
|
|
|
func (g ReplicationGuaranteeNone) SenderPreSend(ctx context.Context, jid JobID, sendArgs *zfs.ZFSSendArgsValidated) (keep []Abstraction, err error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (g ReplicationGuaranteeNone) ReceiverPostRecv(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (g ReplicationGuaranteeNone) SenderPostRecvConfirmed(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error) {
|
|
return nil, nil
|
|
}
|
|
|
|
type ReplicationGuaranteeIncremental struct{}
|
|
|
|
func (g ReplicationGuaranteeIncremental) String() string { return "incremental" }
|
|
|
|
func (g ReplicationGuaranteeIncremental) Kind() ReplicationGuaranteeKind {
|
|
return ReplicationGuaranteeKindIncremental
|
|
}
|
|
|
|
func (g ReplicationGuaranteeIncremental) SenderPreSend(ctx context.Context, jid JobID, sendArgs *zfs.ZFSSendArgsValidated) (keep []Abstraction, err error) {
|
|
if sendArgs.FromVersion != nil {
|
|
from, err := CreateTentativeReplicationCursor(ctx, sendArgs.FS, *sendArgs.FromVersion, jid)
|
|
if err != nil {
|
|
if err == zfs.ErrBookmarkCloningNotSupported {
|
|
getLogger(ctx).WithField("replication_guarantee", g).
|
|
WithField("bookmark", sendArgs.From.FullPath(sendArgs.FS)).
|
|
Info("bookmark cloning is not supported, speculating that `from` will not be destroyed until step is done")
|
|
} else {
|
|
return nil, err
|
|
}
|
|
}
|
|
keep = append(keep, from)
|
|
}
|
|
to, err := CreateTentativeReplicationCursor(ctx, sendArgs.FS, sendArgs.ToVersion, jid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
keep = append(keep, to)
|
|
|
|
return keep, nil
|
|
}
|
|
|
|
func (g ReplicationGuaranteeIncremental) ReceiverPostRecv(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error) {
|
|
return receiverPostRecvCommon(ctx, jid, fs, toRecvd)
|
|
}
|
|
|
|
func (g ReplicationGuaranteeIncremental) SenderPostRecvConfirmed(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error) {
|
|
return senderPostRecvConfirmedCommon(ctx, jid, fs, to)
|
|
}
|
|
|
|
type ReplicationGuaranteeResumability struct{}
|
|
|
|
func (g ReplicationGuaranteeResumability) String() string { return "resumability" }
|
|
|
|
func (g ReplicationGuaranteeResumability) Kind() ReplicationGuaranteeKind {
|
|
return ReplicationGuaranteeKindResumability
|
|
}
|
|
|
|
func (g ReplicationGuaranteeResumability) SenderPreSend(ctx context.Context, jid JobID, sendArgs *zfs.ZFSSendArgsValidated) (keep []Abstraction, err error) {
|
|
// try to hold the FromVersion
|
|
if sendArgs.FromVersion != nil {
|
|
if sendArgs.FromVersion.Type == zfs.Bookmark {
|
|
getLogger(ctx).WithField("replication_guarantee", g).WithField("fromVersion", sendArgs.FromVersion.FullPath(sendArgs.FS)).
|
|
Debug("cannot hold a bookmark, speculating that `from` will not be destroyed until step is done")
|
|
} else {
|
|
from, err := HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, jid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
keep = append(keep, from)
|
|
}
|
|
// fallthrough
|
|
}
|
|
|
|
to, err := HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, jid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
keep = append(keep, to)
|
|
|
|
return keep, nil
|
|
}
|
|
|
|
func (g ReplicationGuaranteeResumability) ReceiverPostRecv(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error) {
|
|
return receiverPostRecvCommon(ctx, jid, fs, toRecvd)
|
|
}
|
|
|
|
func (g ReplicationGuaranteeResumability) SenderPostRecvConfirmed(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error) {
|
|
return senderPostRecvConfirmedCommon(ctx, jid, fs, to)
|
|
}
|
|
|
|
// helper function used by multiple strategies
|
|
func senderPostRecvConfirmedCommon(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error) {
|
|
|
|
log := getLogger(ctx).WithField("toVersion", to.FullPath(fs))
|
|
|
|
toReplicationCursor, err := CreateReplicationCursor(ctx, fs, to, jid)
|
|
if err != nil {
|
|
if err == zfs.ErrBookmarkCloningNotSupported {
|
|
log.Debug("not setting replication cursor, bookmark cloning not supported")
|
|
} else {
|
|
msg := "cannot move replication cursor, keeping hold on `to` until successful"
|
|
log.WithError(err).Error(msg)
|
|
err = errors.Wrap(err, msg)
|
|
return nil, err
|
|
}
|
|
} else {
|
|
log.WithField("to_cursor", toReplicationCursor.String()).Info("successfully created `to` replication cursor")
|
|
}
|
|
|
|
return []Abstraction{toReplicationCursor}, nil
|
|
}
|
|
|
|
// helper function used by multiple strategies
|
|
func receiverPostRecvCommon(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error) {
|
|
getLogger(ctx).Debug("create new last-received-hold")
|
|
lrh, err := CreateLastReceivedHold(ctx, fs, toRecvd, jid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return []Abstraction{lrh}, nil
|
|
}
|