mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-25 01:44:43 +01:00
5615f4929a
fixes https://github.com/zrepl/zrepl/issues/742 Before this PR, when chaining replication from A => B => C, if B had placeholders and the `filesystems` included these placeholders, we'd incorrectly fail the planning phase with error `sender does not have any versions`. The non-placeholder child filesystems of these placeholders would then fail to replicate because of the initial-replication-dependency-tracking that we do, i.e., their parent failed to initially replicate, hence they fail to replicate as well (`parent(s) failed during initial replication`). We can do better than that because we have the information whether a sender-side filesystem is a placeholder. This PR makes the planner act on that information. The outcome is that placeholders are replicated as placeholders (albeit the receiver remains in control of how these placeholders are created, i.e., `recv.placeholders`). The mechanism to do it is: 1. Don't plan any replication steps for filesystems that are placeholders on the sender. 2. Ensure that, if a receiving-side filesystem exists, it is indeed a placeholder. Check (2) may seem overly restrictive, but the goal here is not just to mirror all non-placeholder filesystems, but also to mirror the hierarchy. Testing performed: - [x] confirm with issue reporter that this PR fixes their issue - [x] add a regression test that fails without the changes in this PR
1107 lines
37 KiB
Go
1107 lines
37 KiB
Go
// Package endpoint implements replication endpoints for use with package replication.
|
|
package endpoint
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"strings"
|
|
|
|
"github.com/kr/pretty"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/zrepl/zrepl/daemon/logging/trace"
|
|
|
|
"github.com/zrepl/zrepl/replication/logic/pdu"
|
|
"github.com/zrepl/zrepl/util/bandwidthlimit"
|
|
"github.com/zrepl/zrepl/util/chainedio"
|
|
"github.com/zrepl/zrepl/util/chainlock"
|
|
"github.com/zrepl/zrepl/util/envconst"
|
|
"github.com/zrepl/zrepl/util/nodefault"
|
|
"github.com/zrepl/zrepl/zfs"
|
|
zfsprop "github.com/zrepl/zrepl/zfs/property"
|
|
)
|
|
|
|
type SenderConfig struct {
|
|
FSF zfs.DatasetFilter
|
|
JobID JobID
|
|
|
|
Encrypt *nodefault.Bool
|
|
SendRaw bool
|
|
SendProperties bool
|
|
SendBackupProperties bool
|
|
SendLargeBlocks bool
|
|
SendCompressed bool
|
|
SendEmbeddedData bool
|
|
SendSaved bool
|
|
|
|
BandwidthLimit bandwidthlimit.Config
|
|
}
|
|
|
|
func (c *SenderConfig) Validate() error {
|
|
c.JobID.MustValidate()
|
|
if err := c.Encrypt.ValidateNoDefault(); err != nil {
|
|
return errors.Wrap(err, "`Encrypt` field invalid")
|
|
}
|
|
if _, err := StepHoldTag(c.JobID); err != nil {
|
|
return fmt.Errorf("JobID cannot be used for hold tag: %s", err)
|
|
}
|
|
if err := bandwidthlimit.ValidateConfig(c.BandwidthLimit); err != nil {
|
|
return errors.Wrap(err, "`BandwidthLimit` field invalid")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Sender implements replication.ReplicationEndpoint for a sending side
|
|
type Sender struct {
|
|
pdu.UnsafeReplicationServer // prefer compilation errors over default 'method X not implemented' impl
|
|
|
|
FSFilter zfs.DatasetFilter
|
|
jobId JobID
|
|
config SenderConfig
|
|
bwLimit bandwidthlimit.Wrapper
|
|
}
|
|
|
|
func NewSender(conf SenderConfig) *Sender {
|
|
if err := conf.Validate(); err != nil {
|
|
panic("invalid config" + err.Error())
|
|
}
|
|
|
|
ratelimiter := bandwidthlimit.WrapperFromConfig(conf.BandwidthLimit)
|
|
|
|
return &Sender{
|
|
FSFilter: conf.FSF,
|
|
jobId: conf.JobID,
|
|
config: conf,
|
|
bwLimit: ratelimiter,
|
|
}
|
|
}
|
|
|
|
func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
|
|
dp, err := zfs.NewDatasetPath(fs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if dp.Length() == 0 {
|
|
return nil, errors.New("empty filesystem not allowed")
|
|
}
|
|
pass, err := s.FSFilter.Filter(dp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !pass {
|
|
return nil, fmt.Errorf("endpoint does not allow access to filesystem %s", fs)
|
|
}
|
|
return dp, nil
|
|
}
|
|
|
|
func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
fss, err := zfs.ZFSListMapping(ctx, s.FSFilter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rfss := make([]*pdu.Filesystem, 0, len(fss))
|
|
for _, a := range fss {
|
|
// TODO: dedup code with Receiver.ListFilesystems
|
|
l := getLogger(ctx).WithField("fs", a)
|
|
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, a)
|
|
if err != nil {
|
|
l.WithError(err).Error("error getting placeholder state")
|
|
return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a)
|
|
}
|
|
l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
|
|
if !ph.FSExists {
|
|
l.Error("inconsistent placeholder state: filesystem must exists")
|
|
err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
|
|
return nil, err
|
|
}
|
|
|
|
fs := &pdu.Filesystem{
|
|
Path: a.ToString(),
|
|
// ResumeToken does not make sense from Sender
|
|
IsPlaceholder: ph.IsPlaceholder,
|
|
}
|
|
rfss = append(rfss, fs)
|
|
}
|
|
res := &pdu.ListFilesystemRes{Filesystems: rfss}
|
|
return res, nil
|
|
}
|
|
|
|
func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
lp, err := s.filterCheckFS(r.GetFilesystem())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
fsvs, err := zfs.ZFSListFilesystemVersions(ctx, lp, zfs.ListFilesystemVersionsOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
|
|
for i := range fsvs {
|
|
rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
|
|
}
|
|
res := &pdu.ListFilesystemVersionsRes{Versions: rfsvs}
|
|
return res, nil
|
|
|
|
}
|
|
|
|
func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
|
|
if fsv == nil {
|
|
return nil
|
|
}
|
|
return &zfs.ZFSSendArgVersion{RelName: fsv.GetRelName(), GUID: fsv.Guid}
|
|
}
|
|
|
|
func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (v zfs.FilesystemVersion, err error) {
|
|
sendArgs := uncheckedSendArgsFromPDU(fsv)
|
|
if sendArgs == nil {
|
|
return v, errors.New("must not be nil")
|
|
}
|
|
version, err := sendArgs.ValidateExistsAndGetVersion(ctx, fs)
|
|
if err != nil {
|
|
return v, err
|
|
}
|
|
return version, nil
|
|
}
|
|
|
|
func (s *Sender) sendMakeArgs(ctx context.Context, r *pdu.SendReq) (sendArgs zfs.ZFSSendArgsValidated, _ error) {
|
|
|
|
_, err := s.filterCheckFS(r.Filesystem)
|
|
if err != nil {
|
|
return sendArgs, err
|
|
}
|
|
|
|
sendArgsUnvalidated := zfs.ZFSSendArgsUnvalidated{
|
|
FS: r.Filesystem,
|
|
From: uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
|
|
To: uncheckedSendArgsFromPDU(r.GetTo()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
|
|
ZFSSendFlags: zfs.ZFSSendFlags{
|
|
ResumeToken: r.ResumeToken, // nil or not nil, depending on decoding success
|
|
Encrypted: s.config.Encrypt,
|
|
Properties: s.config.SendProperties,
|
|
BackupProperties: s.config.SendBackupProperties,
|
|
Raw: s.config.SendRaw,
|
|
LargeBlocks: s.config.SendLargeBlocks,
|
|
Compressed: s.config.SendCompressed,
|
|
EmbeddedData: s.config.SendEmbeddedData,
|
|
Saved: s.config.SendSaved,
|
|
},
|
|
}
|
|
|
|
sendArgs, err = sendArgsUnvalidated.Validate(ctx)
|
|
if err != nil {
|
|
return sendArgs, errors.Wrap(err, "validate send arguments")
|
|
}
|
|
return sendArgs, nil
|
|
}
|
|
|
|
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
sendArgs, err := s.sendMakeArgs(ctx, r)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// create holds or bookmarks of `From` and `To` to guarantee one of the following:
|
|
// - that the replication step can always be resumed (`holds`),
|
|
// - that the replication step can be interrupted and a future replication
|
|
// step with same or different `To` but same `From` is still possible (`bookmarks`)
|
|
// - nothing (`none`)
|
|
//
|
|
// ...
|
|
//
|
|
// ... actually create the abstractions
|
|
replicationGuaranteeOptions, err := replicationGuaranteeOptionsFromPDU(r.GetReplicationConfig().Protection)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
replicationGuaranteeStrategy := replicationGuaranteeOptions.Strategy(sendArgs.From != nil)
|
|
liveAbs, err := replicationGuaranteeStrategy.SenderPreSend(ctx, s.jobId, &sendArgs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
for _, a := range liveAbs {
|
|
if a != nil {
|
|
abstractionsCacheSingleton.Put(a)
|
|
}
|
|
}
|
|
|
|
// cleanup the mess that _this function_ might have created in prior failed attempts:
|
|
//
|
|
// In summary, we delete every endpoint ZFS abstraction created on this filesystem for this job id,
|
|
// except for the ones we just created above.
|
|
//
|
|
// This is the most robust approach to avoid leaking (= forgetting to clean up) endpoint ZFS abstractions,
|
|
// all under the assumption that there will only ever be one send for a (jobId,fs) combination at any given time.
|
|
//
|
|
// Note that the SendCompleted rpc can't be relied upon for this purpose:
|
|
// - it might be lost due to network errors,
|
|
// - or never be sent by a potentially malicious or buggy client,
|
|
// - or never be send because the replication step failed at some point
|
|
// (potentially leaving a resumable state on the receiver, which is the case where we really do not want to blow away the step holds too soon.)
|
|
//
|
|
// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
|
|
// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
|
|
destroyTypes := AbstractionTypeSet{
|
|
AbstractionStepHold: true,
|
|
AbstractionTentativeReplicationCursorBookmark: true,
|
|
}
|
|
// The replication planner can also pick an endpoint zfs abstraction as FromVersion.
|
|
// Keep it, so that the replication will succeed.
|
|
//
|
|
// NB: there is no abstraction for snapshots, so, we only need to check bookmarks.
|
|
if sendArgs.FromVersion != nil && sendArgs.FromVersion.IsBookmark() {
|
|
dp, err := zfs.NewDatasetPath(sendArgs.FS)
|
|
if err != nil {
|
|
panic(err) // sendArgs is validated, this shouldn't happen
|
|
}
|
|
liveAbs = append(liveAbs, destroyTypes.ExtractBookmark(dp, sendArgs.FromVersion))
|
|
}
|
|
func() {
|
|
ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
|
|
defer endSpan()
|
|
|
|
keep := func(a Abstraction) (keep bool) {
|
|
keep = false
|
|
for _, k := range liveAbs {
|
|
keep = keep || AbstractionEquals(a, k)
|
|
}
|
|
return keep
|
|
}
|
|
check := func(obsoleteAbs []Abstraction) {
|
|
// Ensure that we don't delete `From` or `To`.
|
|
// Regardless of whether they are in AbstractionTypeSet or not.
|
|
// And produce a nice error message in case we do, to aid debugging the resulting panic.
|
|
//
|
|
// This is especially important for `From`. We could break incremental replication
|
|
// if we deleted the last common filesystem version between sender and receiver.
|
|
type Problem struct {
|
|
sendArgsWhat string
|
|
fullpath string
|
|
obsoleteAbs Abstraction
|
|
}
|
|
problems := make([]Problem, 0)
|
|
checkFullpaths := make(map[string]string, 2)
|
|
checkFullpaths["ToVersion"] = sendArgs.ToVersion.FullPath(sendArgs.FS)
|
|
if sendArgs.FromVersion != nil {
|
|
checkFullpaths["FromVersion"] = sendArgs.FromVersion.FullPath(sendArgs.FS)
|
|
}
|
|
for _, a := range obsoleteAbs {
|
|
for what, fullpath := range checkFullpaths {
|
|
if a.GetFullPath() == fullpath && a.GetType().IsSnapshotOrBookmark() {
|
|
problems = append(problems, Problem{
|
|
sendArgsWhat: what,
|
|
fullpath: fullpath,
|
|
obsoleteAbs: a,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
if len(problems) == 0 {
|
|
return
|
|
}
|
|
var msg strings.Builder
|
|
fmt.Fprintf(&msg, "cleaning up send stale would destroy send args:\n")
|
|
fmt.Fprintf(&msg, " SendArgs: %s\n", pretty.Sprint(sendArgs))
|
|
for _, check := range problems {
|
|
fmt.Fprintf(&msg, "would delete %s %s because it was deemed an obsolete abstraction: %s\n",
|
|
check.sendArgsWhat, check.fullpath, check.obsoleteAbs)
|
|
}
|
|
panic(msg.String())
|
|
}
|
|
abstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, destroyTypes, keep, check)
|
|
}()
|
|
|
|
var sendStream io.ReadCloser
|
|
sendStream, err = zfs.ZFSSend(ctx, sendArgs)
|
|
if err != nil {
|
|
// it's ok to not destroy the abstractions we just created here, a new send attempt will take care of it
|
|
return nil, nil, errors.Wrap(err, "zfs send failed")
|
|
}
|
|
|
|
// apply rate limit
|
|
sendStream = s.bwLimit.WrapReadCloser(sendStream)
|
|
|
|
res := &pdu.SendRes{
|
|
ExpectedSize: 0,
|
|
UsedResumeToken: r.ResumeToken != "",
|
|
}
|
|
|
|
return res, sendStream, nil
|
|
}
|
|
|
|
func (s *Sender) SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
sendArgs, err := s.sendMakeArgs(ctx, r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
si, err := zfs.ZFSSendDry(ctx, sendArgs)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "zfs send dry failed")
|
|
}
|
|
|
|
// From now on, assume that sendArgs has been validated by ZFSSendDry
|
|
// (because validation involves shelling out, it's actually a little expensive)
|
|
|
|
res := &pdu.SendRes{
|
|
ExpectedSize: si.SizeEstimate,
|
|
UsedResumeToken: r.ResumeToken != "",
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
orig := r.GetOriginalReq() // may be nil, always use proto getters
|
|
fsp, err := p.filterCheckFS(orig.GetFilesystem())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
fs := fsp.ToString()
|
|
|
|
var from *zfs.FilesystemVersion
|
|
if orig.GetFrom() != nil {
|
|
f, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetFrom()) // no shadow
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "validate `from` exists")
|
|
}
|
|
from = &f
|
|
}
|
|
to, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetTo())
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "validate `to` exists")
|
|
}
|
|
|
|
replicationGuaranteeOptions, err := replicationGuaranteeOptionsFromPDU(orig.GetReplicationConfig().Protection)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
liveAbs, err := replicationGuaranteeOptions.Strategy(from != nil).SenderPostRecvConfirmed(ctx, p.jobId, fs, to)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, a := range liveAbs {
|
|
if a != nil {
|
|
abstractionsCacheSingleton.Put(a)
|
|
}
|
|
}
|
|
keep := func(a Abstraction) (keep bool) {
|
|
keep = false
|
|
for _, k := range liveAbs {
|
|
keep = keep || AbstractionEquals(a, k)
|
|
}
|
|
return keep
|
|
}
|
|
destroyTypes := AbstractionTypeSet{
|
|
AbstractionStepHold: true,
|
|
AbstractionTentativeReplicationCursorBookmark: true,
|
|
AbstractionReplicationCursorBookmarkV2: true,
|
|
}
|
|
abstractionsCacheSingleton.TryBatchDestroy(ctx, p.jobId, fs, destroyTypes, keep, nil)
|
|
|
|
return &pdu.SendCompletedRes{}, nil
|
|
|
|
}
|
|
|
|
func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
dp, err := p.filterCheckFS(req.Filesystem)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return doDestroySnapshots(ctx, dp, req.Snapshots)
|
|
}
|
|
|
|
func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
res := pdu.PingRes{
|
|
Echo: req.GetMessage(),
|
|
}
|
|
return &res, nil
|
|
}
|
|
|
|
func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
return p.Ping(ctx, req)
|
|
}
|
|
|
|
func (p *Sender) WaitForConnectivity(ctx context.Context) error {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
dp, err := p.filterCheckFS(req.Filesystem)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cursor, err := GetMostRecentReplicationCursorOfJob(ctx, dp.ToString(), p.jobId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if cursor == nil {
|
|
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil
|
|
}
|
|
return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: cursor.Guid}}, nil
|
|
}
|
|
|
|
func (p *Sender) Receive(ctx context.Context, r *pdu.ReceiveReq, _ io.ReadCloser) (*pdu.ReceiveRes, error) {
|
|
return nil, fmt.Errorf("sender does not implement Receive()")
|
|
}
|
|
|
|
type FSFilter interface { // FIXME unused
|
|
Filter(path *zfs.DatasetPath) (pass bool, err error)
|
|
UserSpecifiedDatasets() zfs.UserSpecifiedDatasetsSet
|
|
}
|
|
|
|
// FIXME: can we get away without error types here?
|
|
type FSMap interface { // FIXME unused
|
|
FSFilter
|
|
Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error)
|
|
Invert() (FSMap, error)
|
|
AsFilter() FSFilter
|
|
}
|
|
|
|
// NOTE: when adding members to this struct, remember
|
|
// to add them to `ReceiverConfig.copyIn()`
|
|
type ReceiverConfig struct {
|
|
JobID JobID
|
|
|
|
RootWithoutClientComponent *zfs.DatasetPath
|
|
AppendClientIdentity bool
|
|
|
|
InheritProperties []zfsprop.Property
|
|
OverrideProperties map[zfsprop.Property]string
|
|
|
|
BandwidthLimit bandwidthlimit.Config
|
|
|
|
PlaceholderEncryption PlaceholderCreationEncryptionProperty
|
|
}
|
|
|
|
// PlaceholderCreationEncryptionProperty enumerates the ways in which the
// receiver handles encryption when it creates placeholder filesystems.
//
// The zero value is not one of the declared constants.
//
//go:generate enumer -type=PlaceholderCreationEncryptionProperty -transform=kebab -trimprefix=PlaceholderCreationEncryptionProperty
type PlaceholderCreationEncryptionProperty int

// Note: the constant names, transformed through enumer, are part of the config format!
//
// Each constant occupies its own bit (1, 2, 4).
const (
	PlaceholderCreationEncryptionPropertyUnspecified PlaceholderCreationEncryptionProperty = 1 << 0
	PlaceholderCreationEncryptionPropertyInherit     PlaceholderCreationEncryptionProperty = 1 << 1
	PlaceholderCreationEncryptionPropertyOff         PlaceholderCreationEncryptionProperty = 1 << 2
)
|
|
|
|
func (c *ReceiverConfig) copyIn() {
|
|
c.RootWithoutClientComponent = c.RootWithoutClientComponent.Copy()
|
|
|
|
pInherit := make([]zfsprop.Property, len(c.InheritProperties))
|
|
copy(pInherit, c.InheritProperties)
|
|
c.InheritProperties = pInherit
|
|
|
|
pOverride := make(map[zfsprop.Property]string, len(c.OverrideProperties))
|
|
for key, value := range c.OverrideProperties {
|
|
pOverride[key] = value
|
|
}
|
|
c.OverrideProperties = pOverride
|
|
}
|
|
|
|
func (c *ReceiverConfig) Validate() error {
|
|
c.JobID.MustValidate()
|
|
|
|
for _, prop := range c.InheritProperties {
|
|
err := prop.Validate()
|
|
if err != nil {
|
|
return errors.Wrapf(err, "inherit property %q", prop)
|
|
}
|
|
}
|
|
|
|
for prop := range c.OverrideProperties {
|
|
err := prop.Validate()
|
|
if err != nil {
|
|
return errors.Wrapf(err, "override property %q", prop)
|
|
}
|
|
}
|
|
|
|
if c.RootWithoutClientComponent.Length() <= 0 {
|
|
return errors.New("RootWithoutClientComponent must not be an empty dataset path")
|
|
}
|
|
|
|
if err := bandwidthlimit.ValidateConfig(c.BandwidthLimit); err != nil {
|
|
return errors.Wrap(err, "`BandwidthLimit` field invalid")
|
|
}
|
|
|
|
if !c.PlaceholderEncryption.IsAPlaceholderCreationEncryptionProperty() {
|
|
return errors.Errorf("`PlaceholderEncryption` field is invalid")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Receiver implements replication.ReplicationEndpoint for a receiving side
|
|
type Receiver struct {
|
|
pdu.UnsafeReplicationServer // prefer compilation errors over default 'method X not implemented' impl
|
|
|
|
conf ReceiverConfig // validated
|
|
|
|
bwLimit bandwidthlimit.Wrapper
|
|
|
|
recvParentCreationMtx *chainlock.L
|
|
|
|
Test_OverrideClientIdentityFunc func() string // for use by platformtest
|
|
}
|
|
|
|
func NewReceiver(config ReceiverConfig) *Receiver {
|
|
config.copyIn()
|
|
if err := config.Validate(); err != nil {
|
|
panic(err)
|
|
}
|
|
return &Receiver{
|
|
conf: config,
|
|
recvParentCreationMtx: chainlock.New(),
|
|
bwLimit: bandwidthlimit.WrapperFromConfig(config.BandwidthLimit),
|
|
}
|
|
}
|
|
|
|
func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error {
|
|
_, err := clientRoot(rootFS, clientIdentity)
|
|
return err
|
|
}
|
|
|
|
func clientRoot(rootFS *zfs.DatasetPath, clientIdentity string) (*zfs.DatasetPath, error) {
|
|
rootFSLen := rootFS.Length()
|
|
clientRootStr := path.Join(rootFS.ToString(), clientIdentity)
|
|
clientRoot, err := zfs.NewDatasetPath(clientRootStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rootFSLen+1 != clientRoot.Length() {
|
|
return nil, fmt.Errorf("client identity must be a single ZFS filesystem path component")
|
|
}
|
|
return clientRoot, nil
|
|
}
|
|
|
|
func (s *Receiver) clientRootFromCtx(ctx context.Context) *zfs.DatasetPath {
|
|
if !s.conf.AppendClientIdentity {
|
|
return s.conf.RootWithoutClientComponent.Copy()
|
|
}
|
|
|
|
var clientIdentity string
|
|
if s.Test_OverrideClientIdentityFunc != nil {
|
|
clientIdentity = s.Test_OverrideClientIdentityFunc()
|
|
} else {
|
|
var ok bool
|
|
clientIdentity, ok = ctx.Value(ClientIdentityKey).(string) // no shadow
|
|
if !ok {
|
|
panic("ClientIdentityKey context value must be set")
|
|
}
|
|
}
|
|
|
|
clientRoot, err := clientRoot(s.conf.RootWithoutClientComponent, clientIdentity)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("ClientIdentityContextKey must have been validated before invoking Receiver: %s", err))
|
|
}
|
|
return clientRoot
|
|
}
|
|
|
|
type subroot struct {
|
|
localRoot *zfs.DatasetPath
|
|
}
|
|
|
|
var _ zfs.DatasetFilter = subroot{}
|
|
|
|
// Filters local p
|
|
func (f subroot) Filter(p *zfs.DatasetPath) (pass bool, err error) {
|
|
return p.HasPrefix(f.localRoot) && !p.Equal(f.localRoot), nil
|
|
}
|
|
|
|
func (f subroot) UserSpecifiedDatasets() zfs.UserSpecifiedDatasetsSet {
|
|
return zfs.UserSpecifiedDatasetsSet{
|
|
f.localRoot.ToString(): true,
|
|
}
|
|
}
|
|
|
|
func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) {
|
|
p, err := zfs.NewDatasetPath(fs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if p.Length() == 0 {
|
|
return nil, errors.Errorf("cannot map empty filesystem")
|
|
}
|
|
c := f.localRoot.Copy()
|
|
c.Extend(p)
|
|
return c, nil
|
|
}
|
|
|
|
func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
// first make sure that root_fs is imported
|
|
if rphs, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, s.conf.RootWithoutClientComponent); err != nil {
|
|
return nil, errors.Wrap(err, "cannot determine whether root_fs exists")
|
|
} else if !rphs.FSExists {
|
|
getLogger(ctx).WithField("root_fs", s.conf.RootWithoutClientComponent).Error("root_fs does not exist")
|
|
return nil, errors.Errorf("root_fs does not exist")
|
|
}
|
|
|
|
root := s.clientRootFromCtx(ctx)
|
|
filtered, err := zfs.ZFSListMapping(ctx, subroot{root})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// present filesystem without the root_fs prefix
|
|
fss := make([]*pdu.Filesystem, 0, len(filtered))
|
|
for _, a := range filtered {
|
|
l := getLogger(ctx).WithField("fs", a)
|
|
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, a)
|
|
if err != nil {
|
|
l.WithError(err).Error("error getting placeholder state")
|
|
return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a)
|
|
}
|
|
l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
|
|
if !ph.FSExists {
|
|
l.Error("inconsistent placeholder state: filesystem must exists")
|
|
err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
|
|
return nil, err
|
|
}
|
|
token, err := zfs.ZFSGetReceiveResumeTokenOrEmptyStringIfNotSupported(ctx, a)
|
|
if err != nil {
|
|
l.WithError(err).Error("cannot get receive resume token")
|
|
return nil, err
|
|
}
|
|
l.WithField("receive_resume_token", token).Debug("receive resume token")
|
|
|
|
a.TrimPrefix(root)
|
|
|
|
fs := &pdu.Filesystem{
|
|
Path: a.ToString(),
|
|
IsPlaceholder: ph.IsPlaceholder,
|
|
ResumeToken: token,
|
|
}
|
|
fss = append(fss, fs)
|
|
}
|
|
if len(fss) == 0 {
|
|
getLogger(ctx).Debug("no filesystems found")
|
|
return &pdu.ListFilesystemRes{}, nil
|
|
}
|
|
return &pdu.ListFilesystemRes{Filesystems: fss}, nil
|
|
}
|
|
|
|
func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
root := s.clientRootFromCtx(ctx)
|
|
lp, err := subroot{root}.MapToLocal(req.GetFilesystem())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// TODO share following code with sender
|
|
|
|
fsvs, err := zfs.ZFSListFilesystemVersions(ctx, lp, zfs.ListFilesystemVersionsOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
|
|
for i := range fsvs {
|
|
rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
|
|
}
|
|
|
|
return &pdu.ListFilesystemVersionsRes{Versions: rfsvs}, nil
|
|
}
|
|
|
|
func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
res := pdu.PingRes{
|
|
Echo: req.GetMessage(),
|
|
}
|
|
return &res, nil
|
|
}
|
|
|
|
func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
return s.Ping(ctx, req)
|
|
}
|
|
|
|
func (s *Receiver) WaitForConnectivity(ctx context.Context) error {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
return nil
|
|
}
|
|
|
|
func (s *Receiver) ReplicationCursor(ctx context.Context, _ *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
return nil, fmt.Errorf("ReplicationCursor not implemented for Receiver")
|
|
}
|
|
|
|
func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
return nil, nil, fmt.Errorf("receiver does not implement Send()")
|
|
}
|
|
|
|
func (s *Receiver) SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
return nil, fmt.Errorf("receiver does not implement SendDry()")
|
|
}
|
|
|
|
func (s *Receiver) receive_GetPlaceholderCreationEncryptionValue(client_root, path *zfs.DatasetPath) (zfs.FilesystemPlaceholderCreateEncryptionValue, error) {
|
|
if !s.conf.PlaceholderEncryption.IsAPlaceholderCreationEncryptionProperty() {
|
|
panic(s.conf.PlaceholderEncryption)
|
|
}
|
|
|
|
if client_root.Equal(path) && s.conf.PlaceholderEncryption == PlaceholderCreationEncryptionPropertyUnspecified {
|
|
// If our Receiver is configured to append a client component to s.conf.RootWithoutClientComponent
|
|
// then that dataset is always going to be a placeholder.
|
|
// We don't want to burden users with the concept of placeholders if their `filesystems` filter on the sender
|
|
// doesn't introduce any gaps.
|
|
// Since the dataset hierarchy up to and including that client component dataset is still fully controlled by us,
|
|
// using `inherit` is going to make it work in all expected use cases.
|
|
return zfs.FilesystemPlaceholderCreateEncryptionInherit, nil
|
|
}
|
|
|
|
switch s.conf.PlaceholderEncryption {
|
|
case PlaceholderCreationEncryptionPropertyUnspecified:
|
|
return 0, fmt.Errorf("placeholder filesystem encryption handling is unspecified in receiver config")
|
|
case PlaceholderCreationEncryptionPropertyInherit:
|
|
return zfs.FilesystemPlaceholderCreateEncryptionInherit, nil
|
|
case PlaceholderCreationEncryptionPropertyOff:
|
|
return zfs.FilesystemPlaceholderCreateEncryptionOff, nil
|
|
default:
|
|
panic(s.conf.PlaceholderEncryption)
|
|
}
|
|
}
|
|
|
|
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
getLogger(ctx).Debug("incoming Receive")
|
|
defer receive.Close()
|
|
|
|
root := s.clientRootFromCtx(ctx)
|
|
lp, err := subroot{root}.MapToLocal(req.Filesystem)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "`Filesystem` invalid")
|
|
}
|
|
|
|
to := uncheckedSendArgsFromPDU(req.GetTo())
|
|
if to == nil {
|
|
return nil, errors.New("`To` must not be nil")
|
|
}
|
|
if !to.IsSnapshot() {
|
|
return nil, errors.New("`To` must be a snapshot")
|
|
}
|
|
|
|
// create placeholder parent filesystems as appropriate
|
|
//
|
|
// Manipulating the ZFS dataset hierarchy must happen exclusively.
|
|
// TODO: Use fine-grained locking to allow separate clients / requests to pass
|
|
// through the following section concurrently when operating on disjoint
|
|
// ZFS dataset hierarchy subtrees.
|
|
var visitErr error
|
|
func() {
|
|
getLogger(ctx).Debug("begin acquire recvParentCreationMtx")
|
|
defer s.recvParentCreationMtx.Lock().Unlock()
|
|
getLogger(ctx).Debug("end acquire recvParentCreationMtx")
|
|
defer getLogger(ctx).Debug("release recvParentCreationMtx")
|
|
|
|
f := zfs.NewDatasetPathForest()
|
|
f.Add(lp)
|
|
getLogger(ctx).Debug("begin tree-walk")
|
|
f.WalkTopDown(func(v *zfs.DatasetPathVisit) (visitChildTree bool) {
|
|
if v.Path.Equal(lp) {
|
|
return false
|
|
}
|
|
|
|
l := getLogger(ctx).
|
|
WithField("placeholder_fs", v.Path.ToString()).
|
|
WithField("receive_fs", lp.ToString())
|
|
|
|
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, v.Path)
|
|
l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
|
|
WithField("err", fmt.Sprintf("%s", err)).
|
|
WithField("errType", fmt.Sprintf("%T", err)).
|
|
Debug("get placeholder state for filesystem")
|
|
if err != nil {
|
|
visitErr = errors.Wrapf(err, "cannot get placeholder state of %s", v.Path.ToString())
|
|
return false
|
|
}
|
|
|
|
if !ph.FSExists {
|
|
if s.conf.RootWithoutClientComponent.HasPrefix(v.Path) {
|
|
if v.Path.Length() == 1 {
|
|
visitErr = fmt.Errorf("pool %q not imported", v.Path.ToString())
|
|
} else {
|
|
visitErr = fmt.Errorf("root_fs %q does not exist", s.conf.RootWithoutClientComponent.ToString())
|
|
}
|
|
l.WithError(visitErr).Error("placeholders are only created automatically below root_fs")
|
|
return false
|
|
}
|
|
|
|
// compute the value lazily so that users who don't rely on
|
|
// placeholders can use the default value PlaceholderCreationEncryptionPropertyUnspecified
|
|
placeholderEncryption, err := s.receive_GetPlaceholderCreationEncryptionValue(root, v.Path)
|
|
if err != nil {
|
|
l.WithError(err).Error("cannot create placeholder filesystem") // logger already contains path
|
|
visitErr = errors.Wrapf(err, "cannot create placeholder filesystem %s", v.Path.ToString())
|
|
return false
|
|
}
|
|
|
|
l := l.WithField("encryption", placeholderEncryption)
|
|
|
|
l.Debug("creating placeholder filesystem")
|
|
err = zfs.ZFSCreatePlaceholderFilesystem(ctx, v.Path, v.Parent.Path, placeholderEncryption)
|
|
if err != nil {
|
|
l.WithError(err).Error("cannot create placeholder filesystem") // logger already contains path
|
|
visitErr = errors.Wrapf(err, "cannot create placeholder filesystem %s", v.Path.ToString())
|
|
return false
|
|
}
|
|
l.Info("created placeholder filesystem")
|
|
return true
|
|
} else {
|
|
l.Debug("filesystem exists")
|
|
return true // leave this fs as is
|
|
}
|
|
})
|
|
}()
|
|
getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
|
|
if visitErr != nil {
|
|
return nil, visitErr
|
|
}
|
|
|
|
log := getLogger(ctx).WithField("proto_fs", req.GetFilesystem()).WithField("local_fs", lp.ToString())
|
|
|
|
// determine whether we need to rollback the filesystem / change its placeholder state
|
|
var clearPlaceholderProperty bool
|
|
var recvOpts zfs.RecvOptions
|
|
ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, lp)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "cannot get placeholder state")
|
|
}
|
|
log.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
|
|
|
|
recvOpts.InheritProperties = s.conf.InheritProperties
|
|
recvOpts.OverrideProperties = s.conf.OverrideProperties
|
|
|
|
if ph.FSExists && ph.IsPlaceholder {
|
|
recvOpts.RollbackAndForceRecv = true
|
|
clearPlaceholderProperty = true
|
|
}
|
|
|
|
if clearPlaceholderProperty {
|
|
log.Info("clearing placeholder property")
|
|
if err := zfs.ZFSSetPlaceholder(ctx, lp, false); err != nil {
|
|
return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
|
|
}
|
|
}
|
|
|
|
if req.ClearResumeToken && ph.FSExists {
|
|
log.Info("clearing resume token")
|
|
if err := zfs.ZFSRecvClearResumeToken(ctx, lp.ToString()); err != nil {
|
|
return nil, errors.Wrap(err, "cannot clear resume token")
|
|
}
|
|
}
|
|
|
|
recvOpts.SavePartialRecvState, err = zfs.ResumeRecvSupported(ctx, lp)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
|
|
}
|
|
|
|
// apply rate limit
|
|
receive = s.bwLimit.WrapReadCloser(receive)
|
|
|
|
var peek bytes.Buffer
|
|
var MaxPeek = envconst.Int64("ZREPL_ENDPOINT_RECV_PEEK_SIZE", 1<<20)
|
|
log.WithField("max_peek_bytes", MaxPeek).Info("peeking incoming stream")
|
|
if _, err := io.Copy(&peek, io.LimitReader(receive, MaxPeek)); err != nil {
|
|
log.WithError(err).Error("cannot read peek-buffer from send stream")
|
|
}
|
|
var peekCopy bytes.Buffer
|
|
if n, err := peekCopy.Write(peek.Bytes()); err != nil || n != peek.Len() {
|
|
panic(peek.Len())
|
|
}
|
|
|
|
log.WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
|
|
|
snapFullPath := to.FullPath(lp.ToString())
|
|
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, chainedio.NewChainedReader(&peek, receive), recvOpts); err != nil {
|
|
|
|
// best-effort rollback of placeholder state if the recv didn't start
|
|
_, resumableStatePresent := err.(*zfs.RecvFailedWithResumeTokenErr)
|
|
disablePlaceholderRestoration := envconst.Bool("ZREPL_ENDPOINT_DISABLE_PLACEHOLDER_RESTORATION", false)
|
|
placeholderRestored := !ph.IsPlaceholder
|
|
if !disablePlaceholderRestoration && !resumableStatePresent && recvOpts.RollbackAndForceRecv && ph.FSExists && ph.IsPlaceholder && clearPlaceholderProperty {
|
|
log.Info("restoring placeholder property")
|
|
if phErr := zfs.ZFSSetPlaceholder(ctx, lp, true); phErr != nil {
|
|
log.WithError(phErr).Error("cannot restore placeholder property after failed receive, subsequent replications will likely fail with a different error")
|
|
// fallthrough
|
|
} else {
|
|
placeholderRestored = true
|
|
}
|
|
// fallthrough
|
|
}
|
|
|
|
// deal with failing initial encrypted send & recv
|
|
if _, ok := err.(*zfs.RecvDestroyOrOverwriteEncryptedErr); ok && ph.IsPlaceholder && placeholderRestored {
|
|
msg := `cannot automatically replace placeholder filesystem with incoming send stream - please see receive-side log for details`
|
|
err := errors.New(msg)
|
|
log.Error(msg)
|
|
|
|
log.Error(`zrepl creates placeholder filesystems on the receiving side of a replication to match the sending side's dataset hierarchy`)
|
|
log.Error(`zrepl uses zfs receive -F to replace those placeholders with incoming full sends`)
|
|
log.Error(`OpenZFS native encryption prohibits zfs receive -F for encrypted filesystems`)
|
|
log.Error(`the current zrepl placeholder filesystem concept is thus incompatible with OpenZFS native encryption`)
|
|
|
|
tempStartFullRecvFS := lp.Copy().ToString() + ".zrepl.initial-recv"
|
|
tempStartFullRecvFSDP, dpErr := zfs.NewDatasetPath(tempStartFullRecvFS)
|
|
if dpErr != nil {
|
|
log.WithError(dpErr).Error("cannot determine temporary filesystem name for initial encrypted recv workaround")
|
|
return nil, err // yes, err, not dpErr
|
|
}
|
|
|
|
log := log.WithField("temp_recv_fs", tempStartFullRecvFS)
|
|
log.Error(`as a workaround, zrepl will now attempt to re-receive the beginning of the stream into a temporary filesystem temp_recv_fs`)
|
|
log.Error(`if that step succeeds: shut down zrepl and use 'zfs rename' to swap temp_recv_fs with local_fs, then restart zrepl`)
|
|
log.Error(`replication will then resume using resumable send+recv`)
|
|
|
|
tempPH, phErr := zfs.ZFSGetFilesystemPlaceholderState(ctx, tempStartFullRecvFSDP)
|
|
if phErr != nil {
|
|
log.WithError(phErr).Error("cannot determine placeholder state of temp_recv_fs")
|
|
return nil, err // yes, err, not dpErr
|
|
}
|
|
if tempPH.FSExists {
|
|
log.Error("temp_recv_fs already exists, assuming a (partial) initial recv to that filesystem has already been done")
|
|
return nil, err
|
|
}
|
|
|
|
recvOpts.RollbackAndForceRecv = false
|
|
recvOpts.SavePartialRecvState = true
|
|
rerecvErr := zfs.ZFSRecv(ctx, tempStartFullRecvFS, to, chainedio.NewChainedReader(&peekCopy), recvOpts)
|
|
if _, isResumable := rerecvErr.(*zfs.RecvFailedWithResumeTokenErr); rerecvErr == nil || isResumable {
|
|
log.Error("completed re-receive into temporary filesystem temp_recv_fs, now shut down zrepl and use zfs rename to swap temp_recv_fs with local_fs")
|
|
} else {
|
|
log.WithError(rerecvErr).Error("failed to receive the beginning of the stream into temporary filesystem temp_recv_fs")
|
|
log.Error("we advise you to collect the error log and current configuration, open an issue on GitHub, and revert to your previous configuration in the meantime")
|
|
}
|
|
|
|
log.Error(`if you would like to see improvements to this situation, please open an issue on GitHub`)
|
|
return nil, err
|
|
}
|
|
|
|
log.
|
|
WithError(err).
|
|
WithField("opts", fmt.Sprintf("%#v", recvOpts)).
|
|
Error("zfs receive failed")
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// validate that we actually received what the sender claimed
|
|
toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString())
|
|
if err != nil {
|
|
msg := "receive request's `To` version does not match what we received in the stream"
|
|
log.WithError(err).WithField("snap", snapFullPath).Error(msg)
|
|
log.Error("aborting recv request, but keeping received snapshot for inspection")
|
|
return nil, errors.Wrap(err, msg)
|
|
}
|
|
|
|
replicationGuaranteeOptions, err := replicationGuaranteeOptionsFromPDU(req.GetReplicationConfig().Protection)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
replicationGuaranteeStrategy := replicationGuaranteeOptions.Strategy(ph.FSExists)
|
|
liveAbs, err := replicationGuaranteeStrategy.ReceiverPostRecv(ctx, s.conf.JobID, lp.ToString(), toRecvd)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, a := range liveAbs {
|
|
if a != nil {
|
|
abstractionsCacheSingleton.Put(a)
|
|
}
|
|
}
|
|
keep := func(a Abstraction) (keep bool) {
|
|
keep = false
|
|
for _, k := range liveAbs {
|
|
keep = keep || AbstractionEquals(a, k)
|
|
}
|
|
return keep
|
|
}
|
|
check := func(obsoleteAbs []Abstraction) {
|
|
for _, abs := range obsoleteAbs {
|
|
if zfs.FilesystemVersionEqualIdentity(abs.GetFilesystemVersion(), toRecvd) {
|
|
panic(fmt.Sprintf("would destroy endpoint abstraction around the filesystem version we just received %s", abs))
|
|
}
|
|
}
|
|
}
|
|
destroyTypes := AbstractionTypeSet{
|
|
AbstractionLastReceivedHold: true,
|
|
}
|
|
abstractionsCacheSingleton.TryBatchDestroy(ctx, s.conf.JobID, lp.ToString(), destroyTypes, keep, check)
|
|
|
|
return &pdu.ReceiveRes{}, nil
|
|
}
|
|
|
|
func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
root := s.clientRootFromCtx(ctx)
|
|
lp, err := subroot{root}.MapToLocal(req.Filesystem)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return doDestroySnapshots(ctx, lp, req.Snapshots)
|
|
}
|
|
|
|
func (p *Receiver) SendCompleted(ctx context.Context, _ *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
|
|
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
|
|
|
|
return &pdu.SendCompletedRes{}, nil
|
|
}
|
|
|
|
func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.FilesystemVersion) (*pdu.DestroySnapshotsRes, error) {
|
|
reqs := make([]*zfs.DestroySnapOp, len(snaps))
|
|
ress := make([]*pdu.DestroySnapshotRes, len(snaps))
|
|
errs := make([]error, len(snaps))
|
|
for i, fsv := range snaps {
|
|
if fsv.Type != pdu.FilesystemVersion_Snapshot {
|
|
return nil, fmt.Errorf("version %q is not a snapshot", fsv.Name)
|
|
}
|
|
ress[i] = &pdu.DestroySnapshotRes{
|
|
Snapshot: fsv,
|
|
// Error set after batch operation
|
|
}
|
|
reqs[i] = &zfs.DestroySnapOp{
|
|
Filesystem: lp.ToString(),
|
|
Name: fsv.Name,
|
|
ErrOut: &errs[i],
|
|
}
|
|
}
|
|
zfs.ZFSDestroyFilesystemVersions(ctx, reqs)
|
|
for i := range reqs {
|
|
if errs[i] != nil {
|
|
if de, ok := errs[i].(*zfs.DestroySnapshotsError); ok && len(de.Reason) == 1 {
|
|
ress[i].Error = de.Reason[0]
|
|
} else {
|
|
ress[i].Error = errs[i].Error()
|
|
}
|
|
}
|
|
}
|
|
return &pdu.DestroySnapshotsRes{
|
|
Results: ress,
|
|
}, nil
|
|
}
|