// Source path: zrepl/endpoint/endpoint_guarantees.go
// (web viewer listing of the original file: 215 lines, 7.5 KiB, Go)

package endpoint
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/zfs"
)
// ReplicationGuaranteeOptions holds the guarantee kind to use for each of the
// two step flavors distinguished by Strategy.
type ReplicationGuaranteeOptions struct {
	// Initial is the kind Strategy selects when called with incremental == false.
	Initial ReplicationGuaranteeKind
	// Incremental is the kind Strategy selects when called with incremental == true.
	Incremental ReplicationGuaranteeKind
}
// replicationGuaranteeOptionsFromPDU converts a pdu.ReplicationConfigProtection
// into ReplicationGuaranteeOptions.
//
// It fails if in is nil, or if the Initial or Incremental field cannot be
// converted by replicationGuaranteeKindFromPDU (the error is wrapped with the
// name of the offending field). On every error path the returned options are
// the zero value.
func replicationGuaranteeOptionsFromPDU(in *pdu.ReplicationConfigProtection) (o ReplicationGuaranteeOptions, _ error) {
	if in == nil {
		return o, errors.New("pdu.ReplicationConfigProtection must not be nil")
	}

	initialKind, err := replicationGuaranteeKindFromPDU(in.GetInitial())
	if err != nil {
		return o, errors.Wrap(err, "pdu.ReplicationConfigProtection: field Initial")
	}

	incrementalKind, err := replicationGuaranteeKindFromPDU(in.GetIncremental())
	if err != nil {
		return o, errors.Wrap(err, "pdu.ReplicationConfigProtection: field Incremental")
	}

	return ReplicationGuaranteeOptions{
		Initial:     initialKind,
		Incremental: incrementalKind,
	}, nil
}
// replicationGuaranteeKindFromPDU maps a pdu.ReplicationGuaranteeKind onto the
// corresponding ReplicationGuaranteeKind of this package.
//
// pdu.ReplicationGuaranteeKind_GuaranteeInvalid and any value not listed below
// are rejected with an error whose message is the quoted String() of the
// input; in that case the returned kind is the zero value.
func replicationGuaranteeKindFromPDU(in pdu.ReplicationGuaranteeKind) (k ReplicationGuaranteeKind, _ error) {
	switch in {
	case pdu.ReplicationGuaranteeKind_GuaranteeResumability:
		k = ReplicationGuaranteeKindResumability
	case pdu.ReplicationGuaranteeKind_GuaranteeIncrementalReplication:
		k = ReplicationGuaranteeKindIncremental
	case pdu.ReplicationGuaranteeKind_GuaranteeNothing:
		k = ReplicationGuaranteeKindNone
	default:
		// Also covers pdu.ReplicationGuaranteeKind_GuaranteeInvalid.
		return k, errors.Errorf("%q", in.String())
	}
	return k, nil
}
// Strategy returns the strategy for o.Incremental if incremental is true and
// the strategy for o.Initial otherwise.
//
// The lookup goes through ReplicationGuaranteeFromKind, which panics if the
// selected kind is not one of the declared ReplicationGuaranteeKind constants.
func (o ReplicationGuaranteeOptions) Strategy(incremental bool) ReplicationGuaranteeStrategy {
	if incremental {
		return ReplicationGuaranteeFromKind(o.Incremental)
	}
	return ReplicationGuaranteeFromKind(o.Initial)
}
// ReplicationGuaranteeKind enumerates the replication guarantee variants that
// ReplicationGuaranteeFromKind can turn into a strategy.
//
//go:generate enumer -type=ReplicationGuaranteeKind -json -transform=snake -trimprefix=ReplicationGuaranteeKind
type ReplicationGuaranteeKind int

// Each kind is a distinct single bit (1, 2, 4); the zero value matches none
// of them.
const (
	// ReplicationGuaranteeKindResumability has the value 1.
	ReplicationGuaranteeKindResumability ReplicationGuaranteeKind = 1 << iota
	// ReplicationGuaranteeKindIncremental has the value 2.
	ReplicationGuaranteeKindIncremental
	// ReplicationGuaranteeKindNone has the value 4.
	ReplicationGuaranteeKindNone
)
// ReplicationGuaranteeStrategy is the behavior behind a
// ReplicationGuaranteeKind; ReplicationGuaranteeFromKind maps a kind to its
// implementation.
//
// Every hook reports either an error or a list of abstractions named keep.
// NOTE(review): the hook names suggest on which side and at which point of a
// replication step they are invoked, and keep presumably lists abstractions
// that must not be cleaned up by the caller; the call sites are not part of
// this section of the file, so confirm there.
type ReplicationGuaranteeStrategy interface {
	// Kind reports which ReplicationGuaranteeKind the strategy implements.
	Kind() ReplicationGuaranteeKind
	// SenderPreSend receives the validated send arguments of the step.
	SenderPreSend(ctx context.Context, jid JobID, sendArgs *zfs.ZFSSendArgsValidated) (keep []Abstraction, err error)
	// ReceiverPostRecv receives the filesystem and the version that was received.
	ReceiverPostRecv(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error)
	// SenderPostRecvConfirmed receives the filesystem and the `to` version of the step.
	SenderPostRecvConfirmed(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error)
}
// ReplicationGuaranteeFromKind returns the strategy implementation that
// belongs to k.
//
// It panics if k is not one of the declared ReplicationGuaranteeKind
// constants (this includes the zero value).
func ReplicationGuaranteeFromKind(k ReplicationGuaranteeKind) ReplicationGuaranteeStrategy {
	var strategy ReplicationGuaranteeStrategy
	switch k {
	case ReplicationGuaranteeKindResumability:
		strategy = ReplicationGuaranteeResumability{}
	case ReplicationGuaranteeKindIncremental:
		strategy = ReplicationGuaranteeIncremental{}
	case ReplicationGuaranteeKindNone:
		strategy = ReplicationGuaranteeNone{}
	default:
		panic(fmt.Sprintf("unreachable: %q %T", k, k))
	}
	return strategy
}
// ReplicationGuaranteeNone is the strategy for ReplicationGuaranteeKindNone.
// All of its hooks are no-ops that return no abstractions and no error.
type ReplicationGuaranteeNone struct{}

// Kind returns ReplicationGuaranteeKindNone.
func (ReplicationGuaranteeNone) Kind() ReplicationGuaranteeKind {
	return ReplicationGuaranteeKindNone
}

// SenderPreSend does nothing and returns (nil, nil).
func (ReplicationGuaranteeNone) SenderPreSend(ctx context.Context, jid JobID, sendArgs *zfs.ZFSSendArgsValidated) (keep []Abstraction, err error) {
	return nil, nil
}

// ReceiverPostRecv does nothing and returns (nil, nil).
func (ReplicationGuaranteeNone) ReceiverPostRecv(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error) {
	return nil, nil
}

// SenderPostRecvConfirmed does nothing and returns (nil, nil).
func (ReplicationGuaranteeNone) SenderPostRecvConfirmed(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error) {
	return nil, nil
}
// ReplicationGuaranteeIncremental is the strategy for
// ReplicationGuaranteeKindIncremental. Before a send it creates tentative
// replication cursors; the post-receive hooks delegate to the helpers shared
// with ReplicationGuaranteeResumability.
type ReplicationGuaranteeIncremental struct{}

// Kind returns ReplicationGuaranteeKindIncremental.
func (g ReplicationGuaranteeIncremental) Kind() ReplicationGuaranteeKind {
	return ReplicationGuaranteeKindIncremental
}

// SenderPreSend creates a tentative replication cursor for sendArgs.ToVersion
// and, if sendArgs.FromVersion is set, one for the `from` version as well.
//
// keep contains only the cursors that were actually created. If creating the
// `from` cursor fails with zfs.ErrBookmarkCloningNotSupported, the failure is
// logged and tolerated and no entry is added for `from`; any other error, and
// any error for the `to` cursor, aborts with (nil, err).
func (g ReplicationGuaranteeIncremental) SenderPreSend(ctx context.Context, jid JobID, sendArgs *zfs.ZFSSendArgsValidated) (keep []Abstraction, err error) {
	if sendArgs.FromVersion != nil {
		from, err := CreateTentativeReplicationCursor(ctx, sendArgs.FS, *sendArgs.FromVersion, jid)
		switch {
		case err == nil:
			// Only the result of a successful call is meaningful, so only
			// then is it recorded in keep.
			keep = append(keep, from)
		case err == zfs.ErrBookmarkCloningNotSupported:
			// NOTE(review): this dereferences sendArgs.From although the guard
			// above checks sendArgs.FromVersion; presumably validation sets
			// both together - confirm.
			getLogger(ctx).WithField("replication_guarantee", g).
				WithField("bookmark", sendArgs.From.FullPath(sendArgs.FS)).
				Info("bookmark cloning is not supported, speculating that `from` will not be destroyed until step is done")
		default:
			return nil, err
		}
	}

	to, err := CreateTentativeReplicationCursor(ctx, sendArgs.FS, sendArgs.ToVersion, jid)
	if err != nil {
		return nil, err
	}
	keep = append(keep, to)

	return keep, nil
}

// ReceiverPostRecv delegates to receiverPostRecvCommon.
func (g ReplicationGuaranteeIncremental) ReceiverPostRecv(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error) {
	return receiverPostRecvCommon(ctx, jid, fs, toRecvd)
}

// SenderPostRecvConfirmed delegates to senderPostRecvConfirmedCommon.
func (g ReplicationGuaranteeIncremental) SenderPostRecvConfirmed(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error) {
	return senderPostRecvConfirmedCommon(ctx, jid, fs, to)
}
// ReplicationGuaranteeResumability is the strategy for
// ReplicationGuaranteeKindResumability. Before a send it places step holds via
// HoldStep; the post-receive hooks delegate to the helpers shared with
// ReplicationGuaranteeIncremental.
type ReplicationGuaranteeResumability struct{}

// Kind returns ReplicationGuaranteeKindResumability.
func (g ReplicationGuaranteeResumability) Kind() ReplicationGuaranteeKind {
	return ReplicationGuaranteeKindResumability
}

// SenderPreSend places a step hold on sendArgs.ToVersion and, where possible,
// on sendArgs.FromVersion, and returns the resulting abstractions in keep.
//
// No hold is attempted for `from` when it is absent or when it is a bookmark
// (the latter is only logged). Any HoldStep error aborts with (nil, err).
func (g ReplicationGuaranteeResumability) SenderPreSend(ctx context.Context, jid JobID, sendArgs *zfs.ZFSSendArgsValidated) (keep []Abstraction, err error) {
	from := sendArgs.FromVersion
	switch {
	case from == nil:
		// No `from` version: only `to` is held below.
	case from.Type == zfs.Bookmark:
		getLogger(ctx).WithField("replication_guarantee", g).WithField("fromVersion", from.FullPath(sendArgs.FS)).
			Debug("cannot hold a bookmark, speculating that `from` will not be destroyed until step is done")
	default:
		fromHold, err := HoldStep(ctx, sendArgs.FS, *from, jid)
		if err != nil {
			return nil, err
		}
		keep = append(keep, fromHold)
	}

	toHold, err := HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, jid)
	if err != nil {
		return nil, err
	}
	return append(keep, toHold), nil
}

// ReceiverPostRecv delegates to receiverPostRecvCommon.
func (g ReplicationGuaranteeResumability) ReceiverPostRecv(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error) {
	return receiverPostRecvCommon(ctx, jid, fs, toRecvd)
}

// SenderPostRecvConfirmed delegates to senderPostRecvConfirmedCommon.
func (g ReplicationGuaranteeResumability) SenderPostRecvConfirmed(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error) {
	return senderPostRecvConfirmedCommon(ctx, jid, fs, to)
}
// senderPostRecvConfirmedCommon is the SenderPostRecvConfirmed implementation
// shared by multiple strategies: it creates the replication cursor for `to`
// on filesystem fs.
//
// On success keep holds the new cursor. If CreateReplicationCursor fails with
// zfs.ErrBookmarkCloningNotSupported, the situation is logged at debug level
// and the function returns without abstractions and without error, because no
// cursor was created. Any other error is logged and returned wrapped.
func senderPostRecvConfirmedCommon(ctx context.Context, jid JobID, fs string, to zfs.FilesystemVersion) (keep []Abstraction, err error) {
	log := getLogger(ctx).WithField("toVersion", to.FullPath(fs))

	toReplicationCursor, err := CreateReplicationCursor(ctx, fs, to, jid)
	if err == zfs.ErrBookmarkCloningNotSupported {
		// Tolerated: the result of the failed call is not handed to the caller.
		log.Debug("not setting replication cursor, bookmark cloning not supported")
		return nil, nil
	}
	if err != nil {
		msg := "cannot move replication cursor, keeping hold on `to` until successful"
		log.WithError(err).Error(msg)
		return nil, errors.Wrap(err, msg)
	}

	log.WithField("to_cursor", toReplicationCursor.String()).Info("successfully created `to` replication cursor")
	return []Abstraction{toReplicationCursor}, nil
}
// receiverPostRecvCommon is the ReceiverPostRecv implementation shared by
// multiple strategies: it calls CreateLastReceivedHold for toRecvd on
// filesystem fs and returns the resulting abstraction as the only element of
// keep. An error from CreateLastReceivedHold is returned unchanged.
func receiverPostRecvCommon(ctx context.Context, jid JobID, fs string, toRecvd zfs.FilesystemVersion) (keep []Abstraction, err error) {
	getLogger(ctx).Debug("create new last-received-hold")

	lastReceivedHold, err := CreateLastReceivedHold(ctx, fs, toRecvd, jid)
	if err != nil {
		return nil, err
	}

	keep = append(keep, lastReceivedHold)
	return keep, nil
}