zrepl/endpoint/endpoint.go
Christian Schwarz 58c08c855f new features: {resumable,encrypted,hold-protected} send-recv, last-received-hold
- **Resumable Send & Recv Support**
  No knobs required, automatically used where supported.
- **Hold-Protected Send & Recv**
  Automatic ZFS holds to ensure that we can always resume a replication step.
- **Encrypted Send & Recv Support** for OpenZFS native encryption.
  Configurable at the job level, i.e., for all filesystems a job is responsible for.
- **Receive-side hold on last received dataset**
  The counterpart to the replication cursor bookmark on the send-side.
  Ensures that incremental replication will always be possible between a sender and receiver.

Design Doc
----------

The `replication/design.md` document describes how we use ZFS holds and bookmarks to ensure that a single replication step is always resumable.

The replication algorithm described in the design doc introduces the notion of job IDs (please read the design doc for details).
We reuse job names as job IDs and introduce the `JobID` type to ensure that a job name can be embedded into hold tags, bookmark names, etc.
This might BREAK CONFIG on upgrade.
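
To illustrate the constraint, here is a minimal sketch of a `JobID` wrapper that rejects names which cannot safely be embedded into hold tags and bookmark names. The allowed character set and the length limit below are assumptions for the sketch, not the exact checks zrepl performs:

```go
package sketch

import "fmt"

// JobID wraps a job name that has been checked for embeddability
// into ZFS hold tags and bookmark names.
type JobID struct{ name string }

// MakeJobID validates the name; the character set and the 64-character
// limit used here are illustrative assumptions.
func MakeJobID(name string) (JobID, error) {
	if name == "" || len(name) > 64 {
		return JobID{}, fmt.Errorf("job name must be 1..64 characters, got %q", name)
	}
	for _, r := range name {
		ok := r == '_' || r == '-' || r == '.' ||
			(r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9')
		if !ok {
			return JobID{}, fmt.Errorf("job name %q contains %q, which cannot be embedded into hold tags or bookmark names", name, r)
		}
	}
	return JobID{name}, nil
}

func (j JobID) String() string { return j.name }
```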

Protocol Version Bump
---------------------

This commit makes backwards-incompatible changes to the replication/pdu protobufs.
Thus, bump the version number used in the protocol handshake.

Replication Cursor Format Change
--------------------------------

The new replication cursor bookmark format is: `#zrepl_CURSOR_G_${this.GUID}_J_${jobid}`
Including the GUID enables transaction-safe moving-forward of the cursor.
Including the job id allows multiple sending jobs to send the same filesystem without interfering with each other.
The `zrepl migrate replication-cursor:v1-v2` subcommand can be used to safely destroy old-format cursors once zrepl has created new-format cursors.
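
For concreteness, a v2 cursor bookmark name can be assembled from this format roughly as follows; rendering the GUID as zero-padded hexadecimal and the helper name are assumptions of this sketch, not necessarily zrepl's exact implementation:

```go
package sketch

import "fmt"

// replicationCursorBookmarkV2 renders the full bookmark path for the v2
// replication cursor format #zrepl_CURSOR_G_${GUID}_J_${jobid}.
// The %016x rendering of the GUID is an assumption of this sketch.
func replicationCursorBookmarkV2(fs string, guid uint64, jobID string) string {
	return fmt.Sprintf("%s#zrepl_CURSOR_G_%016x_J_%s", fs, guid, jobID)
}

// Example: replicationCursorBookmarkV2("tank/data", 0xdeadbeef, "prod-to-backup")
// => "tank/data#zrepl_CURSOR_G_00000000deadbeef_J_prod-to-backup"
```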

Changes in This Commit
----------------------

- package zfs
  - infrastructure for holds
  - infrastructure for resume token decoding
  - implement a variant of OpenZFS's `entity_namecheck` and use it for validation in new code
  - ZFSSendArgs to specify a ZFS send operation
    - validation code protects against malicious resume tokens by checking that the token encodes the same send parameters that the send-side would use if no resume token were available (i.e., same filesystem, `fromguid`, `toguid`); see the sketch after this list
  - RecvOptions support for `recv -s` flag
  - convert a bunch of ZFS operations to be idempotent
    - achieved through more differentiated error message scraping / additional pre-/post-checks
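
The resume-token cross-check mentioned in the `ZFSSendArgs` item above boils down to a comparison like the following sketch; the `resumeToken` struct and field names are stand-ins for this illustration, not the actual types in package zfs:

```go
package sketch

import (
	"fmt"
	"strings"
)

// resumeToken is an illustrative stand-in for a decoded receive_resume_token.
type resumeToken struct {
	ToName   string // e.g. "pool/fs@snap" of the partially received snapshot
	ToGUID   uint64
	FromGUID uint64 // zero for full sends
}

// checkResumeToken rejects tokens that encode a different send step than the
// one the sender would perform without a token (same fs, fromguid, toguid).
func checkResumeToken(tok resumeToken, fs string, fromGUID, toGUID uint64) error {
	if !strings.HasPrefix(tok.ToName, fs+"@") {
		return fmt.Errorf("resume token refers to %q, expecting a snapshot of %q", tok.ToName, fs)
	}
	if tok.FromGUID != fromGUID || tok.ToGUID != toGUID {
		return fmt.Errorf("resume token encodes step %d => %d, but %d => %d was requested",
			tok.FromGUID, tok.ToGUID, fromGUID, toGUID)
	}
	return nil
}
```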

- package replication/pdu
  - add field for encryption to send request messages
  - add fields for resume handling to send & recv request messages
  - receive requests now contain a `FilesystemVersion To` in addition to the filesystem into which the stream should be received
    - the receiver can use `zfs recv $root_fs/$client_id/path/to/dataset@${To.Name}`, which enables additional validation after recv (e.g., checking that `To.Guid` matches what was received in the stream)
    - used to set `last-received-hold`
- package replication/logic
  - introduce `PlannerPolicy` struct, currently only used to configure whether encrypted sends should be requested from the sender (see the sketch after this list)
  - integrate encryption and resume token support into `Step` struct
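
The interaction between the planner's request and the sender's job-level setting is a three-way decision; the helper below is a condensed restatement of the `switch` in `Sender.Send` (`endpoint/endpoint.go`), not a function that exists in the code:

```go
package sketch

import (
	"errors"
	"fmt"

	"github.com/zrepl/zrepl/replication/logic/pdu"
)

// reconcileEncryption decides whether a send will be encrypted, given the
// tri-state request from the planner and the sender's job-level setting.
func reconcileEncryption(requested pdu.Tri, senderEncrypts bool) (bool, error) {
	switch requested {
	case pdu.Tri_DontCare:
		return senderEncrypts, nil // sender's job-level setting wins
	case pdu.Tri_True:
		if !senderEncrypts {
			return false, errors.New("only unencrypted sends allowed, but encrypted send requested")
		}
		return true, nil
	case pdu.Tri_False:
		if senderEncrypts {
			return false, errors.New("only encrypted sends allowed, but unencrypted send requested")
		}
		return false, nil
	default:
		return false, fmt.Errorf("unknown pdu.Tri variant %q", requested)
	}
}
```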

- package endpoint
  - move the concepts that endpoint builds on top of ZFS to a single file `endpoint/endpoint_zfs.go`
    - step-holds + step-bookmarks
    - last-received-hold
    - new replication cursor + old replication cursor compat code
  - adjust `endpoint/endpoint.go` handlers for
    - encryption
    - resumability
    - new replication cursor
    - last-received-hold

- client subcommand `zrepl holds list`: list all holds and hold-like bookmarks that zrepl thinks belong to it
- client subcommand `zrepl migrate replication-cursor:v1-v2`
2020-02-14 22:00:13 +01:00

// Package endpoint implements replication endpoints for use with package replication.
package endpoint
import (
"context"
"fmt"
"path"
"sync"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/util/chainlock"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/util/semaphore"
"github.com/zrepl/zrepl/zfs"
)
type SenderConfig struct {
FSF zfs.DatasetFilter
Encrypt *zfs.NilBool
JobID JobID
}
func (c *SenderConfig) Validate() error {
c.JobID.MustValidate()
if err := c.Encrypt.Validate(); err != nil {
return errors.Wrap(err, "`Encrypt` field invalid")
}
if _, err := StepHoldTag(c.JobID); err != nil {
return fmt.Errorf("JobID cannot be used for hold tag: %s", err)
}
return nil
}
// Sender implements replication.ReplicationEndpoint for a sending side
type Sender struct {
FSFilter zfs.DatasetFilter
encrypt *zfs.NilBool
jobId JobID
}
func NewSender(conf SenderConfig) *Sender {
if err := conf.Validate(); err != nil {
panic("invalid config" + err.Error())
}
return &Sender{
FSFilter: conf.FSF,
encrypt: conf.Encrypt,
jobId: conf.JobID,
}
}
func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
dp, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, err
}
if dp.Length() == 0 {
return nil, errors.New("empty filesystem not allowed")
}
pass, err := s.FSFilter.Filter(dp)
if err != nil {
return nil, err
}
if !pass {
return nil, fmt.Errorf("endpoint does not allow access to filesystem %s", fs)
}
return dp, nil
}
func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
fss, err := zfs.ZFSListMapping(ctx, s.FSFilter)
if err != nil {
return nil, err
}
rfss := make([]*pdu.Filesystem, len(fss))
for i := range fss {
encEnabled, err := zfs.ZFSGetEncryptionEnabled(ctx, fss[i].ToString())
if err != nil {
return nil, errors.Wrap(err, "cannot get filesystem encryption status")
}
rfss[i] = &pdu.Filesystem{
Path: fss[i].ToString(),
// ResumeToken does not make sense from Sender
IsPlaceholder: false, // sender FSs are never placeholders
IsEncrypted: encEnabled,
}
}
res := &pdu.ListFilesystemRes{Filesystems: rfss}
return res, nil
}
func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
lp, err := s.filterCheckFS(r.GetFilesystem())
if err != nil {
return nil, err
}
fsvs, err := zfs.ZFSListFilesystemVersions(lp, nil)
if err != nil {
return nil, err
}
rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
for i := range fsvs {
rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
}
res := &pdu.ListFilesystemVersionsRes{Versions: rfsvs}
return res, nil
}
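// HintMostRecentCommonAncestor validates that the hinted version still exists on the
// sender, moves the replication cursor to it, and releases step holds and step
// bookmarks left over from prior replication attempts.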
func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
var err error
fs := r.GetFilesystem()
mostRecent, err := sendArgsFromPDUAndValidateExists(ctx, fs, r.GetSenderVersion())
if err != nil {
msg := "HintMostRecentCommonAncestor rpc with nonexistent most recent version"
getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", mostRecent)).
Warn(msg)
return nil, errors.Wrap(err, msg)
}
// move replication cursor to this position
_, err = MoveReplicationCursor(ctx, fs, mostRecent, p.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, errors.Wrap(err, "cannot set replication cursor to hinted version")
}
// cleanup previous steps
if err := ReleaseStepAll(ctx, fs, mostRecent, p.jobId); err != nil {
return nil, errors.Wrap(err, "cannot cleanup prior invocation's step holds and bookmarks")
}
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10))
func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
if fsv == nil {
return nil
}
return &zfs.ZFSSendArgVersion{RelName: fsv.GetRelName(), GUID: fsv.Guid}
}
func sendArgsFromPDUAndValidateExists(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (*zfs.ZFSSendArgVersion, error) {
v := uncheckedSendArgsFromPDU(fsv)
if v == nil {
return nil, errors.New("must not be nil")
}
if err := v.ValidateExists(ctx, fs); err != nil {
return nil, err
}
return v, nil
}
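// Send checks the request against the sender's encryption policy, dry-runs the send
// for a size estimate, moves the replication cursor to `from` where necessary, and
// acquires step holds / step bookmarks on `from` and `to` so that the step remains
// resumable before starting the actual `zfs send`.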
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
_, err := s.filterCheckFS(r.Filesystem)
if err != nil {
return nil, nil, err
}
switch r.Encrypted {
case pdu.Tri_DontCare:
// use s.encrypt setting
// ok, fallthrough outer
case pdu.Tri_False:
if s.encrypt.B {
return nil, nil, errors.New("only encrytped sends allowed (send -w + encryption!= off), but unencrytped send requested")
}
// fallthrough outer
case pdu.Tri_True:
if !s.encrypt.B {
return nil, nil, errors.New("only unencrypted sends allowed, but encrypted send requested")
}
// fallthrough outer
default:
return nil, nil, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted)
}
sendArgs := zfs.ZFSSendArgs{
FS: r.Filesystem,
From: uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
To: uncheckedSendArgsFromPDU(r.GetTo()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
Encrypted: s.encrypt,
ResumeToken: r.ResumeToken, // nil or not nil, depending on decoding success
}
getLogger(ctx).Debug("acquire concurrent send semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx)
if err != nil {
return nil, nil, err
}
defer guard.Release()
si, err := zfs.ZFSSendDry(ctx, sendArgs)
if err != nil {
return nil, nil, errors.Wrap(err, "zfs send dry failed")
}
// From now on, assume that sendArgs has been validated by ZFSSendDry
// (because validation involves shelling out, it's actually a little expensive)
var expSize int64 = 0 // protocol says 0 means no estimate
if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
expSize = si.SizeEstimate
}
res := &pdu.SendRes{
ExpectedSize: expSize,
UsedResumeToken: r.ResumeToken != "",
}
if r.DryRun {
return res, nil, nil
}
// update replication cursor
if sendArgs.From != nil {
// For all but the first replication, this should always be a no-op because SendCompleted already moved the cursor
_, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.From, s.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, nil, errors.Wrap(err, "cannot set replication cursor to `from` version before starting send")
}
}
// make sure `From` doesn't go away in order to make this step resumable
if sendArgs.From != nil {
err := HoldStep(ctx, sendArgs.FS, sendArgs.From, s.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, nil, errors.Wrap(err, "cannot create step bookmark")
}
}
// make sure `To` doesn't go away in order to make this step resumable
err = HoldStep(ctx, sendArgs.FS, sendArgs.To, s.jobId)
if err != nil {
return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.To.RelName)
}
// step holds & replication cursor released / moved forward in s.SendCompleted => s.moveCursorAndReleaseSendHolds
streamCopier, err := zfs.ZFSSend(ctx, sendArgs)
if err != nil {
return nil, nil, errors.Wrap(err, "zfs send failed")
}
return res, streamCopier, nil
}
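// SendCompleted moves the replication cursor forward to the step's `to` version and
// then releases the step holds / step bookmarks that Send acquired on `from` and `to`;
// the hold on `to` is kept if the cursor cannot be moved forward.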
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
orig := r.GetOriginalReq() // may be nil, always use proto getters
fs := orig.GetFilesystem()
var err error
var from *zfs.ZFSSendArgVersion
if orig.GetFrom() != nil {
from, err = sendArgsFromPDUAndValidateExists(ctx, fs, orig.GetFrom()) // no shadow
if err != nil {
return nil, errors.Wrap(err, "validate `from` exists")
}
}
to, err := sendArgsFromPDUAndValidateExists(ctx, fs, orig.GetTo())
if err != nil {
return nil, errors.Wrap(err, "validate `to` exists")
}
log := getLogger(ctx).WithField("to_guid", to.GUID).
WithField("fs", fs).
WithField("to", to.RelName)
if from != nil {
log = log.WithField("from", from.RelName).WithField("from_guid", from.GUID)
}
log.Debug("move replication cursor to most recent common version")
destroyedCursors, err := MoveReplicationCursor(ctx, fs, to, p.jobId)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
log.Debug("not setting replication cursor, bookmark cloning not supported")
} else {
msg := "cannot move replication cursor, keeping hold on `to` until successful"
log.WithError(err).Error(msg)
err = errors.Wrap(err, msg)
// it is correct to not release the hold if we can't move the cursor!
return &pdu.SendCompletedRes{}, err
}
} else {
log.Info("successfully moved replication cursor")
}
// kick off releasing of step holds / bookmarks
// if we fail to release them, don't bother the caller:
// they are merely an implementation detail on the sender for better resumability
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
log.Debug("release step-hold of or step-bookmark on `to`")
err = ReleaseStep(ctx, fs, to, p.jobId)
if err != nil {
log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `to`")
} else {
log.Info("successfully released step-holds on or destroyed step-bookmark of `to`")
}
}()
go func() {
defer wg.Done()
if from == nil {
return
}
log.Debug("release step-hold of or step-bookmark on `from`")
err := ReleaseStep(ctx, fs, from, p.jobId)
if err != nil {
if dne, ok := err.(*zfs.DatasetDoesNotExist); ok {
// If bookmark cloning is not supported, `from` might be the old replication cursor
// and thus have already been destroyed by MoveReplicationCursor above.
// In that case, nonexistence of `from` is not an error, otherwise it is.
fsp, err := zfs.NewDatasetPath(fs)
if err != nil {
panic(err) // fs has been validated multiple times above
}
for _, fsv := range destroyedCursors {
if fsv.ToAbsPath(fsp) == dne.Path {
log.Info("`from` was a replication cursor and has already been destroyed")
return
}
}
// fallthrough
}
log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `from`")
} else {
log.Info("successfully released step-holds on or destroyed step-bookmark of `from`")
}
}()
wg.Wait()
return &pdu.SendCompletedRes{}, nil
}
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
dp, err := p.filterCheckFS(req.Filesystem)
if err != nil {
return nil, err
}
return doDestroySnapshots(ctx, dp, req.Snapshots)
}
func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
res := pdu.PingRes{
Echo: req.GetMessage(),
}
return &res, nil
}
func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
return p.Ping(ctx, req)
}
func (p *Sender) WaitForConnectivity(ctx context.Context) error {
return nil
}
func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
dp, err := p.filterCheckFS(req.Filesystem)
if err != nil {
return nil, err
}
cursor, err := GetMostRecentReplicationCursorOfJob(ctx, dp.ToString(), p.jobId)
if err != nil {
return nil, err
}
if cursor == nil {
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil
}
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: cursor.Guid}}, nil
}
func (p *Sender) Receive(ctx context.Context, r *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
return nil, fmt.Errorf("sender does not implement Receive()")
}
type FSFilter interface { // FIXME unused
Filter(path *zfs.DatasetPath) (pass bool, err error)
}
// FIXME: can we get away without error types here?
type FSMap interface { // FIXME unused
FSFilter
Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error)
Invert() (FSMap, error)
AsFilter() FSFilter
}
type ReceiverConfig struct {
JobID JobID
RootWithoutClientComponent *zfs.DatasetPath // TODO use
AppendClientIdentity bool
UpdateLastReceivedHold bool
}
func (c *ReceiverConfig) copyIn() {
c.RootWithoutClientComponent = c.RootWithoutClientComponent.Copy()
}
func (c *ReceiverConfig) Validate() error {
c.JobID.MustValidate()
if c.RootWithoutClientComponent.Length() <= 0 {
return errors.New("RootWithoutClientComponent must not be an empty dataset path")
}
return nil
}
// Receiver implements replication.ReplicationEndpoint for a receiving side
type Receiver struct {
conf ReceiverConfig // validated
recvParentCreationMtx *chainlock.L
}
func NewReceiver(config ReceiverConfig) *Receiver {
config.copyIn()
if err := config.Validate(); err != nil {
panic(err)
}
return &Receiver{
conf: config,
recvParentCreationMtx: chainlock.New(),
}
}
func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error {
_, err := clientRoot(rootFS, clientIdentity)
return err
}
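// clientRoot returns rootFS/clientIdentity and errors out if clientIdentity is not
// exactly one dataset path component.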
func clientRoot(rootFS *zfs.DatasetPath, clientIdentity string) (*zfs.DatasetPath, error) {
rootFSLen := rootFS.Length()
clientRootStr := path.Join(rootFS.ToString(), clientIdentity)
clientRoot, err := zfs.NewDatasetPath(clientRootStr)
if err != nil {
return nil, err
}
if rootFSLen+1 != clientRoot.Length() {
return nil, fmt.Errorf("client identity must be a single ZFS filesystem path component")
}
return clientRoot, nil
}
func (s *Receiver) clientRootFromCtx(ctx context.Context) *zfs.DatasetPath {
if !s.conf.AppendClientIdentity {
return s.conf.RootWithoutClientComponent.Copy()
}
clientIdentity, ok := ctx.Value(ClientIdentityKey).(string)
if !ok {
panic(fmt.Sprintf("ClientIdentityKey context value must be set"))
}
clientRoot, err := clientRoot(s.conf.RootWithoutClientComponent, clientIdentity)
if err != nil {
panic(fmt.Sprintf("ClientIdentityContextKey must have been validated before invoking Receiver: %s", err))
}
return clientRoot
}
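// subroot implements zfs.DatasetFilter for the datasets strictly below localRoot;
// MapToLocal maps a sender-side filesystem path into that subtree.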
type subroot struct {
localRoot *zfs.DatasetPath
}
var _ zfs.DatasetFilter = subroot{}
// Filter passes all local dataset paths that are strictly below f.localRoot.
func (f subroot) Filter(p *zfs.DatasetPath) (pass bool, err error) {
return p.HasPrefix(f.localRoot) && !p.Equal(f.localRoot), nil
}
func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) {
p, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, err
}
if p.Length() == 0 {
return nil, errors.Errorf("cannot map empty filesystem")
}
c := f.localRoot.Copy()
c.Extend(p)
return c, nil
}
func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
root := s.clientRootFromCtx(ctx)
filtered, err := zfs.ZFSListMapping(ctx, subroot{root})
if err != nil {
return nil, err
}
// present filesystems without the root_fs prefix
fss := make([]*pdu.Filesystem, 0, len(filtered))
for _, a := range filtered {
l := getLogger(ctx).WithField("fs", a)
ph, err := zfs.ZFSGetFilesystemPlaceholderState(a)
if err != nil {
l.WithError(err).Error("error getting placeholder state")
return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a)
}
l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
if !ph.FSExists {
l.Error("inconsistent placeholder state: filesystem must exists")
err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
return nil, err
}
token, err := zfs.ZFSGetReceiveResumeTokenOrEmptyStringIfNotSupported(ctx, a)
if err != nil {
l.WithError(err).Error("cannot get receive resume token")
return nil, err
}
encEnabled, err := zfs.ZFSGetEncryptionEnabled(ctx, a.ToString())
if err != nil {
l.WithError(err).Error("cannot get encryption enabled status")
return nil, err
}
l.WithField("receive_resume_token", token).Debug("receive resume token")
a.TrimPrefix(root)
fs := &pdu.Filesystem{
Path: a.ToString(),
IsPlaceholder: ph.IsPlaceholder,
ResumeToken: token,
IsEncrypted: encEnabled,
}
fss = append(fss, fs)
}
if len(fss) == 0 {
getLogger(ctx).Debug("no filesystems found")
return &pdu.ListFilesystemRes{}, nil
}
return &pdu.ListFilesystemRes{Filesystems: fss}, nil
}
func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.GetFilesystem())
if err != nil {
return nil, err
}
fsvs, err := zfs.ZFSListFilesystemVersions(lp, nil)
if err != nil {
return nil, err
}
rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
for i := range fsvs {
rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
}
return &pdu.ListFilesystemVersionsRes{Versions: rfsvs}, nil
}
func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
res := pdu.PingRes{
Echo: req.GetMessage(),
}
return &res, nil
}
func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
return s.Ping(ctx, req)
}
func (s *Receiver) WaitForConnectivity(ctx context.Context) error {
return nil
}
func (s *Receiver) ReplicationCursor(context.Context, *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
return nil, fmt.Errorf("ReplicationCursor not implemented for Receiver")
}
func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
return nil, nil, fmt.Errorf("receiver does not implement Send()")
}
var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))
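// Receive maps the requested filesystem below the client's root, creates placeholder
// parent filesystems as needed, runs `zfs recv` (resumable where supported), validates
// that the received snapshot matches the request's `To` version, and finally moves the
// last-received-hold if the receiver is configured to do so.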
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
getLogger(ctx).Debug("incoming Receive")
defer receive.Close()
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.Filesystem)
if err != nil {
return nil, errors.Wrap(err, "`Filesystem` invalid")
}
to := uncheckedSendArgsFromPDU(req.GetTo())
if to == nil {
return nil, errors.New("`To` must not be nil")
}
if err := to.ValidateInMemory(lp.ToString()); err != nil {
return nil, errors.Wrap(err, "`To` invalid")
}
if !to.IsSnapshot() {
return nil, errors.New("`To` must be a snapshot")
}
// create placeholder parent filesystems as appropriate
//
// Manipulating the ZFS dataset hierarchy must happen exclusively.
// TODO: Use fine-grained locking to allow separate clients / requests to pass
// through the following section concurrently when operating on disjoint
// ZFS dataset hierarchy subtrees.
var visitErr error
func() {
getLogger(ctx).Debug("begin aquire recvParentCreationMtx")
defer s.recvParentCreationMtx.Lock().Unlock()
getLogger(ctx).Debug("end aquire recvParentCreationMtx")
defer getLogger(ctx).Debug("release recvParentCreationMtx")
f := zfs.NewDatasetPathForest()
f.Add(lp)
getLogger(ctx).Debug("begin tree-walk")
f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
if v.Path.Equal(lp) {
return false
}
ph, err := zfs.ZFSGetFilesystemPlaceholderState(v.Path)
getLogger(ctx).
WithField("fs", v.Path.ToString()).
WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
WithField("err", fmt.Sprintf("%s", err)).
WithField("errType", fmt.Sprintf("%T", err)).
Debug("placeholder state for filesystem")
if err != nil {
visitErr = err
return false
}
if !ph.FSExists {
if s.conf.RootWithoutClientComponent.HasPrefix(v.Path) {
if v.Path.Length() == 1 {
visitErr = fmt.Errorf("pool %q not imported", v.Path.ToString())
} else {
visitErr = fmt.Errorf("root_fs %q does not exist", s.conf.RootWithoutClientComponent.ToString())
}
getLogger(ctx).WithError(visitErr).Error("placeholders are only created automatically below root_fs")
return false
}
l := getLogger(ctx).WithField("placeholder_fs", v.Path)
l.Debug("create placeholder filesystem")
err := zfs.ZFSCreatePlaceholderFilesystem(v.Path)
if err != nil {
l.WithError(err).Error("cannot create placeholder filesystem")
visitErr = err
return false
}
return true
}
getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
return true // leave this fs as is
})
}()
getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
if visitErr != nil {
return nil, visitErr
}
// determine whether we need to rollback the filesystem / change its placeholder state
var clearPlaceholderProperty bool
var recvOpts zfs.RecvOptions
ph, err := zfs.ZFSGetFilesystemPlaceholderState(lp)
if err == nil && ph.FSExists && ph.IsPlaceholder {
recvOpts.RollbackAndForceRecv = true
clearPlaceholderProperty = true
}
if clearPlaceholderProperty {
if err := zfs.ZFSSetPlaceholder(lp, false); err != nil {
return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
}
}
if req.ClearResumeToken && ph.FSExists {
if err := zfs.ZFSRecvClearResumeToken(lp.ToString()); err != nil {
return nil, errors.Wrap(err, "cannot clear resume token")
}
}
recvOpts.SavePartialRecvState, err = zfs.ResumeRecvSupported(ctx, lp)
if err != nil {
return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
}
getLogger(ctx).Debug("acquire concurrent recv semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx)
if err != nil {
return nil, err
}
defer guard.Release()
getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
snapFullPath := to.FullPath(lp.ToString())
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
getLogger(ctx).
WithError(err).
WithField("opts", recvOpts).
Error("zfs receive failed")
return nil, err
}
// validate that we actually received what the sender claimed
if err := to.ValidateExists(ctx, lp.ToString()); err != nil {
msg := "receive request's `To` version does not match what we received in the stream"
getLogger(ctx).WithError(err).WithField("snap", snapFullPath).Error(msg)
getLogger(ctx).Error("aborting recv request, but keeping received snapshot for inspection")
return nil, errors.Wrap(err, msg)
}
if s.conf.UpdateLastReceivedHold {
getLogger(ctx).Debug("move last-received-hold")
if err := MoveLastReceivedHold(ctx, lp.ToString(), *to, s.conf.JobID); err != nil {
return nil, errors.Wrap(err, "cannot move last-received-hold")
}
}
return &pdu.ReceiveRes{}, nil
}
func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.Filesystem)
if err != nil {
return nil, err
}
return doDestroySnapshots(ctx, lp, req.Snapshots)
}
func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
// we don't move last-received-hold as part of this hint
// because that wouldn't give us any benefit wrt resumability.
//
// Another reason: the replication logic that issues this RPC would require refactoring
// to include the receiver's FilesystemVersion in the request.
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
func (p *Receiver) SendCompleted(context.Context, *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
return &pdu.SendCompletedRes{}, nil
}
func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.FilesystemVersion) (*pdu.DestroySnapshotsRes, error) {
reqs := make([]*zfs.DestroySnapOp, len(snaps))
ress := make([]*pdu.DestroySnapshotRes, len(snaps))
errs := make([]error, len(snaps))
for i, fsv := range snaps {
if fsv.Type != pdu.FilesystemVersion_Snapshot {
return nil, fmt.Errorf("version %q is not a snapshot", fsv.Name)
}
ress[i] = &pdu.DestroySnapshotRes{
Snapshot: fsv,
// Error set after batch operation
}
reqs[i] = &zfs.DestroySnapOp{
Filesystem: lp.ToString(),
Name: fsv.Name,
ErrOut: &errs[i],
}
}
zfs.ZFSDestroyFilesystemVersions(reqs)
for i := range reqs {
if errs[i] != nil {
if de, ok := errs[i].(*zfs.DestroySnapshotsError); ok && len(de.Reason) == 1 {
ress[i].Error = de.Reason[0]
} else {
ress[i].Error = errs[i].Error()
}
}
}
return &pdu.DestroySnapshotsRes{
Results: ress,
}, nil
}