zrepl/endpoint/endpoint.go

887 lines
29 KiB
Go

// Package endpoint implements replication endpoints for use with package replication.
package endpoint
import (
"context"
"fmt"
"path"
"sync"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/replication/logic/pdu"
"github.com/zrepl/zrepl/util/chainlock"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/util/semaphore"
"github.com/zrepl/zrepl/zfs"
)
type SenderConfig struct {
FSF zfs.DatasetFilter
Encrypt *zfs.NilBool
JobID JobID
}
func (c *SenderConfig) Validate() error {
c.JobID.MustValidate()
if err := c.Encrypt.Validate(); err != nil {
return errors.Wrap(err, "`Encrypt` field invalid")
}
if _, err := StepHoldTag(c.JobID); err != nil {
return fmt.Errorf("JobID cannot be used for hold tag: %s", err)
}
return nil
}
// Sender implements replication.ReplicationEndpoint for a sending side
type Sender struct {
FSFilter zfs.DatasetFilter
encrypt *zfs.NilBool
jobId JobID
}
func NewSender(conf SenderConfig) *Sender {
if err := conf.Validate(); err != nil {
panic("invalid config" + err.Error())
}
return &Sender{
FSFilter: conf.FSF,
encrypt: conf.Encrypt,
jobId: conf.JobID,
}
}
func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
dp, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, err
}
if dp.Length() == 0 {
return nil, errors.New("empty filesystem not allowed")
}
pass, err := s.FSFilter.Filter(dp)
if err != nil {
return nil, err
}
if !pass {
return nil, fmt.Errorf("endpoint does not allow access to filesystem %s", fs)
}
return dp, nil
}
func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
fss, err := zfs.ZFSListMapping(ctx, s.FSFilter)
if err != nil {
return nil, err
}
rfss := make([]*pdu.Filesystem, len(fss))
for i := range fss {
encEnabled, err := zfs.ZFSGetEncryptionEnabled(ctx, fss[i].ToString())
if err != nil {
return nil, errors.Wrap(err, "cannot get filesystem encryption status")
}
rfss[i] = &pdu.Filesystem{
Path: fss[i].ToString(),
// ResumeToken does not make sense from Sender
IsPlaceholder: false, // sender FSs are never placeholders
IsEncrypted: encEnabled,
}
}
res := &pdu.ListFilesystemRes{Filesystems: rfss}
return res, nil
}
func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
lp, err := s.filterCheckFS(r.GetFilesystem())
if err != nil {
return nil, err
}
fsvs, err := zfs.ZFSListFilesystemVersions(lp, zfs.ListFilesystemVersionsOptions{})
if err != nil {
return nil, err
}
rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
for i := range fsvs {
rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
}
res := &pdu.ListFilesystemVersionsRes{Versions: rfsvs}
return res, nil
}
func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
fsp, err := p.filterCheckFS(r.GetFilesystem())
if err != nil {
return nil, err
}
fs := fsp.ToString()
log := getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", r.GetSenderVersion()))
log.WithField("full_hint", r).Debug("full hint")
if r.GetSenderVersion() == nil {
// no common ancestor found, likely due to failed prior replication attempt
// => release stale step holds to prevent them from accumulating
// (they can accumulate on initial replication because each inital replication step might hold a different `to`)
// => replication cursors cannot accumulate because we always _move_ the replication cursor
log.Debug("releasing all step holds on the filesystem")
TryReleaseStepStaleFS(ctx, fs, p.jobId)
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
// we were hinted a specific common ancestor
mostRecentVersion, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, r.GetSenderVersion())
if err != nil {
msg := "HintMostRecentCommonAncestor rpc with nonexistent most recent version"
log.Warn(msg)
return nil, errors.Wrap(err, msg)
}
// move replication cursor to this position
destroyedCursors, err := MoveReplicationCursor(ctx, fs, mostRecentVersion, p.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
log.Debug("not creating replication cursor from bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, errors.Wrap(err, "cannot set replication cursor to hinted version")
}
// take care of stale step holds
log.WithField("step-holds-cleanup-mode", hintMostRecentCommonAncestorStepCleanupMode).
Debug("taking care of possibly stale step holds")
doStepCleanup := false
var stepCleanupSince *CreateTXGRangeBound
switch hintMostRecentCommonAncestorStepCleanupMode {
case StepCleanupNoCleanup:
doStepCleanup = false
case StepCleanupRangeSinceUnbounded:
doStepCleanup = true
stepCleanupSince = nil
case StepCleanupRangeSinceReplicationCursor:
doStepCleanup = true
// Use the destroyed replication cursors as indicator how far the previous replication got.
// To be precise: We limit the amount of visisted snapshots to exactly those snapshots
// created since the last successful replication cursor movement (i.e. last successful replication step)
//
// If we crash now, we'll leak the step we are about to release, but the performance gain
// of limiting the amount of snapshots we visit makes up for that.
// Users have the `zrepl holds release-stale` command to cleanup leaked step holds.
for _, destroyed := range destroyedCursors {
if stepCleanupSince == nil {
stepCleanupSince = &CreateTXGRangeBound{
CreateTXG: destroyed.GetCreateTXG(),
Inclusive: &zfs.NilBool{B: true},
}
} else if destroyed.GetCreateTXG() < stepCleanupSince.CreateTXG {
stepCleanupSince.CreateTXG = destroyed.GetCreateTXG()
}
}
default:
panic(hintMostRecentCommonAncestorStepCleanupMode)
}
if !doStepCleanup {
log.Info("skipping cleanup of prior invocations' step holds due to environment variable setting")
} else {
if err := ReleaseStepCummulativeInclusive(ctx, fs, stepCleanupSince, mostRecentVersion, p.jobId); err != nil {
return nil, errors.Wrap(err, "cannot cleanup prior invocation's step holds and bookmarks")
} else {
log.Info("step hold cleanup done")
}
}
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
type HintMostRecentCommonAncestorStepCleanupMode struct{ string }
var (
StepCleanupRangeSinceReplicationCursor = HintMostRecentCommonAncestorStepCleanupMode{"range-since-replication-cursor"}
StepCleanupRangeSinceUnbounded = HintMostRecentCommonAncestorStepCleanupMode{"range-since-unbounded"}
StepCleanupNoCleanup = HintMostRecentCommonAncestorStepCleanupMode{"no-cleanup"}
)
func (m HintMostRecentCommonAncestorStepCleanupMode) String() string { return string(m.string) }
func (m *HintMostRecentCommonAncestorStepCleanupMode) Set(s string) error {
switch s {
case StepCleanupRangeSinceReplicationCursor.String():
*m = StepCleanupRangeSinceReplicationCursor
case StepCleanupRangeSinceUnbounded.String():
*m = StepCleanupRangeSinceUnbounded
case StepCleanupNoCleanup.String():
*m = StepCleanupNoCleanup
default:
return fmt.Errorf("unknown step cleanup mode %q", s)
}
return nil
}
var hintMostRecentCommonAncestorStepCleanupMode = *envconst.Var("ZREPL_ENDPOINT_HINT_MOST_RECENT_STEP_HOLD_CLEANUP_MODE", &StepCleanupRangeSinceReplicationCursor).(*HintMostRecentCommonAncestorStepCleanupMode)
var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10))
func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
if fsv == nil {
return nil
}
return &zfs.ZFSSendArgVersion{RelName: fsv.GetRelName(), GUID: fsv.Guid}
}
func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (v zfs.FilesystemVersion, err error) {
sendArgs := uncheckedSendArgsFromPDU(fsv)
if sendArgs == nil {
return v, errors.New("must not be nil")
}
version, err := sendArgs.ValidateExistsAndGetVersion(ctx, fs)
if err != nil {
return v, err
}
return version, nil
}
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
_, err := s.filterCheckFS(r.Filesystem)
if err != nil {
return nil, nil, err
}
switch r.Encrypted {
case pdu.Tri_DontCare:
// use s.encrypt setting
// ok, fallthrough outer
case pdu.Tri_False:
if s.encrypt.B {
return nil, nil, errors.New("only encrypted sends allowed (send -w + encryption!= off), but unencrypted send requested")
}
// fallthrough outer
case pdu.Tri_True:
if !s.encrypt.B {
return nil, nil, errors.New("only unencrypted sends allowed, but encrypted send requested")
}
// fallthrough outer
default:
return nil, nil, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted)
}
sendArgsUnvalidated := zfs.ZFSSendArgsUnvalidated{
FS: r.Filesystem,
From: uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
To: uncheckedSendArgsFromPDU(r.GetTo()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
Encrypted: s.encrypt,
ResumeToken: r.ResumeToken, // nil or not nil, depending on decoding success
}
sendArgs, err := sendArgsUnvalidated.Validate(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "validate send arguments")
}
getLogger(ctx).Debug("acquire concurrent send semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx)
if err != nil {
return nil, nil, err
}
defer guard.Release()
si, err := zfs.ZFSSendDry(ctx, sendArgs)
if err != nil {
return nil, nil, errors.Wrap(err, "zfs send dry failed")
}
// From now on, assume that sendArgs has been validated by ZFSSendDry
// (because validation involves shelling out, it's actually a little expensive)
var expSize int64 = 0 // protocol says 0 means no estimate
if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
expSize = si.SizeEstimate
}
res := &pdu.SendRes{
ExpectedSize: expSize,
UsedResumeToken: r.ResumeToken != "",
}
if r.DryRun {
return res, nil, nil
}
// update replication cursor
if sendArgs.From != nil {
// For all but the first replication, this should always be a no-op because SendCompleted already moved the cursor
_, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.FromVersion, s.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, nil, errors.Wrap(err, "cannot set replication cursor to `from` version before starting send")
}
}
// make sure `From` doesn't go away in order to make this step resumable
if sendArgs.From != nil {
err := HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, nil, errors.Wrapf(err, "cannot hold `from` version %q before starting send", *sendArgs.FromVersion)
}
}
// make sure `To` doesn't go away in order to make this step resumable
err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
if err != nil {
return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
}
// step holds & replication cursor released / moved forward in s.SendCompleted => s.moveCursorAndReleaseSendHolds
streamCopier, err := zfs.ZFSSend(ctx, sendArgs)
if err != nil {
return nil, nil, errors.Wrap(err, "zfs send failed")
}
return res, streamCopier, nil
}
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
orig := r.GetOriginalReq() // may be nil, always use proto getters
fsp, err := p.filterCheckFS(orig.GetFilesystem())
if err != nil {
return nil, err
}
fs := fsp.ToString()
var from *zfs.FilesystemVersion
if orig.GetFrom() != nil {
f, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetFrom()) // no shadow
if err != nil {
return nil, errors.Wrap(err, "validate `from` exists")
}
from = &f
}
to, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetTo())
if err != nil {
return nil, errors.Wrap(err, "validate `to` exists")
}
log := getLogger(ctx).WithField("to_guid", to.Guid).
WithField("fs", fs).
WithField("to", to.RelName)
if from != nil {
log = log.WithField("from", from.RelName).WithField("from_guid", from.Guid)
}
log.Debug("move replication cursor to most recent common version")
destroyedCursors, err := MoveReplicationCursor(ctx, fs, to, p.jobId)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
log.Debug("not setting replication cursor, bookmark cloning not supported")
} else {
msg := "cannot move replication cursor, keeping hold on `to` until successful"
log.WithError(err).Error(msg)
err = errors.Wrap(err, msg)
// it is correct to not release the hold if we can't move the cursor!
return &pdu.SendCompletedRes{}, err
}
} else {
log.Info("successfully moved replication cursor")
}
// kick off releasing of step holds / bookmarks
// if we fail to release them, don't bother the caller:
// they are merely an implementation detail on the sender for better resumability
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
log.Debug("release step-hold of or step-bookmark on `to`")
err = ReleaseStep(ctx, fs, to, p.jobId)
if err != nil {
log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `to`")
} else {
log.Info("successfully released step-holds on or destroyed step-bookmark of `to`")
}
}()
go func() {
defer wg.Done()
if from == nil {
return
}
log.Debug("release step-hold of or step-bookmark on `from`")
err := ReleaseStep(ctx, fs, *from, p.jobId)
if err != nil {
if dne, ok := err.(*zfs.DatasetDoesNotExist); ok {
// If bookmark cloning is not supported, `from` might be the old replication cursor
// and thus have already been destroyed by MoveReplicationCursor above
// In that case, nonexistence of `from` is not an error, otherwise it is.
for _, c := range destroyedCursors {
if c.GetFullPath() == dne.Path {
log.Info("`from` was a replication cursor and has already been destroyed")
return
}
}
// fallthrough
}
log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `from`")
} else {
log.Info("successfully released step-holds on or destroyed step-bookmark of `from`")
}
}()
wg.Wait()
return &pdu.SendCompletedRes{}, nil
}
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
dp, err := p.filterCheckFS(req.Filesystem)
if err != nil {
return nil, err
}
return doDestroySnapshots(ctx, dp, req.Snapshots)
}
func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
res := pdu.PingRes{
Echo: req.GetMessage(),
}
return &res, nil
}
func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
return p.Ping(ctx, req)
}
func (p *Sender) WaitForConnectivity(ctx context.Context) error {
return nil
}
func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
dp, err := p.filterCheckFS(req.Filesystem)
if err != nil {
return nil, err
}
cursor, err := GetMostRecentReplicationCursorOfJob(ctx, dp.ToString(), p.jobId)
if err != nil {
return nil, err
}
if cursor == nil {
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil
}
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: cursor.Guid}}, nil
}
func (p *Sender) Receive(ctx context.Context, r *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
return nil, fmt.Errorf("sender does not implement Receive()")
}
type FSFilter interface { // FIXME unused
Filter(path *zfs.DatasetPath) (pass bool, err error)
}
// FIXME: can we get away without error types here?
type FSMap interface { // FIXME unused
FSFilter
Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error)
Invert() (FSMap, error)
AsFilter() FSFilter
}
type ReceiverConfig struct {
JobID JobID
RootWithoutClientComponent *zfs.DatasetPath // TODO use
AppendClientIdentity bool
UpdateLastReceivedHold bool
}
func (c *ReceiverConfig) copyIn() {
c.RootWithoutClientComponent = c.RootWithoutClientComponent.Copy()
}
func (c *ReceiverConfig) Validate() error {
c.JobID.MustValidate()
if c.RootWithoutClientComponent.Length() <= 0 {
return errors.New("RootWithoutClientComponent must not be an empty dataset path")
}
return nil
}
// Receiver implements replication.ReplicationEndpoint for a receiving side
type Receiver struct {
conf ReceiverConfig // validated
recvParentCreationMtx *chainlock.L
}
func NewReceiver(config ReceiverConfig) *Receiver {
config.copyIn()
if err := config.Validate(); err != nil {
panic(err)
}
return &Receiver{
conf: config,
recvParentCreationMtx: chainlock.New(),
}
}
func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error {
_, err := clientRoot(rootFS, clientIdentity)
return err
}
func clientRoot(rootFS *zfs.DatasetPath, clientIdentity string) (*zfs.DatasetPath, error) {
rootFSLen := rootFS.Length()
clientRootStr := path.Join(rootFS.ToString(), clientIdentity)
clientRoot, err := zfs.NewDatasetPath(clientRootStr)
if err != nil {
return nil, err
}
if rootFSLen+1 != clientRoot.Length() {
return nil, fmt.Errorf("client identity must be a single ZFS filesystem path component")
}
return clientRoot, nil
}
func (s *Receiver) clientRootFromCtx(ctx context.Context) *zfs.DatasetPath {
if !s.conf.AppendClientIdentity {
return s.conf.RootWithoutClientComponent.Copy()
}
clientIdentity, ok := ctx.Value(ClientIdentityKey).(string)
if !ok {
panic(fmt.Sprintf("ClientIdentityKey context value must be set"))
}
clientRoot, err := clientRoot(s.conf.RootWithoutClientComponent, clientIdentity)
if err != nil {
panic(fmt.Sprintf("ClientIdentityContextKey must have been validated before invoking Receiver: %s", err))
}
return clientRoot
}
type subroot struct {
localRoot *zfs.DatasetPath
}
var _ zfs.DatasetFilter = subroot{}
// Filters local p
func (f subroot) Filter(p *zfs.DatasetPath) (pass bool, err error) {
return p.HasPrefix(f.localRoot) && !p.Equal(f.localRoot), nil
}
func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) {
p, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, err
}
if p.Length() == 0 {
return nil, errors.Errorf("cannot map empty filesystem")
}
c := f.localRoot.Copy()
c.Extend(p)
return c, nil
}
func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
root := s.clientRootFromCtx(ctx)
filtered, err := zfs.ZFSListMapping(ctx, subroot{root})
if err != nil {
return nil, err
}
// present filesystem without the root_fs prefix
fss := make([]*pdu.Filesystem, 0, len(filtered))
for _, a := range filtered {
l := getLogger(ctx).WithField("fs", a)
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, a)
if err != nil {
l.WithError(err).Error("error getting placeholder state")
return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a)
}
l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
if !ph.FSExists {
l.Error("inconsistent placeholder state: filesystem must exists")
err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
return nil, err
}
token, err := zfs.ZFSGetReceiveResumeTokenOrEmptyStringIfNotSupported(ctx, a)
if err != nil {
l.WithError(err).Error("cannot get receive resume token")
return nil, err
}
encEnabled, err := zfs.ZFSGetEncryptionEnabled(ctx, a.ToString())
if err != nil {
l.WithError(err).Error("cannot get encryption enabled status")
return nil, err
}
l.WithField("receive_resume_token", token).Debug("receive resume token")
a.TrimPrefix(root)
fs := &pdu.Filesystem{
Path: a.ToString(),
IsPlaceholder: ph.IsPlaceholder,
ResumeToken: token,
IsEncrypted: encEnabled,
}
fss = append(fss, fs)
}
if len(fss) == 0 {
getLogger(ctx).Debug("no filesystems found")
return &pdu.ListFilesystemRes{}, nil
}
return &pdu.ListFilesystemRes{Filesystems: fss}, nil
}
func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.GetFilesystem())
if err != nil {
return nil, err
}
// TODO share following code with sender
fsvs, err := zfs.ZFSListFilesystemVersions(lp, zfs.ListFilesystemVersionsOptions{})
if err != nil {
return nil, err
}
rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
for i := range fsvs {
rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
}
return &pdu.ListFilesystemVersionsRes{Versions: rfsvs}, nil
}
func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
res := pdu.PingRes{
Echo: req.GetMessage(),
}
return &res, nil
}
func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
return s.Ping(ctx, req)
}
func (s *Receiver) WaitForConnectivity(ctx context.Context) error {
return nil
}
func (s *Receiver) ReplicationCursor(context.Context, *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
return nil, fmt.Errorf("ReplicationCursor not implemented for Receiver")
}
func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
return nil, nil, fmt.Errorf("receiver does not implement Send()")
}
var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
getLogger(ctx).Debug("incoming Receive")
defer receive.Close()
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.Filesystem)
if err != nil {
return nil, errors.Wrap(err, "`Filesystem` invalid")
}
to := uncheckedSendArgsFromPDU(req.GetTo())
if to == nil {
return nil, errors.New("`To` must not be nil")
}
if !to.IsSnapshot() {
return nil, errors.New("`To` must be a snapshot")
}
// create placeholder parent filesystems as appropriate
//
// Manipulating the ZFS dataset hierarchy must happen exclusively.
// TODO: Use fine-grained locking to allow separate clients / requests to pass
// through the following section concurrently when operating on disjoint
// ZFS dataset hierarchy subtrees.
var visitErr error
func() {
getLogger(ctx).Debug("begin acquire recvParentCreationMtx")
defer s.recvParentCreationMtx.Lock().Unlock()
getLogger(ctx).Debug("end acquire recvParentCreationMtx")
defer getLogger(ctx).Debug("release recvParentCreationMtx")
f := zfs.NewDatasetPathForest()
f.Add(lp)
getLogger(ctx).Debug("begin tree-walk")
f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
if v.Path.Equal(lp) {
return false
}
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, v.Path)
getLogger(ctx).
WithField("fs", v.Path.ToString()).
WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
WithField("err", fmt.Sprintf("%s", err)).
WithField("errType", fmt.Sprintf("%T", err)).
Debug("placeholder state for filesystem")
if err != nil {
visitErr = err
return false
}
if !ph.FSExists {
if s.conf.RootWithoutClientComponent.HasPrefix(v.Path) {
if v.Path.Length() == 1 {
visitErr = fmt.Errorf("pool %q not imported", v.Path.ToString())
} else {
visitErr = fmt.Errorf("root_fs %q does not exist", s.conf.RootWithoutClientComponent.ToString())
}
getLogger(ctx).WithError(visitErr).Error("placeholders are only created automatically below root_fs")
return false
}
l := getLogger(ctx).WithField("placeholder_fs", v.Path)
l.Debug("create placeholder filesystem")
err := zfs.ZFSCreatePlaceholderFilesystem(ctx, v.Path)
if err != nil {
l.WithError(err).Error("cannot create placeholder filesystem")
visitErr = err
return false
}
return true
}
getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
return true // leave this fs as is
})
}()
getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
if visitErr != nil {
return nil, visitErr
}
// determine whether we need to rollback the filesystem / change its placeholder state
var clearPlaceholderProperty bool
var recvOpts zfs.RecvOptions
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, lp)
if err == nil && ph.FSExists && ph.IsPlaceholder {
recvOpts.RollbackAndForceRecv = true
clearPlaceholderProperty = true
}
if clearPlaceholderProperty {
if err := zfs.ZFSSetPlaceholder(ctx, lp, false); err != nil {
return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
}
}
if req.ClearResumeToken && ph.FSExists {
if err := zfs.ZFSRecvClearResumeToken(ctx, lp.ToString()); err != nil {
return nil, errors.Wrap(err, "cannot clear resume token")
}
}
recvOpts.SavePartialRecvState, err = zfs.ResumeRecvSupported(ctx, lp)
if err != nil {
return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
}
getLogger(ctx).Debug("acquire concurrent recv semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx)
if err != nil {
return nil, err
}
defer guard.Release()
getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
snapFullPath := to.FullPath(lp.ToString())
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
getLogger(ctx).
WithError(err).
WithField("opts", recvOpts).
Error("zfs receive failed")
return nil, err
}
// validate that we actually received what the sender claimed
toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString())
if err != nil {
msg := "receive request's `To` version does not match what we received in the stream"
getLogger(ctx).WithError(err).WithField("snap", snapFullPath).Error(msg)
getLogger(ctx).Error("aborting recv request, but keeping received snapshot for inspection")
return nil, errors.Wrap(err, msg)
}
if s.conf.UpdateLastReceivedHold {
getLogger(ctx).Debug("move last-received-hold")
if err := MoveLastReceivedHold(ctx, lp.ToString(), toRecvd, s.conf.JobID); err != nil {
return nil, errors.Wrap(err, "cannot move last-received-hold")
}
}
return &pdu.ReceiveRes{}, nil
}
func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
root := s.clientRootFromCtx(ctx)
lp, err := subroot{root}.MapToLocal(req.Filesystem)
if err != nil {
return nil, err
}
return doDestroySnapshots(ctx, lp, req.Snapshots)
}
func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
// we don't move last-received-hold as part of this hint
// because that wouldn't give us any benefit wrt resumability.
//
// Other reason: the replication logic that issues this RPC would require refactoring
// to include the receiver's FilesystemVersion in the request)
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
func (p *Receiver) SendCompleted(context.Context, *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
return &pdu.SendCompletedRes{}, nil
}
func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.FilesystemVersion) (*pdu.DestroySnapshotsRes, error) {
reqs := make([]*zfs.DestroySnapOp, len(snaps))
ress := make([]*pdu.DestroySnapshotRes, len(snaps))
errs := make([]error, len(snaps))
for i, fsv := range snaps {
if fsv.Type != pdu.FilesystemVersion_Snapshot {
return nil, fmt.Errorf("version %q is not a snapshot", fsv.Name)
}
ress[i] = &pdu.DestroySnapshotRes{
Snapshot: fsv,
// Error set after batch operation
}
reqs[i] = &zfs.DestroySnapOp{
Filesystem: lp.ToString(),
Name: fsv.Name,
ErrOut: &errs[i],
}
}
zfs.ZFSDestroyFilesystemVersions(reqs)
for i := range reqs {
if errs[i] != nil {
if de, ok := errs[i].(*zfs.DestroySnapshotsError); ok && len(de.Reason) == 1 {
ress[i].Error = de.Reason[0]
} else {
ress[i].Error = errs[i].Error()
}
}
}
return &pdu.DestroySnapshotsRes{
Results: ress,
}, nil
}