Mirror of https://github.com/zrepl/zrepl.git (synced 2025-08-01 04:11:05 +02:00)
The motivation for this refactoring is based on two independent issues:

- @JMoVS found that the changes merged as part of #259 slowed his OS X based installation down significantly.
  Analysis of the zfs command logging introduced in #296 showed that `zfs holds` took most of the execution time,
  and they pointed out that not all of those `zfs holds` invocations were actually necessary.
  I.e.: zrepl was inefficient about retrieving information from ZFS.

- @InsanePrawn found that failures on initial replication would lead to step holds accumulating on the
  sending side, i.e. they would never be cleaned up in the HintMostRecentCommonAncestor RPC handler.
  That was because we only sent that RPC if there was a most recent common ancestor detected during
  replication planning.
  @InsanePrawn prototyped an implementation of a `zrepl zfs-abstractions release` command to mitigate the situation.
  As part of that development work and back-and-forth with @problame, it became evident that the abstractions
  that #259 built on top of zfs in package endpoint (step holds, replication cursor, last-received-hold)
  were not well-suited for re-use in the `zrepl zfs-abstractions release` subcommand prototype.

This commit refactors package endpoint to address both of these issues:

- endpoint abstractions now share an interface `Abstraction` that, among other things, provides a uniform `Destroy()` method.
  However, that method should not be called directly; instead, the package-level `BatchDestroy` function should be used
  in order to allow for a migration to zfs channel programs in the future.
  (A sketch of the interface shape follows after this message.)

- endpoint now has a query facility (`ListAbstractions`) which is used to find on-disk
  - step holds and bookmarks
  - replication cursors (v1, v2)
  - last-received-holds

  By describing the query in a struct, we can centralize the retrieval of information via the ZFS CLI
  and only have to be clever once. We are "clever" in the following ways:
  - When asking for hold-based abstractions, we only run `zfs holds` on snapshots that have `userrefs` > 0.
    - To support this functionality, add field `UserRefs` to zfs.FilesystemVersion and retrieve it
      anywhere we retrieve zfs.FilesystemVersion from ZFS.
  - When asking only for bookmark-based abstractions, we only run `zfs list -t bookmark`, not with snapshots.
  - Currently unused (except for the CLI): per-filesystem concurrent lookup.
  - Option to only include abstractions with CreateTXG in a specified range.

- refactor `endpoint`'s various ZFS info retrieval methods to use `ListAbstractions`

- rename the `zrepl holds list` command to `zrepl zfs-abstractions list`
  - make `zrepl zfs-abstractions list` consume endpoint.ListAbstractions

- Add a `ListStale` method which, given a query template, lists stale holds and bookmarks.
  - staleness is determined relative to the replication cursor and supports different modes
  - the new `zrepl zfs-abstractions release-{all,stale}` commands can be used to remove abstractions of package endpoint

- Adjust HintMostRecentCommonAncestor RPC for stale-holds cleanup:
  - send it also if no most recent common ancestor exists between sender and receiver
  - have the sender clean up its abstractions when it receives the RPC with no most recent common ancestor, using `ListStale`
  - Due to changed semantics, bump the protocol version.
- Adjust HintMostRecentCommonAncestor RPC for the performance problems encountered by @JMoVS:
  - by default, per (job, fs)-combination, only consider cleaning step holds in the createtxg range
    `[last replication cursor, conservatively-estimated-receive-side-version)`
  - this behavior ensures resumability at a cost proportional to the time that replication was down
  - however, as explained in a comment, we might leak holds if the zrepl daemon stops running
  - that trade-off is acceptable because, in the presumably rare case this might happen, the user has two tools at hand:
    - Tool 1: run `zrepl zfs-abstractions release-stale`
    - Tool 2: use the env var `ZREPL_ENDPOINT_SENDER_HINT_MOST_RECENT_STEP_HOLD_CLEANUP_MODE` to adjust the lower bound
      of the createtxg range (search for it in the code). The env var can also be used to disable hold cleanup
      on the send side entirely.

supersedes and closes #293
supersedes and closes #282
fixes #280
fixes #278

Additionally, we fixed a couple of bugs:
- zfs: fix half-nil error reporting of dataset-does-not-exist for ZFSListChan and ZFSBookmark
- endpoint: the Sender's `HintMostRecentCommonAncestor` handler would not check whether access to the
  specified filesystem was allowed.
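For orientation, here is a minimal, self-contained Go sketch of the API shape the commit message describes. `Abstraction`, `Destroy`, `GetFullPath`, `GetCreateTXG`, and `BatchDestroy` are named in the commit message or used in the code below; the `stepHold` type, the exact `BatchDestroy` signature, and the field layout are illustrative assumptions, not zrepl's actual implementation.

package main

import (
	"context"
	"fmt"
)

// Abstraction is the uniform interface for the on-disk artifacts that package
// endpoint manages on behalf of a job: step holds and step bookmarks,
// replication cursors (v1, v2), and last-received-holds.
type Abstraction interface {
	GetFullPath() string  // e.g. "pool/fs@snap" or "pool/fs#bookmark"
	GetCreateTXG() uint64 // createtxg of the underlying filesystem version
	// Destroy releases the hold or destroys the bookmark.
	Destroy(ctx context.Context) error
}

// BatchDestroy destroys abstractions one by one and collects the per-item
// errors; keeping this loop in a single package-level function is what allows
// a later migration to ZFS channel programs.
func BatchDestroy(ctx context.Context, abstractions []Abstraction) []error {
	errs := make([]error, len(abstractions))
	for i, a := range abstractions {
		errs[i] = a.Destroy(ctx)
	}
	return errs
}

// stepHold is a toy stand-in used only to exercise the sketch.
type stepHold struct {
	fullPath  string
	createTXG uint64
}

func (h stepHold) GetFullPath() string               { return h.fullPath }
func (h stepHold) GetCreateTXG() uint64              { return h.createTXG }
func (h stepHold) Destroy(ctx context.Context) error { return nil }

func main() {
	holds := []Abstraction{
		stepHold{fullPath: "pool/fs@zrepl_example_snap", createTXG: 1234},
	}
	for i, err := range BatchDestroy(context.Background(), holds) {
		fmt.Printf("destroy %s: err=%v\n", holds[i].GetFullPath(), err)
	}
}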
887 lines · 29 KiB · Go
// Package endpoint implements replication endpoints for use with package replication.
package endpoint

import (
	"context"
	"fmt"
	"path"
	"sync"

	"github.com/pkg/errors"

	"github.com/zrepl/zrepl/replication/logic/pdu"
	"github.com/zrepl/zrepl/util/chainlock"
	"github.com/zrepl/zrepl/util/envconst"
	"github.com/zrepl/zrepl/util/semaphore"
	"github.com/zrepl/zrepl/zfs"
)

type SenderConfig struct {
	FSF     zfs.DatasetFilter
	Encrypt *zfs.NilBool
	JobID   JobID
}

func (c *SenderConfig) Validate() error {
	c.JobID.MustValidate()
	if err := c.Encrypt.Validate(); err != nil {
		return errors.Wrap(err, "`Encrypt` field invalid")
	}
	if _, err := StepHoldTag(c.JobID); err != nil {
		return fmt.Errorf("JobID cannot be used for hold tag: %s", err)
	}
	return nil
}

// Sender implements replication.ReplicationEndpoint for a sending side
type Sender struct {
	FSFilter zfs.DatasetFilter
	encrypt  *zfs.NilBool
	jobId    JobID
}

func NewSender(conf SenderConfig) *Sender {
	if err := conf.Validate(); err != nil {
		panic("invalid config" + err.Error())
	}
	return &Sender{
		FSFilter: conf.FSF,
		encrypt:  conf.Encrypt,
		jobId:    conf.JobID,
	}
}

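// filterCheckFS parses fs into a dataset path and checks it against the
// sender's dataset filter; it returns an error if this endpoint does not
// allow access to the filesystem.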
func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
	dp, err := zfs.NewDatasetPath(fs)
	if err != nil {
		return nil, err
	}
	if dp.Length() == 0 {
		return nil, errors.New("empty filesystem not allowed")
	}
	pass, err := s.FSFilter.Filter(dp)
	if err != nil {
		return nil, err
	}
	if !pass {
		return nil, fmt.Errorf("endpoint does not allow access to filesystem %s", fs)
	}
	return dp, nil
}

func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
	fss, err := zfs.ZFSListMapping(ctx, s.FSFilter)
	if err != nil {
		return nil, err
	}
	rfss := make([]*pdu.Filesystem, len(fss))
	for i := range fss {
		encEnabled, err := zfs.ZFSGetEncryptionEnabled(ctx, fss[i].ToString())
		if err != nil {
			return nil, errors.Wrap(err, "cannot get filesystem encryption status")
		}
		rfss[i] = &pdu.Filesystem{
			Path: fss[i].ToString(),
			// ResumeToken does not make sense from Sender
			IsPlaceholder: false, // sender FSs are never placeholders
			IsEncrypted:   encEnabled,
		}
	}
	res := &pdu.ListFilesystemRes{Filesystems: rfss}
	return res, nil
}

func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
	lp, err := s.filterCheckFS(r.GetFilesystem())
	if err != nil {
		return nil, err
	}
	fsvs, err := zfs.ZFSListFilesystemVersions(lp, zfs.ListFilesystemVersionsOptions{})
	if err != nil {
		return nil, err
	}
	rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
	for i := range fsvs {
		rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
	}
	res := &pdu.ListFilesystemVersionsRes{Versions: rfsvs}
	return res, nil

}

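// HintMostRecentCommonAncestor moves the replication cursor to the hinted
// most recent common version and cleans up step holds left behind by prior
// replication attempts. If the hint contains no sender version, it releases
// all stale step holds of this job on the filesystem.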
func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {

	fsp, err := p.filterCheckFS(r.GetFilesystem())
	if err != nil {
		return nil, err
	}
	fs := fsp.ToString()

	log := getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", r.GetSenderVersion()))

	log.WithField("full_hint", r).Debug("full hint")

	if r.GetSenderVersion() == nil {
		// no common ancestor found, likely due to failed prior replication attempt
		// => release stale step holds to prevent them from accumulating
		//    (they can accumulate on initial replication because each initial replication step might hold a different `to`)
		// => replication cursors cannot accumulate because we always _move_ the replication cursor
		log.Debug("releasing all step holds on the filesystem")
		TryReleaseStepStaleFS(ctx, fs, p.jobId)
		return &pdu.HintMostRecentCommonAncestorRes{}, nil
	}

	// we were hinted a specific common ancestor

	mostRecentVersion, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, r.GetSenderVersion())
	if err != nil {
		msg := "HintMostRecentCommonAncestor rpc with nonexistent most recent version"
		log.Warn(msg)
		return nil, errors.Wrap(err, msg)
	}

	// move replication cursor to this position
	destroyedCursors, err := MoveReplicationCursor(ctx, fs, mostRecentVersion, p.jobId)
	if err == zfs.ErrBookmarkCloningNotSupported {
		log.Debug("not creating replication cursor from bookmark because ZFS does not support it")
		// fallthrough
	} else if err != nil {
		return nil, errors.Wrap(err, "cannot set replication cursor to hinted version")
	}

	// take care of stale step holds
	log.WithField("step-holds-cleanup-mode", senderHintMostRecentCommonAncestorStepCleanupMode).
		Debug("taking care of possibly stale step holds")
	doStepCleanup := false
	var stepCleanupSince *CreateTXGRangeBound
	switch senderHintMostRecentCommonAncestorStepCleanupMode {
	case StepCleanupNoCleanup:
		doStepCleanup = false
	case StepCleanupRangeSinceUnbounded:
		doStepCleanup = true
		stepCleanupSince = nil
	case StepCleanupRangeSinceReplicationCursor:
		doStepCleanup = true
		// Use the destroyed replication cursors as an indicator of how far the previous replication got.
		// To be precise: We limit the amount of visited snapshots to exactly those snapshots
		// created since the last successful replication cursor movement (i.e. last successful replication step)
		//
		// If we crash now, we'll leak the step we are about to release, but the performance gain
		// of limiting the amount of snapshots we visit makes up for that.
		// Users have the `zrepl holds release-stale` command to clean up leaked step holds.
		for _, destroyed := range destroyedCursors {
			if stepCleanupSince == nil {
				stepCleanupSince = &CreateTXGRangeBound{
					CreateTXG: destroyed.GetCreateTXG(),
					Inclusive: &zfs.NilBool{B: true},
				}
			} else if destroyed.GetCreateTXG() < stepCleanupSince.CreateTXG {
				stepCleanupSince.CreateTXG = destroyed.GetCreateTXG()
			}
		}
	default:
		panic(senderHintMostRecentCommonAncestorStepCleanupMode)
	}
	if !doStepCleanup {
		log.Info("skipping cleanup of prior invocations' step holds due to environment variable setting")
	} else {
		if err := ReleaseStepCummulativeInclusive(ctx, fs, stepCleanupSince, mostRecentVersion, p.jobId); err != nil {
			return nil, errors.Wrap(err, "cannot cleanup prior invocation's step holds and bookmarks")
		} else {
			log.Info("step hold cleanup done")
		}
	}

	return &pdu.HintMostRecentCommonAncestorRes{}, nil
}

type HintMostRecentCommonAncestorStepCleanupMode struct{ string }

var (
	StepCleanupRangeSinceReplicationCursor = HintMostRecentCommonAncestorStepCleanupMode{"range-since-replication-cursor"}
	StepCleanupRangeSinceUnbounded         = HintMostRecentCommonAncestorStepCleanupMode{"range-since-unbounded"}
	StepCleanupNoCleanup                   = HintMostRecentCommonAncestorStepCleanupMode{"no-cleanup"}
)

func (m HintMostRecentCommonAncestorStepCleanupMode) String() string { return string(m.string) }
func (m *HintMostRecentCommonAncestorStepCleanupMode) Set(s string) error {
	switch s {
	case StepCleanupRangeSinceReplicationCursor.String():
		*m = StepCleanupRangeSinceReplicationCursor
	case StepCleanupRangeSinceUnbounded.String():
		*m = StepCleanupRangeSinceUnbounded
	case StepCleanupNoCleanup.String():
		*m = StepCleanupNoCleanup
	default:
		return fmt.Errorf("unknown step cleanup mode %q", s)
	}
	return nil
}

var senderHintMostRecentCommonAncestorStepCleanupMode = *envconst.Var("ZREPL_ENDPOINT_SENDER_HINT_MOST_RECENT_STEP_HOLD_CLEANUP_MODE", &StepCleanupRangeSinceReplicationCursor).(*HintMostRecentCommonAncestorStepCleanupMode)

var maxConcurrentZFSSendSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_SEND", 10))

func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
	if fsv == nil {
		return nil
	}
	return &zfs.ZFSSendArgVersion{RelName: fsv.GetRelName(), GUID: fsv.Guid}
}

func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (v zfs.FilesystemVersion, err error) {
	sendArgs := uncheckedSendArgsFromPDU(fsv)
	if sendArgs == nil {
		return v, errors.New("must not be nil")
	}
	version, err := sendArgs.ValidateExistsAndGetVersion(ctx, fs)
	if err != nil {
		return v, err
	}
	return version, nil
}

func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {

	_, err := s.filterCheckFS(r.Filesystem)
	if err != nil {
		return nil, nil, err
	}
	switch r.Encrypted {
	case pdu.Tri_DontCare:
		// use s.encrypt setting
		// ok, fallthrough outer
	case pdu.Tri_False:
		if s.encrypt.B {
			return nil, nil, errors.New("only encrypted sends allowed (send -w + encryption!= off), but unencrypted send requested")
		}
		// fallthrough outer
	case pdu.Tri_True:
		if !s.encrypt.B {
			return nil, nil, errors.New("only unencrypted sends allowed, but encrypted send requested")
		}
		// fallthrough outer
	default:
		return nil, nil, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted)
	}

	sendArgsUnvalidated := zfs.ZFSSendArgsUnvalidated{
		FS:          r.Filesystem,
		From:        uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
		To:          uncheckedSendArgsFromPDU(r.GetTo()),   // validated by zfs.ZFSSendDry / zfs.ZFSSend
		Encrypted:   s.encrypt,
		ResumeToken: r.ResumeToken, // nil or not nil, depending on decoding success
	}

	sendArgs, err := sendArgsUnvalidated.Validate(ctx)
	if err != nil {
		return nil, nil, errors.Wrap(err, "validate send arguments")
	}

	getLogger(ctx).Debug("acquire concurrent send semaphore")
	// TODO use try-acquire and fail with resource-exhaustion rpc status
	// => would require handling on the client-side
	// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
	guard, err := maxConcurrentZFSSendSemaphore.Acquire(ctx)
	if err != nil {
		return nil, nil, err
	}
	defer guard.Release()

	si, err := zfs.ZFSSendDry(ctx, sendArgs)
	if err != nil {
		return nil, nil, errors.Wrap(err, "zfs send dry failed")
	}

	// From now on, assume that sendArgs has been validated by ZFSSendDry
	// (because validation involves shelling out, it's actually a little expensive)

	var expSize int64 = 0 // protocol says 0 means no estimate
	if si.SizeEstimate != -1 { // but si returns -1 for no size estimate
		expSize = si.SizeEstimate
	}
	res := &pdu.SendRes{
		ExpectedSize:    expSize,
		UsedResumeToken: r.ResumeToken != "",
	}

	if r.DryRun {
		return res, nil, nil
	}

	// update replication cursor
	if sendArgs.From != nil {
		// For all but the first replication, this should always be a no-op because SendCompleted already moved the cursor
		_, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.FromVersion, s.jobId)
		if err == zfs.ErrBookmarkCloningNotSupported {
			getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
			// fallthrough
		} else if err != nil {
			return nil, nil, errors.Wrap(err, "cannot set replication cursor to `from` version before starting send")
		}
	}

	// make sure `From` doesn't go away in order to make this step resumable
	if sendArgs.From != nil {
		_, err := HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId)
		if err == zfs.ErrBookmarkCloningNotSupported {
			getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
			// fallthrough
		} else if err != nil {
			return nil, nil, errors.Wrapf(err, "cannot hold `from` version %q before starting send", *sendArgs.FromVersion)
		}
	}
	// make sure `To` doesn't go away in order to make this step resumable
	_, err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
	if err != nil {
		return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
	}

	// step holds & replication cursor released / moved forward in s.SendCompleted => s.moveCursorAndReleaseSendHolds

	streamCopier, err := zfs.ZFSSend(ctx, sendArgs)
	if err != nil {
		return nil, nil, errors.Wrap(err, "zfs send failed")
	}
	return res, streamCopier, nil
}

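// SendCompleted is called by the replication logic after the receiver has
// confirmed a replication step. It moves the replication cursor to `to` and
// then releases the step holds / step bookmarks on `from` and `to`; release
// failures are only logged because the holds are merely a sender-side
// implementation detail for better resumability.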
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {

	orig := r.GetOriginalReq() // may be nil, always use proto getters
	fsp, err := p.filterCheckFS(orig.GetFilesystem())
	if err != nil {
		return nil, err
	}
	fs := fsp.ToString()

	var from *zfs.FilesystemVersion
	if orig.GetFrom() != nil {
		f, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetFrom()) // no shadow
		if err != nil {
			return nil, errors.Wrap(err, "validate `from` exists")
		}
		from = &f
	}
	to, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetTo())
	if err != nil {
		return nil, errors.Wrap(err, "validate `to` exists")
	}

	log := getLogger(ctx).WithField("to_guid", to.Guid).
		WithField("fs", fs).
		WithField("to", to.RelName)
	if from != nil {
		log = log.WithField("from", from.RelName).WithField("from_guid", from.Guid)
	}

	log.Debug("move replication cursor to most recent common version")
	destroyedCursors, err := MoveReplicationCursor(ctx, fs, to, p.jobId)
	if err != nil {
		if err == zfs.ErrBookmarkCloningNotSupported {
			log.Debug("not setting replication cursor, bookmark cloning not supported")
		} else {
			msg := "cannot move replication cursor, keeping hold on `to` until successful"
			log.WithError(err).Error(msg)
			err = errors.Wrap(err, msg)
			// it is correct to not release the hold if we can't move the cursor!
			return &pdu.SendCompletedRes{}, err
		}
	} else {
		log.Info("successfully moved replication cursor")
	}

	// kick off releasing of step holds / bookmarks
	// if we fail to release them, don't bother the caller:
	// they are merely an implementation detail on the sender for better resumability
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		log.Debug("release step-hold of or step-bookmark on `to`")
		err = ReleaseStep(ctx, fs, to, p.jobId)
		if err != nil {
			log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `to`")
		} else {
			log.Info("successfully released step-holds on or destroyed step-bookmark of `to`")
		}

	}()
	go func() {
		defer wg.Done()
		if from == nil {
			return
		}
		log.Debug("release step-hold of or step-bookmark on `from`")
		err := ReleaseStep(ctx, fs, *from, p.jobId)
		if err != nil {
			if dne, ok := err.(*zfs.DatasetDoesNotExist); ok {
				// If bookmark cloning is not supported, `from` might be the old replication cursor
				// and thus have already been destroyed by MoveReplicationCursor above
				// In that case, nonexistence of `from` is not an error, otherwise it is.
				for _, c := range destroyedCursors {
					if c.GetFullPath() == dne.Path {
						log.Info("`from` was a replication cursor and has already been destroyed")
						return
					}
				}
				// fallthrough
			}
			log.WithError(err).Error("cannot release step-holds on or destroy step-bookmark of `from`")
		} else {
			log.Info("successfully released step-holds on or destroyed step-bookmark of `from`")
		}
	}()
	wg.Wait()

	return &pdu.SendCompletedRes{}, nil
}

func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
	dp, err := p.filterCheckFS(req.Filesystem)
	if err != nil {
		return nil, err
	}
	return doDestroySnapshots(ctx, dp, req.Snapshots)
}

func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
	res := pdu.PingRes{
		Echo: req.GetMessage(),
	}
	return &res, nil
}

func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
	return p.Ping(ctx, req)
}

func (p *Sender) WaitForConnectivity(ctx context.Context) error {
	return nil
}

func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
	dp, err := p.filterCheckFS(req.Filesystem)
	if err != nil {
		return nil, err
	}

	cursor, err := GetMostRecentReplicationCursorOfJob(ctx, dp.ToString(), p.jobId)
	if err != nil {
		return nil, err
	}
	if cursor == nil {
		return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil
	}
	return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: cursor.Guid}}, nil
}

func (p *Sender) Receive(ctx context.Context, r *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
	return nil, fmt.Errorf("sender does not implement Receive()")
}

type FSFilter interface { // FIXME unused
	Filter(path *zfs.DatasetPath) (pass bool, err error)
}

// FIXME: can we get away without error types here?
type FSMap interface { // FIXME unused
	FSFilter
	Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error)
	Invert() (FSMap, error)
	AsFilter() FSFilter
}

type ReceiverConfig struct {
	JobID JobID

	RootWithoutClientComponent *zfs.DatasetPath // TODO use
	AppendClientIdentity       bool

	UpdateLastReceivedHold bool
}

func (c *ReceiverConfig) copyIn() {
	c.RootWithoutClientComponent = c.RootWithoutClientComponent.Copy()
}

func (c *ReceiverConfig) Validate() error {
	c.JobID.MustValidate()
	if c.RootWithoutClientComponent.Length() <= 0 {
		return errors.New("RootWithoutClientComponent must not be an empty dataset path")
	}
	return nil
}

// Receiver implements replication.ReplicationEndpoint for a receiving side
type Receiver struct {
	conf ReceiverConfig // validated

	recvParentCreationMtx *chainlock.L
}

func NewReceiver(config ReceiverConfig) *Receiver {
	config.copyIn()
	if err := config.Validate(); err != nil {
		panic(err)
	}
	return &Receiver{
		conf:                  config,
		recvParentCreationMtx: chainlock.New(),
	}
}

func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error {
	_, err := clientRoot(rootFS, clientIdentity)
	return err
}

func clientRoot(rootFS *zfs.DatasetPath, clientIdentity string) (*zfs.DatasetPath, error) {
	rootFSLen := rootFS.Length()
	clientRootStr := path.Join(rootFS.ToString(), clientIdentity)
	clientRoot, err := zfs.NewDatasetPath(clientRootStr)
	if err != nil {
		return nil, err
	}
	if rootFSLen+1 != clientRoot.Length() {
		return nil, fmt.Errorf("client identity must be a single ZFS filesystem path component")
	}
	return clientRoot, nil
}

func (s *Receiver) clientRootFromCtx(ctx context.Context) *zfs.DatasetPath {
	if !s.conf.AppendClientIdentity {
		return s.conf.RootWithoutClientComponent.Copy()
	}

	clientIdentity, ok := ctx.Value(ClientIdentityKey).(string)
	if !ok {
		panic(fmt.Sprintf("ClientIdentityKey context value must be set"))
	}

	clientRoot, err := clientRoot(s.conf.RootWithoutClientComponent, clientIdentity)
	if err != nil {
		panic(fmt.Sprintf("ClientIdentityContextKey must have been validated before invoking Receiver: %s", err))
	}
	return clientRoot
}

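// subroot is a zfs.DatasetFilter that passes only strict descendants of
// localRoot (localRoot itself is excluded).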
type subroot struct {
	localRoot *zfs.DatasetPath
}

var _ zfs.DatasetFilter = subroot{}

// Filters local p
func (f subroot) Filter(p *zfs.DatasetPath) (pass bool, err error) {
	return p.HasPrefix(f.localRoot) && !p.Equal(f.localRoot), nil
}

func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) {
	p, err := zfs.NewDatasetPath(fs)
	if err != nil {
		return nil, err
	}
	if p.Length() == 0 {
		return nil, errors.Errorf("cannot map empty filesystem")
	}
	c := f.localRoot.Copy()
	c.Extend(p)
	return c, nil
}

func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
	root := s.clientRootFromCtx(ctx)
	filtered, err := zfs.ZFSListMapping(ctx, subroot{root})
	if err != nil {
		return nil, err
	}
	// present filesystem without the root_fs prefix
	fss := make([]*pdu.Filesystem, 0, len(filtered))
	for _, a := range filtered {
		l := getLogger(ctx).WithField("fs", a)
		ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, a)
		if err != nil {
			l.WithError(err).Error("error getting placeholder state")
			return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a)
		}
		l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
		if !ph.FSExists {
			l.Error("inconsistent placeholder state: filesystem must exists")
			err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
			return nil, err
		}
		token, err := zfs.ZFSGetReceiveResumeTokenOrEmptyStringIfNotSupported(ctx, a)
		if err != nil {
			l.WithError(err).Error("cannot get receive resume token")
			return nil, err
		}
		encEnabled, err := zfs.ZFSGetEncryptionEnabled(ctx, a.ToString())
		if err != nil {
			l.WithError(err).Error("cannot get encryption enabled status")
			return nil, err
		}
		l.WithField("receive_resume_token", token).Debug("receive resume token")

		a.TrimPrefix(root)

		fs := &pdu.Filesystem{
			Path:          a.ToString(),
			IsPlaceholder: ph.IsPlaceholder,
			ResumeToken:   token,
			IsEncrypted:   encEnabled,
		}
		fss = append(fss, fs)
	}
	if len(fss) == 0 {
		getLogger(ctx).Debug("no filesystems found")
		return &pdu.ListFilesystemRes{}, nil
	}
	return &pdu.ListFilesystemRes{Filesystems: fss}, nil
}

func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
	root := s.clientRootFromCtx(ctx)
	lp, err := subroot{root}.MapToLocal(req.GetFilesystem())
	if err != nil {
		return nil, err
	}
	// TODO share following code with sender

	fsvs, err := zfs.ZFSListFilesystemVersions(lp, zfs.ListFilesystemVersionsOptions{})
	if err != nil {
		return nil, err
	}

	rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
	for i := range fsvs {
		rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
	}

	return &pdu.ListFilesystemVersionsRes{Versions: rfsvs}, nil
}

func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
	res := pdu.PingRes{
		Echo: req.GetMessage(),
	}
	return &res, nil
}

func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
	return s.Ping(ctx, req)
}

func (s *Receiver) WaitForConnectivity(ctx context.Context) error {
	return nil
}

func (s *Receiver) ReplicationCursor(context.Context, *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
	return nil, fmt.Errorf("ReplicationCursor not implemented for Receiver")
}

func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
	return nil, nil, fmt.Errorf("receiver does not implement Send()")
}

var maxConcurrentZFSRecvSemaphore = semaphore.New(envconst.Int64("ZREPL_ENDPOINT_MAX_CONCURRENT_RECV", 10))

func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs.StreamCopier) (*pdu.ReceiveRes, error) {
	getLogger(ctx).Debug("incoming Receive")
	defer receive.Close()

	root := s.clientRootFromCtx(ctx)
	lp, err := subroot{root}.MapToLocal(req.Filesystem)
	if err != nil {
		return nil, errors.Wrap(err, "`Filesystem` invalid")
	}

	to := uncheckedSendArgsFromPDU(req.GetTo())
	if to == nil {
		return nil, errors.New("`To` must not be nil")
	}
	if !to.IsSnapshot() {
		return nil, errors.New("`To` must be a snapshot")
	}

	// create placeholder parent filesystems as appropriate
	//
	// Manipulating the ZFS dataset hierarchy must happen exclusively.
	// TODO: Use fine-grained locking to allow separate clients / requests to pass
	// through the following section concurrently when operating on disjoint
	// ZFS dataset hierarchy subtrees.
	var visitErr error
	func() {
		getLogger(ctx).Debug("begin acquire recvParentCreationMtx")
		defer s.recvParentCreationMtx.Lock().Unlock()
		getLogger(ctx).Debug("end acquire recvParentCreationMtx")
		defer getLogger(ctx).Debug("release recvParentCreationMtx")

		f := zfs.NewDatasetPathForest()
		f.Add(lp)
		getLogger(ctx).Debug("begin tree-walk")
		f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
			if v.Path.Equal(lp) {
				return false
			}
			ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, v.Path)
			getLogger(ctx).
				WithField("fs", v.Path.ToString()).
				WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
				WithField("err", fmt.Sprintf("%s", err)).
				WithField("errType", fmt.Sprintf("%T", err)).
				Debug("placeholder state for filesystem")
			if err != nil {
				visitErr = err
				return false
			}

			if !ph.FSExists {
				if s.conf.RootWithoutClientComponent.HasPrefix(v.Path) {
					if v.Path.Length() == 1 {
						visitErr = fmt.Errorf("pool %q not imported", v.Path.ToString())
					} else {
						visitErr = fmt.Errorf("root_fs %q does not exist", s.conf.RootWithoutClientComponent.ToString())
					}
					getLogger(ctx).WithError(visitErr).Error("placeholders are only created automatically below root_fs")
					return false
				}
				l := getLogger(ctx).WithField("placeholder_fs", v.Path)
				l.Debug("create placeholder filesystem")
				err := zfs.ZFSCreatePlaceholderFilesystem(ctx, v.Path)
				if err != nil {
					l.WithError(err).Error("cannot create placeholder filesystem")
					visitErr = err
					return false
				}
				return true
			}
			getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
			return true // leave this fs as is
		})
	}()
	getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
	if visitErr != nil {
		return nil, visitErr
	}

	// determine whether we need to rollback the filesystem / change its placeholder state
	var clearPlaceholderProperty bool
	var recvOpts zfs.RecvOptions
	ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, lp)
	if err == nil && ph.FSExists && ph.IsPlaceholder {
		recvOpts.RollbackAndForceRecv = true
		clearPlaceholderProperty = true
	}
	if clearPlaceholderProperty {
		if err := zfs.ZFSSetPlaceholder(ctx, lp, false); err != nil {
			return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
		}
	}

	if req.ClearResumeToken && ph.FSExists {
		if err := zfs.ZFSRecvClearResumeToken(ctx, lp.ToString()); err != nil {
			return nil, errors.Wrap(err, "cannot clear resume token")
		}
	}

	recvOpts.SavePartialRecvState, err = zfs.ResumeRecvSupported(ctx, lp)
	if err != nil {
		return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
	}

	getLogger(ctx).Debug("acquire concurrent recv semaphore")
	// TODO use try-acquire and fail with resource-exhaustion rpc status
	// => would require handling on the client-side
	// => this is a dataconn endpoint, doesn't have the status code semantics of gRPC
	guard, err := maxConcurrentZFSRecvSemaphore.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	defer guard.Release()

	getLogger(ctx).WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")

	snapFullPath := to.FullPath(lp.ToString())
	if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
		getLogger(ctx).
			WithError(err).
			WithField("opts", recvOpts).
			Error("zfs receive failed")
		return nil, err
	}

	// validate that we actually received what the sender claimed
	toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString())
	if err != nil {
		msg := "receive request's `To` version does not match what we received in the stream"
		getLogger(ctx).WithError(err).WithField("snap", snapFullPath).Error(msg)
		getLogger(ctx).Error("aborting recv request, but keeping received snapshot for inspection")
		return nil, errors.Wrap(err, msg)
	}

	if s.conf.UpdateLastReceivedHold {
		getLogger(ctx).Debug("move last-received-hold")
		if err := MoveLastReceivedHold(ctx, lp.ToString(), toRecvd, s.conf.JobID); err != nil {
			return nil, errors.Wrap(err, "cannot move last-received-hold")
		}
	}

	return &pdu.ReceiveRes{}, nil
}

func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
	root := s.clientRootFromCtx(ctx)
	lp, err := subroot{root}.MapToLocal(req.Filesystem)
	if err != nil {
		return nil, err
	}
	return doDestroySnapshots(ctx, lp, req.Snapshots)
}

func (p *Receiver) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMostRecentCommonAncestorReq) (*pdu.HintMostRecentCommonAncestorRes, error) {
	// we don't move last-received-hold as part of this hint
	// because that wouldn't give us any benefit wrt resumability.
	//
	// Other reason: the replication logic that issues this RPC would require refactoring
	// to include the receiver's FilesystemVersion in the request
	return &pdu.HintMostRecentCommonAncestorRes{}, nil
}

func (p *Receiver) SendCompleted(context.Context, *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
	return &pdu.SendCompletedRes{}, nil
}

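// doDestroySnapshots destroys the given snapshots of filesystem lp in a
// single batch and maps each per-snapshot error into the corresponding
// entry of the DestroySnapshotsRes.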
func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.FilesystemVersion) (*pdu.DestroySnapshotsRes, error) {
	reqs := make([]*zfs.DestroySnapOp, len(snaps))
	ress := make([]*pdu.DestroySnapshotRes, len(snaps))
	errs := make([]error, len(snaps))
	for i, fsv := range snaps {
		if fsv.Type != pdu.FilesystemVersion_Snapshot {
			return nil, fmt.Errorf("version %q is not a snapshot", fsv.Name)
		}
		ress[i] = &pdu.DestroySnapshotRes{
			Snapshot: fsv,
			// Error set after batch operation
		}
		reqs[i] = &zfs.DestroySnapOp{
			Filesystem: lp.ToString(),
			Name:       fsv.Name,
			ErrOut:     &errs[i],
		}
	}
	zfs.ZFSDestroyFilesystemVersions(reqs)
	for i := range reqs {
		if errs[i] != nil {
			if de, ok := errs[i].(*zfs.DestroySnapshotsError); ok && len(de.Reason) == 1 {
				ress[i].Error = de.Reason[0]
			} else {
				ress[i].Error = errs[i].Error()
			}
		}
	}
	return &pdu.DestroySnapshotsRes{
		Results: ress,
	}, nil
}