new features: {resumable,encrypted,hold-protected} send-recv, last-received-hold
- **Resumable Send & Recv Support**
  No knobs required, automatically used where supported.
- **Hold-Protected Send & Recv**
  Automatic ZFS holds ensure that we can always resume a replication step.
- **Encrypted Send & Recv Support** for OpenZFS native encryption.
  Configurable at the job level, i.e., for all filesystems a job is responsible for.
- **Receive-side hold on last received dataset**
  The counterpart to the replication cursor bookmark on the send side.
  Ensures that incremental replication will always be possible between a sender and receiver.

Design Doc
----------

`replication/design.md` describes how we use ZFS holds and bookmarks to ensure that a single replication step is always resumable (the hold mechanics are sketched in a code example after the change list below).

The replication algorithm described in the design doc introduces the notion of job IDs (please read the details in the design doc). We reuse job names as job IDs and use the `JobID` type to ensure that a job name can be embedded into hold tags, bookmark names, etc. This might BREAK CONFIG on upgrade.

Protocol Version Bump
---------------------

This commit makes backwards-incompatible changes to the replication/pdu protobufs. Thus, bump the version number used in the protocol handshake.

Replication Cursor Format Change
--------------------------------

The new replication cursor bookmark format is `#zrepl_CURSOR_G_${this.GUID}_J_${jobid}`. Including the GUID enables transaction-safe moving-forward of the cursor. Including the job ID enables multiple sending jobs to send the same filesystem without interfering. The `zrepl migrate replication-cursor:v1-v2` subcommand can be used to safely destroy old-format cursors once zrepl has created new-format cursors.
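For illustration, here is a minimal Go sketch of how a cursor name of this shape could be composed and parsed. Only the overall `#zrepl_CURSOR_G_..._J_...` shape is taken from this commit message; the helper names and the fixed-width hex rendering of the GUID are assumptions of the sketch, not necessarily what zrepl itself does.

```go
package main

import (
    "fmt"
    "regexp"
    "strconv"
)

// replicationCursorName composes a v2 cursor bookmark name (the part after the
// '#' in "<filesystem>#<name>") from a snapshot GUID and a job ID.
// Rendering the GUID as fixed-width hex is an assumption of this sketch.
func replicationCursorName(guid uint64, jobID string) string {
    return fmt.Sprintf("zrepl_CURSOR_G_%016x_J_%s", guid, jobID)
}

// cursorNameRegex matches names produced by replicationCursorName and captures
// the GUID and the job ID again.
var cursorNameRegex = regexp.MustCompile(`^zrepl_CURSOR_G_([0-9a-f]{16})_J_(.+)$`)

// parseReplicationCursorName is the inverse of replicationCursorName.
func parseReplicationCursorName(name string) (guid uint64, jobID string, ok bool) {
    m := cursorNameRegex.FindStringSubmatch(name)
    if m == nil {
        return 0, "", false
    }
    g, err := strconv.ParseUint(m[1], 16, 64)
    if err != nil {
        return 0, "", false
    }
    return g, m[2], true
}

func main() {
    name := replicationCursorName(0xdeadbeefcafe4242, "prod_to_backups")
    fmt.Println(name) // zrepl_CURSOR_G_deadbeefcafe4242_J_prod_to_backups
    fmt.Println(parseReplicationCursorName(name))
}
```

Because the GUID of the target snapshot is part of the name, creating the new cursor and destroying the old one can be separate, individually idempotent operations, which is presumably what makes moving the cursor forward transaction-safe.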
Changes in This Commit
----------------------

- package zfs
  - infrastructure for holds
  - infrastructure for resume token decoding
  - implement a variant of OpenZFS's `entity_namecheck` and use it for validation in new code
  - `ZFSSendArgs` to specify a ZFS send operation
    - validation code protects against malicious resume tokens by checking that the token encodes the same send parameters that the send side would use if no resume token were available (i.e., same filesystem, `fromguid`, `toguid`); the idea is sketched in the code example after this list
  - `RecvOptions` support for the `recv -s` flag
  - convert a bunch of ZFS operations to be idempotent
    - achieved through more differentiated error message scraping / additional pre- and post-checks
- package replication/pdu
  - add a field for encryption to send request messages
  - add fields for resume handling to send & recv request messages
  - receive requests now contain a `FilesystemVersion To` in addition to the filesystem into which the stream should be `recv`d
    - allows `zfs recv $root_fs/$client_id/path/to/dataset@${To.Name}`, which enables additional validation after recv (i.e., whether `To.Guid` matches what we received in the stream)
    - used to set `last-received-hold`
- package replication/logic
  - introduce a `PlannerPolicy` struct, currently only used to configure whether encrypted sends should be requested from the sender
  - integrate encryption and resume token support into the `Step` struct
- package endpoint
  - move the concepts that endpoint builds on top of ZFS into a single file, `endpoint/endpoint_zfs.go`:
    - step-holds + step-bookmarks
    - last-received-hold
    - new replication cursor + old replication cursor compat code
  - adjust the `endpoint/endpoint.go` handlers for
    - encryption
    - resumability
    - the new replication cursor
    - last-received-hold
- client subcommand `zrepl holds list`: lists all holds and hold-like bookmarks that zrepl thinks belong to it
- client subcommand `zrepl migrate replication-cursor:v1-v2`
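The resume-token validation mentioned in the `package zfs` item above can be pictured with the following sketch. The types, field names, and function here are hypothetical stand-ins, not zrepl's actual `ZFSSendArgs` validation code; only the rule itself comes from the commit message: a resume token is honored only if it encodes the same filesystem, `fromguid`, and `toguid` that the sender would use without a token.

```go
package main

import (
    "fmt"
    "strings"
)

// decodedResumeToken stands in for the fields decoded from a receiver-supplied
// resume token; the field names are assumptions of this sketch.
type decodedResumeToken struct {
    ToName   string // e.g. "pool/ds@snap"
    FromGUID uint64 // 0 for a full send
    ToGUID   uint64
}

// sendStep stands in for the send parameters the sender computed on its own,
// i.e. what it would send if no resume token existed.
type sendStep struct {
    FS       string
    FromGUID uint64 // 0 for a full send
    ToGUID   uint64
}

// validateResumeToken accepts a token only if it encodes the same step the
// sender would perform anyway: same filesystem, same fromguid, same toguid.
// Anything else is treated as a stale or potentially malicious token.
func validateResumeToken(step sendStep, tok decodedResumeToken) error {
    if !strings.HasPrefix(tok.ToName, step.FS+"@") {
        return fmt.Errorf("resume token is for %q, not for filesystem %q", tok.ToName, step.FS)
    }
    if tok.FromGUID != step.FromGUID {
        return fmt.Errorf("resume token fromguid %d does not match step fromguid %d", tok.FromGUID, step.FromGUID)
    }
    if tok.ToGUID != step.ToGUID {
        return fmt.Errorf("resume token toguid %d does not match step toguid %d", tok.ToGUID, step.ToGUID)
    }
    return nil
}

func main() {
    step := sendStep{FS: "pool/data", FromGUID: 111, ToGUID: 222}
    good := decodedResumeToken{ToName: "pool/data@snap2", FromGUID: 111, ToGUID: 222}
    bad := decodedResumeToken{ToName: "pool/secret@snap9", FromGUID: 0, ToGUID: 999}
    fmt.Println(validateResumeToken(step, good)) // <nil>
    fmt.Println(validateResumeToken(step, bad))  // rejected: wrong filesystem
}
```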
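The step-hold idea from the design doc can be illustrated with stock ZFS user holds: a held snapshot cannot be destroyed, so the `from`/`to` versions of an in-flight step survive concurrent pruning and the step stays resumable. The sketch below shells out to the standard `zfs hold` / `zfs release` commands; the tag layout and function names are assumptions of the sketch, not zrepl's actual endpoint/zfs package code.

```go
package main

import (
    "fmt"
    "os/exec"
)

// stepHoldTag derives a per-job hold tag so that multiple jobs can hold the
// same snapshot independently (embedding the job ID in the tag is the same
// idea as the job ID in the new cursor bookmark name).
func stepHoldTag(jobID string) string {
    return "zrepl_STEP_J_" + jobID
}

// holdStep places a user hold on the snapshot for the duration of one
// replication step.
func holdStep(snapshot, jobID string) error {
    out, err := exec.Command("zfs", "hold", stepHoldTag(jobID), snapshot).CombinedOutput()
    if err != nil {
        return fmt.Errorf("zfs hold %q: %v: %s", snapshot, err, out)
    }
    return nil
}

// releaseStep removes the hold again once the step completed and the
// replication cursor has been moved forward.
func releaseStep(snapshot, jobID string) error {
    out, err := exec.Command("zfs", "release", stepHoldTag(jobID), snapshot).CombinedOutput()
    if err != nil {
        return fmt.Errorf("zfs release %q: %v: %s", snapshot, err, out)
    }
    return nil
}

func main() {
    const snap = "pool/data@zrepl_20200101_000000"
    const job = "prod_to_backups"
    if err := holdStep(snap, job); err != nil {
        fmt.Println(err)
        return
    }
    // ... perform the send for this step, move the replication cursor ...
    if err := releaseStep(snap, job); err != nil {
        fmt.Println(err)
    }
}
```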
@@ -5,6 +5,7 @@ import (
    "context"
    "fmt"
    "path"
    "sync"

    "github.com/pkg/errors"

@@ -15,13 +16,39 @@ import (
    "github.com/zrepl/zrepl/zfs"
)

type SenderConfig struct {
    FSF zfs.DatasetFilter
    Encrypt *zfs.NilBool
    JobID JobID
}

func (c *SenderConfig) Validate() error {
    c.JobID.MustValidate()
    if err := c.Encrypt.Validate(); err != nil {
        return errors.Wrap(err, "`Encrypt` field invalid")
    }
    if _, err := StepHoldTag(c.JobID); err != nil {
        return fmt.Errorf("JobID cannot be used for hold tag: %s", err)
    }
    return nil
}

// Sender implements replication.ReplicationEndpoint for a sending side
type Sender struct {
    FSFilter zfs.DatasetFilter
    encrypt *zfs.NilBool
    jobId JobID
}

func NewSender(fsf zfs.DatasetFilter) *Sender {
    return &Sender{FSFilter: fsf}
func NewSender(conf SenderConfig) *Sender {
    if err := conf.Validate(); err != nil {
        panic("invalid config" + err.Error())
    }
    return &Sender{
        FSFilter: conf.FSF,
        encrypt: conf.Encrypt,
        jobId: conf.JobID,
    }
}

func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
@@ -49,10 +76,15 @@ func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq)
    }
    rfss := make([]*pdu.Filesystem, len(fss))
    for i := range fss {
        encEnabled, err := zfs.ZFSGetEncryptionEnabled(ctx, fss[i].ToString())
        if err != nil {
            return nil, errors.Wrap(err, "cannot get filesystem encryption status")
        }
        rfss[i] = &pdu.Filesystem{
            Path: fss[i].ToString(),
            // FIXME: not supporting ResumeToken yet
            // ResumeToken does not make sense from Sender
            IsPlaceholder: false, // sender FSs are never placeholders
            IsEncrypted: encEnabled,
        }
    }
    res := &pdu.ListFilesystemRes{Filesystems: rfss}
@@ -77,13 +109,86 @@ func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesyst

}

func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
    var err error

    fs := r.GetFilesystem()
    mostRecent, err := sendArgsFromPDUAndValidateExists(ctx, fs, r.GetSenderVersion())
    if err != nil {
        msg := "HintMostRecentCommonAncestor rpc with nonexistent most recent version"
        getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", mostRecent)).
            Warn(msg)
        return nil, errors.Wrap(err, msg)
    }

    // move replication cursor to this position
    _, err = MoveReplicationCursor(ctx, fs, mostRecent, p.jobId)
    if err == zfs.ErrBookmarkCloningNotSupported {
        getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
        // fallthrough
    } else if err != nil {
        return nil, errors.Wrap(err, "cannot set replication cursor to hinted version")
    }

    // cleanup previous steps
    if err := ReleaseStepAll(ctx, fs, mostRecent, p.jobId); err != nil {
        return nil, errors.Wrap(err, "cannot cleanup prior invocation's step holds and bookmarks")
    }

    return &pdu.HintMostRecentCommonAncestorRes{}, nil
}

var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10))

func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
    if fsv == nil {
        return nil
    }
    return &zfs.ZFSSendArgVersion{RelName: fsv.GetRelName(), GUID: fsv.Guid}
}

func sendArgsFromPDUAndValidateExists(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (*zfs.ZFSSendArgVersion, error) {
    v := uncheckedSendArgsFromPDU(fsv)
    if v == nil {
        return nil, errors.New("must not be nil")
    }
    if err := v.ValidateExists(ctx, fs); err != nil {
        return nil, err
    }
    return v, nil
}

func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {

    _, err := s.filterCheckFS(r.Filesystem)
    if err != nil {
        return nil, nil, err
    }
    switch r.Encrypted {
    case pdu.Tri_DontCare:
        // use s.encrypt setting
        // ok, fallthrough outer
    case pdu.Tri_False:
        if s.encrypt.B {
            return nil, nil, errors.New("only encrypted sends allowed (send -w + encryption != off), but unencrypted send requested")
        }
        // fallthrough outer
    case pdu.Tri_True:
        if !s.encrypt.B {
            return nil, nil, errors.New("only unencrypted sends allowed, but encrypted send requested")
        }
        // fallthrough outer
    default:
        return nil, nil, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted)
    }

    sendArgs := zfs.ZFSSendArgs{
        FS: r.Filesystem,
        From: uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
        To: uncheckedSendArgsFromPDU(r.GetTo()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
        Encrypted: s.encrypt,
        ResumeToken: r.ResumeToken, // nil or not nil, depending on decoding success
    }

    getLogger(ctx).Debug("acquire concurrent send semaphore")
    // TODO use try-acquire and fail with resource-exhaustion rpc status
@@ -95,28 +200,154 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
    }
    defer guard.Release()

    si, err := zfs.ZFSSendDry(r.Filesystem, r.From, r.To, "")
    si, err := zfs.ZFSSendDry(ctx, sendArgs)
    if err != nil {
        return nil, nil, err
        return nil, nil, errors.Wrap(err, "zfs send dry failed")
    }

    // From now on, assume that sendArgs has been validated by ZFSSendDry
    // (because validation involves shelling out, it's actually a little expensive)

    var expSize int64 = 0 // protocol says 0 means no estimate
    if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
        expSize = si.SizeEstimate
    }
    res := &pdu.SendRes{ExpectedSize: expSize}
    res := &pdu.SendRes{
        ExpectedSize: expSize,
        UsedResumeToken: r.ResumeToken != "",
    }

    if r.DryRun {
        return res, nil, nil
    }

    streamCopier, err := zfs.ZFSSend(ctx, r.Filesystem, r.From, r.To, "")
    // update replication cursor
    if sendArgs.From != nil {
        // For all but the first replication, this should always be a no-op because SendCompleted already moved the cursor
        _, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.From, s.jobId)
        if err == zfs.ErrBookmarkCloningNotSupported {
            getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
            // fallthrough
        } else if err != nil {
            return nil, nil, errors.Wrap(err, "cannot set replication cursor to `from` version before starting send")
        }
    }

    // make sure `From` doesn't go away in order to make this step resumable
    if sendArgs.From != nil {
        err := HoldStep(ctx, sendArgs.FS, sendArgs.From, s.jobId)
        if err == zfs.ErrBookmarkCloningNotSupported {
            getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
            // fallthrough
        } else if err != nil {
            return nil, nil, errors.Wrap(err, "cannot create step bookmark")
        }
    }
    // make sure `To` doesn't go away in order to make this step resumable
    err = HoldStep(ctx, sendArgs.FS, sendArgs.To, s.jobId)
    if err != nil {
        return nil, nil, err
        return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.To.RelName)
    }

    // step holds & replication cursor released / moved forward in s.SendCompleted => s.moveCursorAndReleaseSendHolds

    streamCopier, err := zfs.ZFSSend(ctx, sendArgs)
    if err != nil {
        return nil, nil, errors.Wrap(err, "zfs send failed")
    }
    return res, streamCopier, nil
}

func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
    orig := r.GetOriginalReq() // may be nil, always use proto getters
    fs := orig.GetFilesystem()

    var err error
    var from *zfs.ZFSSendArgVersion
    if orig.GetFrom() != nil {
        from, err = sendArgsFromPDUAndValidateExists(ctx, fs, orig.GetFrom()) // no shadow
        if err != nil {
            return nil, errors.Wrap(err, "validate `from` exists")
        }
    }
    to, err := sendArgsFromPDUAndValidateExists(ctx, fs, orig.GetTo())
    if err != nil {
        return nil, errors.Wrap(err, "validate `to` exists")
    }

    log := getLogger(ctx).WithField("to_guid", to.GUID).
        WithField("fs", fs).
        WithField("to", to.RelName)
    if from != nil {
        log = log.WithField("from", from.RelName).WithField("from_guid", from.GUID)
    }

    log.Debug("move replication cursor to most recent common version")
    destroyedCursors, err := MoveReplicationCursor(ctx, fs, to, p.jobId)
    if err != nil {
        if err == zfs.ErrBookmarkCloningNotSupported {
            log.Debug("not setting replication cursor, bookmark cloning not supported")
        } else {
            msg := "cannot move replication cursor, keeping hold on `to` until successful"
            log.WithError(err).Error(msg)
            err = errors.Wrap(err, msg)
            // it is correct to not release the hold if we can't move the cursor!
            return &pdu.SendCompletedRes{}, err
        }
    } else {
        log.Info("successfully moved replication cursor")
    }

    // kick off releasing of step holds / bookmarks
    // if we fail to release them, don't bother the caller:
    // they are merely an implementation detail on the sender for better resumability
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        log.Debug("release step-hold of or step-bookmark on `to`")
        err = ReleaseStep(ctx, fs, to, p.jobId)
        if err != nil {
            log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `to`")
        } else {
            log.Info("successfully released step-holds on or destroyed step-bookmark of `to`")
        }

    }()
    go func() {
        defer wg.Done()
        if from == nil {
            return
        }
        log.Debug("release step-hold of or step-bookmark on `from`")
        err := ReleaseStep(ctx, fs, from, p.jobId)
        if err != nil {
            if dne, ok := err.(*zfs.DatasetDoesNotExist); ok {
                // If bookmark cloning is not supported, `from` might be the old replication cursor
                // and thus have already been destroyed by MoveReplicationCursor above
                // In that case, nonexistence of `from` is not an error, otherwise it is.
                fsp, err := zfs.NewDatasetPath(fs)
                if err != nil {
                    panic(err) // fs has been validated multiple times above
                }
                for _, fsv := range destroyedCursors {
                    if fsv.ToAbsPath(fsp) == dne.Path {
                        log.Info("`from` was a replication cursor and has already been destroyed")
                        return
                    }
                }
                // fallthrough
            }
            log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `from`")
        } else {
            log.Info("successfully released step-holds on or destroyed step-bookmark of `from`")
        }
    }()
    wg.Wait()

    return &pdu.SendCompletedRes{}, nil
}

func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
    dp, err := p.filterCheckFS(req.Filesystem)
    if err != nil {
@@ -146,25 +377,14 @@ func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCurs
        return nil, err
    }

    switch op := req.Op.(type) {
    case *pdu.ReplicationCursorReq_Get:
        cursor, err := zfs.ZFSGetReplicationCursor(dp)
        if err != nil {
            return nil, err
        }
        if cursor == nil {
            return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil
        }
        return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: cursor.Guid}}, nil
    case *pdu.ReplicationCursorReq_Set:
        guid, err := zfs.ZFSSetReplicationCursor(dp, op.Set.Snapshot)
        if err != nil {
            return nil, err
        }
        return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: guid}}, nil
    default:
        return nil, errors.Errorf("unknown op %T", op)
    cursor, err := GetMostRecentReplicationCursorOfJob(ctx, dp.ToString(), p.jobId)
    if err != nil {
        return nil, err
    }
    if cursor == nil {
        return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil
    }
    return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: cursor.Guid}}, nil
}

func (p *Sender) Receive(ctx context.Context, r *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
@@ -183,22 +403,42 @@ type FSMap interface { // FIXME unused
    AsFilter() FSFilter
}

type ReceiverConfig struct {
    JobID JobID

    RootWithoutClientComponent *zfs.DatasetPath // TODO use
    AppendClientIdentity bool

    UpdateLastReceivedHold bool
}

func (c *ReceiverConfig) copyIn() {
    c.RootWithoutClientComponent = c.RootWithoutClientComponent.Copy()
}

func (c *ReceiverConfig) Validate() error {
    c.JobID.MustValidate()
    if c.RootWithoutClientComponent.Length() <= 0 {
        return errors.New("RootWithoutClientComponent must not be an empty dataset path")
    }
    return nil
}

// Receiver implements replication.ReplicationEndpoint for a receiving side
type Receiver struct {
    rootWithoutClientComponent *zfs.DatasetPath
    appendClientIdentity bool
    conf ReceiverConfig // validated

    recvParentCreationMtx *chainlock.L
}

func NewReceiver(rootDataset *zfs.DatasetPath, appendClientIdentity bool) *Receiver {
    if rootDataset.Length() <= 0 {
        panic(fmt.Sprintf("root dataset must not be an empty path: %v", rootDataset))
func NewReceiver(config ReceiverConfig) *Receiver {
    config.copyIn()
    if err := config.Validate(); err != nil {
        panic(err)
    }
    return &Receiver{
        rootWithoutClientComponent: rootDataset.Copy(),
        appendClientIdentity: appendClientIdentity,
        recvParentCreationMtx: chainlock.New(),
        conf: config,
        recvParentCreationMtx: chainlock.New(),
    }
}

@@ -221,8 +461,8 @@ func clientRoot(rootFS *zfs.DatasetPath, clientIdentity string) (*zfs.DatasetPat
}

func (s *Receiver) clientRootFromCtx(ctx context.Context) *zfs.DatasetPath {
    if !s.appendClientIdentity {
        return s.rootWithoutClientComponent.Copy()
    if !s.conf.AppendClientIdentity {
        return s.conf.RootWithoutClientComponent.Copy()
    }

    clientIdentity, ok := ctx.Value(ClientIdentityKey).(string)
@@ -230,7 +470,7 @@ func (s *Receiver) clientRootFromCtx(ctx context.Context) *zfs.DatasetPath {
        panic(fmt.Sprintf("ClientIdentityKey context value must be set"))
    }

    clientRoot, err := clientRoot(s.rootWithoutClientComponent, clientIdentity)
    clientRoot, err := clientRoot(s.conf.RootWithoutClientComponent, clientIdentity)
    if err != nil {
        panic(fmt.Sprintf("ClientIdentityContextKey must have been validated before invoking Receiver: %s", err))
    }
@@ -282,8 +522,27 @@ func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemR
            err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
            return nil, err
        }
        token, err := zfs.ZFSGetReceiveResumeTokenOrEmptyStringIfNotSupported(ctx, a)
        if err != nil {
            l.WithError(err).Error("cannot get receive resume token")
            return nil, err
        }
        encEnabled, err := zfs.ZFSGetEncryptionEnabled(ctx, a.ToString())
        if err != nil {
            l.WithError(err).Error("cannot get encryption enabled status")
            return nil, err
        }
        l.WithField("receive_resume_token", token).Debug("receive resume token")

        a.TrimPrefix(root)
        fss = append(fss, &pdu.Filesystem{Path: a.ToString(), IsPlaceholder: ph.IsPlaceholder})

        fs := &pdu.Filesystem{
            Path: a.ToString(),
            IsPlaceholder: ph.IsPlaceholder,
            ResumeToken: token,
            IsEncrypted: encEnabled,
        }
        fss = append(fss, fs)
    }
    if len(fss) == 0 {
        getLogger(ctx).Debug("no filesystems found")
@@ -344,7 +603,18 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
    root := s.clientRootFromCtx(ctx)
    lp, err := subroot{root}.MapToLocal(req.Filesystem)
    if err != nil {
        return nil, err
        return nil, errors.Wrap(err, "`Filesystem` invalid")
    }

    to := uncheckedSendArgsFromPDU(req.GetTo())
    if to == nil {
        return nil, errors.New("`To` must not be nil")
    }
    if err := to.ValidateInMemory(lp.ToString()); err != nil {
        return nil, errors.Wrap(err, "`To` invalid")
    }
    if !to.IsSnapshot() {
        return nil, errors.New("`To` must be a snapshot")
    }

    // create placeholder parent filesystems as appropriate
@@ -380,11 +650,11 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
        }

        if !ph.FSExists {
            if s.rootWithoutClientComponent.HasPrefix(v.Path) {
            if s.conf.RootWithoutClientComponent.HasPrefix(v.Path) {
                if v.Path.Length() == 1 {
                    visitErr = fmt.Errorf("pool %q not imported", v.Path.ToString())
                } else {
                    visitErr = fmt.Errorf("root_fs %q does not exist", s.rootWithoutClientComponent.ToString())
                    visitErr = fmt.Errorf("root_fs %q does not exist", s.conf.RootWithoutClientComponent.ToString())
                }
                getLogger(ctx).WithError(visitErr).Error("placeholders are only created automatically below root_fs")
                return false
@@ -422,6 +692,17 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
        }
    }

    if req.ClearResumeToken && ph.FSExists {
        if err := zfs.ZFSRecvClearResumeToken(lp.ToString()); err != nil {
            return nil, errors.Wrap(err, "cannot clear resume token")
        }
    }

    recvOpts.SavePartialRecvState, err = zfs.ResumeRecvSupported(ctx, lp)
    if err != nil {
        return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
    }

    getLogger(ctx).Debug("acquire concurrent recv semaphore")
    // TODO use try-acquire and fail with resource-exhaustion rpc status
    // => would require handling on the client-side
@@ -434,13 +715,30 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs

    getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")

    if err := zfs.ZFSRecv(ctx, lp.ToString(), receive, recvOpts); err != nil {
    snapFullPath := to.FullPath(lp.ToString())
    if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
        getLogger(ctx).
            WithError(err).
            WithField("opts", recvOpts).
            Error("zfs receive failed")
        return nil, err
    }

    // validate that we actually received what the sender claimed
    if err := to.ValidateExists(ctx, lp.ToString()); err != nil {
        msg := "receive request's `To` version does not match what we received in the stream"
        getLogger(ctx).WithError(err).WithField("snap", snapFullPath).Error(msg)
        getLogger(ctx).Error("aborting recv request, but keeping received snapshot for inspection")
        return nil, errors.Wrap(err, msg)
    }

    if s.conf.UpdateLastReceivedHold {
        getLogger(ctx).Debug("move last-received-hold")
        if err := MoveLastReceivedHold(ctx, lp.ToString(), *to, s.conf.JobID); err != nil {
            return nil, errors.Wrap(err, "cannot move last-received-hold")
        }
    }

    return &pdu.ReceiveRes{}, nil
}

@@ -453,6 +751,19 @@ func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapsho
    return doDestroySnapshots(ctx, lp, req.Snapshots)
}

func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
    // we don't move last-received-hold as part of this hint
    // because that wouldn't give us any benefit wrt resumability.
    //
    // Other reason: the replication logic that issues this RPC would require refactoring
    // to include the receiver's FilesystemVersion in the request)
    return &pdu.HintMostRecentCommonAncestorRes{}, nil
}

func (p *Receiver) SendCompleted(context.Context, *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
    return &pdu.SendCompletedRes{}, nil
}

func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.FilesystemVersion) (*pdu.DestroySnapshotsRes, error) {
    reqs := make([]*zfs.DestroySnapOp, len(snaps))
    ress := make([]*pdu.DestroySnapshotRes, len(snaps))