// Package endpoint implements replication endpoints for use with package replication.
package endpoint

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"path"
	"strings"

	"github.com/kr/pretty"
	"github.com/pkg/errors"

	"github.com/zrepl/zrepl/internal/daemon/logging/trace"

	"github.com/zrepl/zrepl/internal/replication/logic/pdu"
	"github.com/zrepl/zrepl/internal/util/bandwidthlimit"
	"github.com/zrepl/zrepl/internal/util/chainedio"
	"github.com/zrepl/zrepl/internal/util/chainlock"
	"github.com/zrepl/zrepl/internal/util/envconst"
	"github.com/zrepl/zrepl/internal/util/nodefault"
	"github.com/zrepl/zrepl/internal/zfs"
	zfsprop "github.com/zrepl/zrepl/internal/zfs/property"
)

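// SenderConfig is the configuration of a Sender endpoint.
// It must pass Validate before being handed to NewSender.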
type SenderConfig struct {
	FSF   zfs.DatasetFilter
	JobID JobID

	Encrypt              *nodefault.Bool
	SendRaw              bool
	SendProperties       bool
	SendBackupProperties bool
	SendLargeBlocks      bool
	SendCompressed       bool
	SendEmbeddedData     bool
	SendSaved            bool

	BandwidthLimit bandwidthlimit.Config
}

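// Validate checks the SenderConfig for completeness and consistency.
// It panics if the JobID is invalid and returns an error for all other problems.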
func (c *SenderConfig) Validate() error {
	c.JobID.MustValidate()
	if err := c.Encrypt.ValidateNoDefault(); err != nil {
		return errors.Wrap(err, "`Encrypt` field invalid")
	}
	if _, err := StepHoldTag(c.JobID); err != nil {
		return fmt.Errorf("JobID cannot be used for hold tag: %s", err)
	}
	if err := bandwidthlimit.ValidateConfig(c.BandwidthLimit); err != nil {
		return errors.Wrap(err, "`BandwidthLimit` field invalid")
	}
	return nil
}

// Sender implements replication.ReplicationEndpoint for a sending side
type Sender struct {
	pdu.UnsafeReplicationServer // prefer compilation errors over default 'method X not implemented' impl

	FSFilter zfs.DatasetFilter
	jobId    JobID
	config   SenderConfig
	bwLimit  bandwidthlimit.Wrapper
}

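// NewSender constructs a Sender from conf and panics if conf.Validate fails,
// so callers should validate user-supplied configuration beforehand.
//
// A rough usage sketch (the field values are illustrative assumptions, not
// defaults; a real config must fill in all fields so that Validate passes):
//
//	conf := SenderConfig{
//		FSF:     someDatasetFilter, // a zfs.DatasetFilter
//		JobID:   jobID,
//		Encrypt: &nodefault.Bool{B: false},
//		// ... send flags and BandwidthLimit as configured for the job ...
//	}
//	sender := NewSender(conf)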
func NewSender(conf SenderConfig) *Sender {
	if err := conf.Validate(); err != nil {
		panic("invalid config" + err.Error())
	}

	ratelimiter := bandwidthlimit.WrapperFromConfig(conf.BandwidthLimit)

	return &Sender{
		FSFilter: conf.FSF,
		jobId:    conf.JobID,
		config:   conf,
		bwLimit:  ratelimiter,
	}
}

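// filterCheckFS parses fs into a dataset path and checks it against the
// sender's filesystem filter. It returns an error if the path is empty,
// unparsable, or not allowed by the filter.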
func (s *Sender) filterCheckFS(fs string) (*zfs.DatasetPath, error) {
	dp, err := zfs.NewDatasetPath(fs)
	if err != nil {
		return nil, err
	}
	if dp.Length() == 0 {
		return nil, errors.New("empty filesystem not allowed")
	}
	pass, err := s.FSFilter.Filter(dp)
	if err != nil {
		return nil, err
	}
	if !pass {
		return nil, fmt.Errorf("endpoint does not allow access to filesystem %s", fs)
	}
	return dp, nil
}

func (s *Sender) ListFilesystems(ctx context.Context, r *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	fss, err := zfs.ZFSListMapping(ctx, s.FSFilter)
	if err != nil {
		return nil, err
	}
	rfss := make([]*pdu.Filesystem, 0, len(fss))
	for _, a := range fss {
		// TODO: dedup code with Receiver.ListFilesystems
		l := getLogger(ctx).WithField("fs", a)
		ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, a)
		if err != nil {
			l.WithError(err).Error("error getting placeholder state")
			return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a)
		}
		l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
		if !ph.FSExists {
			l.Error("inconsistent placeholder state: filesystem must exists")
			err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
			return nil, err
		}

		fs := &pdu.Filesystem{
			Path: a.ToString(),
			// ResumeToken does not make sense coming from the Sender; only the Receiver reports it
			IsPlaceholder: ph.IsPlaceholder,
		}
		rfss = append(rfss, fs)
	}
	res := &pdu.ListFilesystemRes{Filesystems: rfss}
	return res, nil
}

func (s *Sender) ListFilesystemVersions(ctx context.Context, r *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	lp, err := s.filterCheckFS(r.GetFilesystem())
	if err != nil {
		return nil, err
	}
	fsvs, err := zfs.ZFSListFilesystemVersions(ctx, lp, zfs.ListFilesystemVersionsOptions{})
	if err != nil {
		return nil, err
	}
	rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
	for i := range fsvs {
		rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
	}
	res := &pdu.ListFilesystemVersionsRes{Versions: rfsvs}
	return res, nil
}

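// uncheckedSendArgsFromPDU converts a protocol-level filesystem version into
// zfs send arguments without checking that the version exists on disk.
// It returns nil if fsv is nil.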
func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion {
	if fsv == nil {
		return nil
	}
	return &zfs.ZFSSendArgVersion{RelName: fsv.GetRelName(), GUID: fsv.Guid}
}

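// sendArgsFromPDUAndValidateExistsAndGetVersion is the checked counterpart of
// uncheckedSendArgsFromPDU: it additionally verifies that the version exists
// on filesystem fs and returns the resolved zfs.FilesystemVersion.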
func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (v zfs.FilesystemVersion, err error) {
	sendArgs := uncheckedSendArgsFromPDU(fsv)
	if sendArgs == nil {
		return v, errors.New("filesystem version must not be nil")
	}
	version, err := sendArgs.ValidateExistsAndGetVersion(ctx, fs)
	if err != nil {
		return v, err
	}
	return version, nil
}

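// sendMakeArgs builds and validates the zfs send arguments for r, combining
// the request's From/To/ResumeToken with the send flags from the sender's
// configuration.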
func (s *Sender) sendMakeArgs(ctx context.Context, r *pdu.SendReq) (sendArgs zfs.ZFSSendArgsValidated, _ error) {
	_, err := s.filterCheckFS(r.Filesystem)
	if err != nil {
		return sendArgs, err
	}

	sendArgsUnvalidated := zfs.ZFSSendArgsUnvalidated{
		FS:   r.Filesystem,
		From: uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
		To:   uncheckedSendArgsFromPDU(r.GetTo()),   // validated by zfs.ZFSSendDry / zfs.ZFSSend
		ZFSSendFlags: zfs.ZFSSendFlags{
			ResumeToken:      r.ResumeToken, // nil or not nil, depending on decoding success
			Encrypted:        s.config.Encrypt,
			Properties:       s.config.SendProperties,
			BackupProperties: s.config.SendBackupProperties,
			Raw:              s.config.SendRaw,
			LargeBlocks:      s.config.SendLargeBlocks,
			Compressed:       s.config.SendCompressed,
			EmbeddedData:     s.config.SendEmbeddedData,
			Saved:            s.config.SendSaved,
		},
	}

	sendArgs, err = sendArgsUnvalidated.Validate(ctx)
	if err != nil {
		return sendArgs, errors.Wrap(err, "validate send arguments")
	}
	return sendArgs, nil
}

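// Send starts the (possibly resumed) zfs send for the requested step.
// Before handing out the stream, it (re-)creates the holds and bookmarks
// required by the configured replication guarantee, cleans up stale
// abstractions left over from earlier failed attempts for the same
// (jobId, fs), and wraps the stream in the configured bandwidth limiter.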
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	sendArgs, err := s.sendMakeArgs(ctx, r)
	if err != nil {
		return nil, nil, err
	}

	// create holds or bookmarks of `From` and `To` to guarantee one of the following:
	// - that the replication step can always be resumed (`holds`),
	// - that the replication step can be interrupted and a future replication
	//   step with same or different `To` but same `From` is still possible (`bookmarks`)
	// - nothing (`none`)
	//
	// ...
	//
	// ... actually create the abstractions
	replicationGuaranteeOptions, err := replicationGuaranteeOptionsFromPDU(r.GetReplicationConfig().Protection)
	if err != nil {
		return nil, nil, err
	}
	replicationGuaranteeStrategy := replicationGuaranteeOptions.Strategy(sendArgs.From != nil)
	liveAbs, err := replicationGuaranteeStrategy.SenderPreSend(ctx, s.jobId, &sendArgs)
	if err != nil {
		return nil, nil, err
	}
	for _, a := range liveAbs {
		if a != nil {
			abstractionsCacheSingleton.Put(a)
		}
	}

	// cleanup the mess that _this function_ might have created in prior failed attempts:
	//
	// In summary, we delete every endpoint ZFS abstraction created on this filesystem for this job id,
	// except for the ones we just created above.
	//
	// This is the most robust approach to avoid leaking (= forgetting to clean up) endpoint ZFS abstractions,
	// all under the assumption that there will only ever be one send for a (jobId,fs) combination at any given time.
	//
	// Note that the SendCompleted rpc can't be relied upon for this purpose:
	// - it might be lost due to network errors,
	// - or never be sent by a potentially malicious or buggy client,
	// - or never be sent because the replication step failed at some point
	//   (potentially leaving a resumable state on the receiver, which is the case where we really do not want to blow away the step holds too soon.)
	//
	// Note further that a resuming send, due to the idempotent nature of func CreateReplicationCursor and HoldStep,
	// will never lose its step holds because we just (idempotently re-)created them above, before attempting the cleanup.
	destroyTypes := AbstractionTypeSet{
		AbstractionStepHold:                           true,
		AbstractionTentativeReplicationCursorBookmark: true,
	}
	// The replication planner can also pick an endpoint zfs abstraction as FromVersion.
	// Keep it, so that the replication will succeed.
	//
	// NB: there is no abstraction for snapshots, so we only need to check bookmarks.
	if sendArgs.FromVersion != nil && sendArgs.FromVersion.IsBookmark() {
		dp, err := zfs.NewDatasetPath(sendArgs.FS)
		if err != nil {
			panic(err) // sendArgs is validated, this shouldn't happen
		}
		liveAbs = append(liveAbs, destroyTypes.ExtractBookmark(dp, sendArgs.FromVersion))
	}
	func() {
		ctx, endSpan := trace.WithSpan(ctx, "cleanup-stale-abstractions")
		defer endSpan()

		keep := func(a Abstraction) (keep bool) {
			keep = false
			for _, k := range liveAbs {
				keep = keep || AbstractionEquals(a, k)
			}
			return keep
		}
		check := func(obsoleteAbs []Abstraction) {
			// Ensure that we don't delete `From` or `To`, regardless of whether
			// they are in the destroyTypes AbstractionTypeSet, and produce a
			// helpful error message in case we would, to aid debugging the
			// resulting panic.
			//
			// This is especially important for `From`. We could break incremental replication
			// if we deleted the last common filesystem version between sender and receiver.
			type Problem struct {
				sendArgsWhat string
				fullpath     string
				obsoleteAbs  Abstraction
			}
			problems := make([]Problem, 0)
			checkFullpaths := make(map[string]string, 2)
			checkFullpaths["ToVersion"] = sendArgs.ToVersion.FullPath(sendArgs.FS)
			if sendArgs.FromVersion != nil {
				checkFullpaths["FromVersion"] = sendArgs.FromVersion.FullPath(sendArgs.FS)
			}
			for _, a := range obsoleteAbs {
				for what, fullpath := range checkFullpaths {
					if a.GetFullPath() == fullpath && a.GetType().IsSnapshotOrBookmark() {
						problems = append(problems, Problem{
							sendArgsWhat: what,
							fullpath:     fullpath,
							obsoleteAbs:  a,
						})
					}
				}
			}
			if len(problems) == 0 {
				return
			}
			var msg strings.Builder
			fmt.Fprintf(&msg, "cleaning up send stale would destroy send args:\n")
			fmt.Fprintf(&msg, "  SendArgs: %s\n", pretty.Sprint(sendArgs))
			for _, check := range problems {
				fmt.Fprintf(&msg, "would delete %s %s because it was deemed an obsolete abstraction: %s\n",
					check.sendArgsWhat, check.fullpath, check.obsoleteAbs)
			}
			panic(msg.String())
		}
		abstractionsCacheSingleton.TryBatchDestroy(ctx, s.jobId, sendArgs.FS, destroyTypes, keep, check)
	}()

	var sendStream io.ReadCloser
	sendStream, err = zfs.ZFSSend(ctx, sendArgs)
	if err != nil {
		// it's ok not to destroy the abstractions we just created here; a new send attempt will clean them up
		return nil, nil, errors.Wrap(err, "zfs send failed")
	}

	// apply rate limit
	sendStream = s.bwLimit.WrapReadCloser(sendStream)

	res := &pdu.SendRes{
		ExpectedSize:    0, // size estimates are only computed by SendDry
		UsedResumeToken: r.ResumeToken != "",
	}

	return res, sendStream, nil
}

func (s *Sender) SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	sendArgs, err := s.sendMakeArgs(ctx, r)
	if err != nil {
		return nil, err
	}

	si, err := zfs.ZFSSendDry(ctx, sendArgs)
	if err != nil {
		return nil, errors.Wrap(err, "zfs send dry failed")
	}

	// From now on, assume that sendArgs has been validated by ZFSSendDry
	// (because validation involves shelling out, it's actually a little expensive)

	res := &pdu.SendRes{
		ExpectedSize:    si.SizeEstimate,
		UsedResumeToken: r.ResumeToken != "",
	}
	return res, nil
}

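// SendCompleted is invoked after the receiver has confirmed a replication
// step. It advances the sender-side abstractions for that step (via
// SenderPostRecvConfirmed) and destroys step holds and cursor bookmarks that
// are no longer needed.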
func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	orig := r.GetOriginalReq() // may be nil, always use proto getters
	fsp, err := p.filterCheckFS(orig.GetFilesystem())
	if err != nil {
		return nil, err
	}
	fs := fsp.ToString()

	var from *zfs.FilesystemVersion
	if orig.GetFrom() != nil {
		f, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetFrom()) // no shadow
		if err != nil {
			return nil, errors.Wrap(err, "validate `from` exists")
		}
		from = &f
	}
	to, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetTo())
	if err != nil {
		return nil, errors.Wrap(err, "validate `to` exists")
	}

	replicationGuaranteeOptions, err := replicationGuaranteeOptionsFromPDU(orig.GetReplicationConfig().Protection)
	if err != nil {
		return nil, err
	}
	liveAbs, err := replicationGuaranteeOptions.Strategy(from != nil).SenderPostRecvConfirmed(ctx, p.jobId, fs, to)
	if err != nil {
		return nil, err
	}
	for _, a := range liveAbs {
		if a != nil {
			abstractionsCacheSingleton.Put(a)
		}
	}
	keep := func(a Abstraction) (keep bool) {
		keep = false
		for _, k := range liveAbs {
			keep = keep || AbstractionEquals(a, k)
		}
		return keep
	}
	destroyTypes := AbstractionTypeSet{
		AbstractionStepHold:                           true,
		AbstractionTentativeReplicationCursorBookmark: true,
		AbstractionReplicationCursorBookmarkV2:        true,
	}
	abstractionsCacheSingleton.TryBatchDestroy(ctx, p.jobId, fs, destroyTypes, keep, nil)

	return &pdu.SendCompletedRes{}, nil

}

func (p *Sender) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	dp, err := p.filterCheckFS(req.Filesystem)
	if err != nil {
		return nil, err
	}
	return doDestroySnapshots(ctx, dp, req.Snapshots)
}

func (p *Sender) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	res := pdu.PingRes{
		Echo: req.GetMessage(),
	}
	return &res, nil
}

func (p *Sender) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	return p.Ping(ctx, req)
}

func (p *Sender) WaitForConnectivity(ctx context.Context) error {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	return nil
}

func (p *Sender) ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	dp, err := p.filterCheckFS(req.Filesystem)
	if err != nil {
		return nil, err
	}

	cursor, err := GetMostRecentReplicationCursorOfJob(ctx, dp.ToString(), p.jobId)
	if err != nil {
		return nil, err
	}
	if cursor == nil {
		return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Notexist{Notexist: true}}, nil
	}
	return &pdu.ReplicationCursorRes{Result: &pdu.ReplicationCursorRes_Guid{Guid: cursor.Guid}}, nil
}

func (p *Sender) Receive(ctx context.Context, r *pdu.ReceiveReq, _ io.ReadCloser) (*pdu.ReceiveRes, error) {
	return nil, fmt.Errorf("sender does not implement Receive()")
}

type FSFilter interface { // FIXME unused
	Filter(path *zfs.DatasetPath) (pass bool, err error)
	UserSpecifiedDatasets() zfs.UserSpecifiedDatasetsSet
}

// FIXME: can we get away without error types here?
type FSMap interface { // FIXME unused
	FSFilter
	Map(path *zfs.DatasetPath) (*zfs.DatasetPath, error)
	Invert() (FSMap, error)
	AsFilter() FSFilter
}

// NOTE: when adding members to this struct, remember
// to add them to `ReceiverConfig.copyIn()`
type ReceiverConfig struct {
	JobID JobID

	RootWithoutClientComponent *zfs.DatasetPath
	AppendClientIdentity       bool

	InheritProperties  []zfsprop.Property
	OverrideProperties map[zfsprop.Property]string

	BandwidthLimit bandwidthlimit.Config

	PlaceholderEncryption PlaceholderCreationEncryptionProperty
}

//go:generate enumer -type=PlaceholderCreationEncryptionProperty -transform=kebab -trimprefix=PlaceholderCreationEncryptionProperty
type PlaceholderCreationEncryptionProperty int

// Note: the constant names, transformed through enumer, are part of the config format!
const (
	PlaceholderCreationEncryptionPropertyUnspecified PlaceholderCreationEncryptionProperty = 1 << iota
	PlaceholderCreationEncryptionPropertyInherit
	PlaceholderCreationEncryptionPropertyOff
)

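// copyIn deep-copies the pointer-, slice-, and map-typed fields of the config
// so that the Receiver owns them and later mutations by the caller have no
// effect on the running endpoint.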
func (c *ReceiverConfig) copyIn() {
	c.RootWithoutClientComponent = c.RootWithoutClientComponent.Copy()

	pInherit := make([]zfsprop.Property, len(c.InheritProperties))
	copy(pInherit, c.InheritProperties)
	c.InheritProperties = pInherit

	pOverride := make(map[zfsprop.Property]string, len(c.OverrideProperties))
	for key, value := range c.OverrideProperties {
		pOverride[key] = value
	}
	c.OverrideProperties = pOverride
}

func (c *ReceiverConfig) Validate() error {
	c.JobID.MustValidate()

	for _, prop := range c.InheritProperties {
		err := prop.Validate()
		if err != nil {
			return errors.Wrapf(err, "inherit property %q", prop)
		}
	}

	for prop := range c.OverrideProperties {
		err := prop.Validate()
		if err != nil {
			return errors.Wrapf(err, "override property %q", prop)
		}
	}

	if c.RootWithoutClientComponent.Length() <= 0 {
		return errors.New("RootWithoutClientComponent must not be an empty dataset path")
	}

	if err := bandwidthlimit.ValidateConfig(c.BandwidthLimit); err != nil {
		return errors.Wrap(err, "`BandwidthLimit` field invalid")
	}

	if !c.PlaceholderEncryption.IsAPlaceholderCreationEncryptionProperty() {
		return errors.Errorf("`PlaceholderEncryption` field is invalid")
	}

	return nil
}

// Receiver implements replication.ReplicationEndpoint for a receiving side
type Receiver struct {
	pdu.UnsafeReplicationServer // prefer compilation errors over default 'method X not implemented' impl

	conf ReceiverConfig // validated

	bwLimit bandwidthlimit.Wrapper

	recvParentCreationMtx *chainlock.L

	Test_OverrideClientIdentityFunc func() string // for use by platformtest
}

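// NewReceiver constructs a Receiver from config.
// Like NewSender, it panics if the (deep-copied) configuration does not validate.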
func NewReceiver(config ReceiverConfig) *Receiver {
	config.copyIn()
	if err := config.Validate(); err != nil {
		panic(err)
	}
	return &Receiver{
		conf:                  config,
		recvParentCreationMtx: chainlock.New(),
		bwLimit:               bandwidthlimit.WrapperFromConfig(config.BandwidthLimit),
	}
}

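// TestClientIdentity checks whether clientIdentity forms exactly one
// additional dataset path component below rootFS (see clientRoot).
// Despite its name, it is a config-time validation helper, not a test.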
func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error {
	_, err := clientRoot(rootFS, clientIdentity)
	return err
}

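// clientRoot joins rootFS and clientIdentity into the client's dataset root,
// rejecting identities that do not add exactly one path component.
// For illustration (hypothetical values): rootFS "tank/sink" and client
// identity "alice" yield "tank/sink/alice", whereas "alice/evil" or an empty
// identity are rejected.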
func clientRoot(rootFS *zfs.DatasetPath, clientIdentity string) (*zfs.DatasetPath, error) {
	rootFSLen := rootFS.Length()
	clientRootStr := path.Join(rootFS.ToString(), clientIdentity)
	clientRoot, err := zfs.NewDatasetPath(clientRootStr)
	if err != nil {
		return nil, err
	}
	if rootFSLen+1 != clientRoot.Length() {
		return nil, fmt.Errorf("client identity must be a single ZFS filesystem path component")
	}
	return clientRoot, nil
}

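// clientRootFromCtx returns the dataset root to use for the client that issued
// the current request: RootWithoutClientComponent, extended by the client
// identity from ctx if AppendClientIdentity is set (platform tests may inject
// the identity via Test_OverrideClientIdentityFunc).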
func (s *Receiver) clientRootFromCtx(ctx context.Context) *zfs.DatasetPath {
	if !s.conf.AppendClientIdentity {
		return s.conf.RootWithoutClientComponent.Copy()
	}

	var clientIdentity string
	if s.Test_OverrideClientIdentityFunc != nil {
		clientIdentity = s.Test_OverrideClientIdentityFunc()
	} else {
		var ok bool
		clientIdentity, ok = ctx.Value(ClientIdentityKey).(string) // no shadow
		if !ok {
			panic("ClientIdentityKey context value must be set")
		}
	}

	clientRoot, err := clientRoot(s.conf.RootWithoutClientComponent, clientIdentity)
	if err != nil {
		panic(fmt.Sprintf("ClientIdentityContextKey must have been validated before invoking Receiver: %s", err))
	}
	return clientRoot
}

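// subroot is a zfs.DatasetFilter over the strict descendants of localRoot.
// The Receiver uses it so that listing and mapping operations only touch the
// dataset subtree that belongs to the requesting client.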
type subroot struct {
	localRoot *zfs.DatasetPath
}

var _ zfs.DatasetFilter = subroot{}

// Filter passes datasets that are strict descendants of f.localRoot;
// the root itself does not pass.
func (f subroot) Filter(p *zfs.DatasetPath) (pass bool, err error) {
	return p.HasPrefix(f.localRoot) && !p.Equal(f.localRoot), nil
}

func (f subroot) UserSpecifiedDatasets() zfs.UserSpecifiedDatasetsSet {
	return zfs.UserSpecifiedDatasetsSet{
		f.localRoot.ToString(): true,
	}
}

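// MapToLocal prefixes the sender-side filesystem path fs with the receiver's
// local root. For illustration (hypothetical values): with localRoot
// "tank/sink/alice", fs "zroot/data" maps to "tank/sink/alice/zroot/data".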
func (f subroot) MapToLocal(fs string) (*zfs.DatasetPath, error) {
	p, err := zfs.NewDatasetPath(fs)
	if err != nil {
		return nil, err
	}
	if p.Length() == 0 {
		return nil, errors.Errorf("cannot map empty filesystem")
	}
	c := f.localRoot.Copy()
	c.Extend(p)
	return c, nil
}

func (s *Receiver) ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	// first, make sure that root_fs exists (i.e. its pool is imported)
	if rphs, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, s.conf.RootWithoutClientComponent); err != nil {
		return nil, errors.Wrap(err, "cannot determine whether root_fs exists")
	} else if !rphs.FSExists {
		getLogger(ctx).WithField("root_fs", s.conf.RootWithoutClientComponent).Error("root_fs does not exist")
		return nil, errors.Errorf("root_fs does not exist")
	}

	root := s.clientRootFromCtx(ctx)
	filtered, err := zfs.ZFSListMapping(ctx, subroot{root})
	if err != nil {
		return nil, err
	}
	// present filesystems without the root_fs prefix
	fss := make([]*pdu.Filesystem, 0, len(filtered))
	for _, a := range filtered {
		l := getLogger(ctx).WithField("fs", a)
		ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, a)
		if err != nil {
			l.WithError(err).Error("error getting placeholder state")
			return nil, errors.Wrapf(err, "cannot get placeholder state for fs %q", a)
		}
		l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
		if !ph.FSExists {
			l.Error("inconsistent placeholder state: filesystem must exists")
			err := errors.Errorf("inconsistent placeholder state: filesystem %q must exist in this context", a.ToString())
			return nil, err
		}
		token, err := zfs.ZFSGetReceiveResumeTokenOrEmptyStringIfNotSupported(ctx, a)
		if err != nil {
			l.WithError(err).Error("cannot get receive resume token")
			return nil, err
		}
		l.WithField("receive_resume_token", token).Debug("receive resume token")

		a.TrimPrefix(root)

		fs := &pdu.Filesystem{
			Path:          a.ToString(),
			IsPlaceholder: ph.IsPlaceholder,
			ResumeToken:   token,
		}
		fss = append(fss, fs)
	}
	if len(fss) == 0 {
		getLogger(ctx).Debug("no filesystems found")
		return &pdu.ListFilesystemRes{}, nil
	}
	return &pdu.ListFilesystemRes{Filesystems: fss}, nil
}

func (s *Receiver) ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	root := s.clientRootFromCtx(ctx)
	lp, err := subroot{root}.MapToLocal(req.GetFilesystem())
	if err != nil {
		return nil, err
	}
	// TODO share following code with sender

	fsvs, err := zfs.ZFSListFilesystemVersions(ctx, lp, zfs.ListFilesystemVersionsOptions{})
	if err != nil {
		return nil, err
	}

	rfsvs := make([]*pdu.FilesystemVersion, len(fsvs))
	for i := range fsvs {
		rfsvs[i] = pdu.FilesystemVersionFromZFS(&fsvs[i])
	}

	return &pdu.ListFilesystemVersionsRes{Versions: rfsvs}, nil
}

func (s *Receiver) Ping(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	res := pdu.PingRes{
		Echo: req.GetMessage(),
	}
	return &res, nil
}

func (s *Receiver) PingDataconn(ctx context.Context, req *pdu.PingReq) (*pdu.PingRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()
	return s.Ping(ctx, req)
}

func (s *Receiver) WaitForConnectivity(ctx context.Context) error {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()
	return nil
}

func (s *Receiver) ReplicationCursor(ctx context.Context, _ *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()
	return nil, fmt.Errorf("ReplicationCursor not implemented for Receiver")
}

func (s *Receiver) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()
	return nil, nil, fmt.Errorf("receiver does not implement Send()")
}

func (s *Receiver) SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()
	return nil, fmt.Errorf("receiver does not implement SendDry()")
}

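// receive_GetPlaceholderCreationEncryptionValue maps the receiver's configured
// PlaceholderEncryption policy to the zfs-level value used when creating a
// placeholder filesystem at path. The client root itself is special-cased:
// even with an unspecified policy it uses `inherit`, because zrepl fully
// controls the hierarchy up to and including that dataset.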
func (s *Receiver) receive_GetPlaceholderCreationEncryptionValue(client_root, path *zfs.DatasetPath) (zfs.FilesystemPlaceholderCreateEncryptionValue, error) {
	if !s.conf.PlaceholderEncryption.IsAPlaceholderCreationEncryptionProperty() {
		panic(s.conf.PlaceholderEncryption)
	}

	if client_root.Equal(path) && s.conf.PlaceholderEncryption == PlaceholderCreationEncryptionPropertyUnspecified {
		// If our Receiver is configured to append a client component to s.conf.RootWithoutClientComponent
		// then that dataset is always going to be a placeholder.
		// We don't want to burden users with the concept of placeholders if their `filesystems` filter on the sender
		// doesn't introduce any gaps.
		// Since the dataset hierarchy up to and including that client component dataset is still fully controlled by us,
		// using `inherit` is going to make it work in all expected use cases.
		return zfs.FilesystemPlaceholderCreateEncryptionInherit, nil
	}

	switch s.conf.PlaceholderEncryption {
	case PlaceholderCreationEncryptionPropertyUnspecified:
		return 0, fmt.Errorf("placeholder filesystem encryption handling is unspecified in receiver config")
	case PlaceholderCreationEncryptionPropertyInherit:
		return zfs.FilesystemPlaceholderCreateEncryptionInherit, nil
	case PlaceholderCreationEncryptionPropertyOff:
		return zfs.FilesystemPlaceholderCreateEncryptionOff, nil
	default:
		panic(s.conf.PlaceholderEncryption)
	}
}

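// Receive implements the receiving side of a replication step: it creates any
// missing placeholder parent filesystems below the client root, prepares the
// target filesystem (placeholder property, rollback, resume token), applies
// the bandwidth limit, and pipes the incoming stream into zfs receive.
// On success, it installs the abstractions required by the configured
// replication guarantee and garbage-collects obsolete last-received holds.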
func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	getLogger(ctx).Debug("incoming Receive")
	defer receive.Close()

	root := s.clientRootFromCtx(ctx)
	lp, err := subroot{root}.MapToLocal(req.Filesystem)
	if err != nil {
		return nil, errors.Wrap(err, "`Filesystem` invalid")
	}

	to := uncheckedSendArgsFromPDU(req.GetTo())
	if to == nil {
		return nil, errors.New("`To` must not be nil")
	}
	if !to.IsSnapshot() {
		return nil, errors.New("`To` must be a snapshot")
	}

	// create placeholder parent filesystems as appropriate
	//
	// Manipulating the ZFS dataset hierarchy must happen exclusively.
	// TODO: Use fine-grained locking to allow separate clients / requests to pass
	// 		 through the following section concurrently when operating on disjoint
	//       ZFS dataset hierarchy subtrees.
	var visitErr error
	func() {
		getLogger(ctx).Debug("begin acquire recvParentCreationMtx")
		defer s.recvParentCreationMtx.Lock().Unlock()
		getLogger(ctx).Debug("end acquire recvParentCreationMtx")
		defer getLogger(ctx).Debug("release recvParentCreationMtx")

		f := zfs.NewDatasetPathForest()
		f.Add(lp)
		getLogger(ctx).Debug("begin tree-walk")
		f.WalkTopDown(func(v *zfs.DatasetPathVisit) (visitChildTree bool) {
			if v.Path.Equal(lp) {
				return false
			}

			l := getLogger(ctx).
				WithField("placeholder_fs", v.Path.ToString()).
				WithField("receive_fs", lp.ToString())

			ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, v.Path)
			l.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
				WithField("err", fmt.Sprintf("%s", err)).
				WithField("errType", fmt.Sprintf("%T", err)).
				Debug("get placeholder state for filesystem")
			if err != nil {
				visitErr = errors.Wrapf(err, "cannot get placeholder state of %s", v.Path.ToString())
				return false
			}

			if !ph.FSExists {
				if s.conf.RootWithoutClientComponent.HasPrefix(v.Path) {
					if v.Path.Length() == 1 {
						visitErr = fmt.Errorf("pool %q not imported", v.Path.ToString())
					} else {
						visitErr = fmt.Errorf("root_fs %q does not exist", s.conf.RootWithoutClientComponent.ToString())
					}
					l.WithError(visitErr).Error("placeholders are only created automatically below root_fs")
					return false
				}

				// compute the value lazily so that users who don't rely on
				// placeholders can use the default value PlaceholderCreationEncryptionPropertyUnspecified
				placeholderEncryption, err := s.receive_GetPlaceholderCreationEncryptionValue(root, v.Path)
				if err != nil {
					l.WithError(err).Error("cannot create placeholder filesystem") // logger already contains path
					visitErr = errors.Wrapf(err, "cannot create placeholder filesystem %s", v.Path.ToString())
					return false
				}

				l := l.WithField("encryption", placeholderEncryption)

				l.Debug("creating placeholder filesystem")
				err = zfs.ZFSCreatePlaceholderFilesystem(ctx, v.Path, v.Parent.Path, placeholderEncryption)
				if err != nil {
					l.WithError(err).Error("cannot create placeholder filesystem") // logger already contains path
					visitErr = errors.Wrapf(err, "cannot create placeholder filesystem %s", v.Path.ToString())
					return false
				}
				l.Info("created placeholder filesystem")
				return true
			} else {
				l.Debug("filesystem exists")
				return true // leave this fs as is
			}
		})
	}()
	getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
	if visitErr != nil {
		return nil, visitErr
	}

	log := getLogger(ctx).WithField("proto_fs", req.GetFilesystem()).WithField("local_fs", lp.ToString())

	// determine whether we need to roll back the filesystem / change its placeholder state
	var clearPlaceholderProperty bool
	var recvOpts zfs.RecvOptions
	ph, err := zfs.ZFSGetFilesystemPlaceholderState(ctx, lp)
	if err != nil {
		return nil, errors.Wrap(err, "cannot get placeholder state")
	}
	log.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")

	recvOpts.InheritProperties = s.conf.InheritProperties
	recvOpts.OverrideProperties = s.conf.OverrideProperties

	if ph.FSExists && ph.IsPlaceholder {
		recvOpts.RollbackAndForceRecv = true
		clearPlaceholderProperty = true
	}

	if clearPlaceholderProperty {
		log.Info("clearing placeholder property")
		if err := zfs.ZFSSetPlaceholder(ctx, lp, false); err != nil {
			return nil, fmt.Errorf("cannot clear placeholder property for forced receive: %s", err)
		}
	}

	if req.ClearResumeToken && ph.FSExists {
		log.Info("clearing resume token")
		if err := zfs.ZFSRecvClearResumeToken(ctx, lp.ToString()); err != nil {
			return nil, errors.Wrap(err, "cannot clear resume token")
		}
	}

	recvOpts.SavePartialRecvState, err = zfs.ResumeRecvSupported(ctx, lp)
	if err != nil {
		return nil, errors.Wrap(err, "cannot determine whether we can use resumable send & recv")
	}

	// apply rate limit
	receive = s.bwLimit.WrapReadCloser(receive)

	var peek bytes.Buffer
	var MaxPeek = envconst.Int64("ZREPL_ENDPOINT_RECV_PEEK_SIZE", 1<<20)
	log.WithField("max_peek_bytes", MaxPeek).Info("peeking incoming stream")
	if _, err := io.Copy(&peek, io.LimitReader(receive, MaxPeek)); err != nil {
		log.WithError(err).Error("cannot read peek-buffer from send stream")
	}
	var peekCopy bytes.Buffer
	if n, err := peekCopy.Write(peek.Bytes()); err != nil || n != peek.Len() {
		panic(peek.Len()) // cannot happen: bytes.Buffer.Write never returns an error or a short write
	}

	log.WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")

	snapFullPath := to.FullPath(lp.ToString())
	if err := zfs.ZFSRecv(ctx, lp.ToString(), to, chainedio.NewChainedReader(&peek, receive), recvOpts); err != nil {

		// best-effort rollback of placeholder state if the recv didn't start
		_, resumableStatePresent := err.(*zfs.RecvFailedWithResumeTokenErr)
		disablePlaceholderRestoration := envconst.Bool("ZREPL_ENDPOINT_DISABLE_PLACEHOLDER_RESTORATION", false)
		placeholderRestored := !ph.IsPlaceholder
		if !disablePlaceholderRestoration && !resumableStatePresent && recvOpts.RollbackAndForceRecv && ph.FSExists && ph.IsPlaceholder && clearPlaceholderProperty {
			log.Info("restoring placeholder property")
			if phErr := zfs.ZFSSetPlaceholder(ctx, lp, true); phErr != nil {
				log.WithError(phErr).Error("cannot restore placeholder property after failed receive, subsequent replications will likely fail with a different error")
				// fallthrough
			} else {
				placeholderRestored = true
			}
			// fallthrough
		}

		// deal with failing initial encrypted send & recv
		if _, ok := err.(*zfs.RecvDestroyOrOverwriteEncryptedErr); ok && ph.IsPlaceholder && placeholderRestored {
			msg := `cannot automatically replace placeholder filesystem with incoming send stream - please see receive-side log for details`
			err := errors.New(msg)
			log.Error(msg)

			log.Error(`zrepl creates placeholder filesystems on the receiving side of a replication to match the sending side's dataset hierarchy`)
			log.Error(`zrepl uses zfs receive -F to replace those placeholders with incoming full sends`)
			log.Error(`OpenZFS native encryption prohibits zfs receive -F for encrypted filesystems`)
			log.Error(`the current zrepl placeholder filesystem concept is thus incompatible with OpenZFS native encryption`)

			tempStartFullRecvFS := lp.Copy().ToString() + ".zrepl.initial-recv"
			tempStartFullRecvFSDP, dpErr := zfs.NewDatasetPath(tempStartFullRecvFS)
			if dpErr != nil {
				log.WithError(dpErr).Error("cannot determine temporary filesystem name for initial encrypted recv workaround")
				return nil, err // yes, err, not dpErr
			}

			log := log.WithField("temp_recv_fs", tempStartFullRecvFS)
			log.Error(`as a workaround, zrepl will now attempt to re-receive the beginning of the stream into a temporary filesystem temp_recv_fs`)
			log.Error(`if that step succeeds: shut down zrepl and use 'zfs rename' to swap temp_recv_fs with local_fs, then restart zrepl`)
			log.Error(`replication will then resume using resumable send+recv`)

			tempPH, phErr := zfs.ZFSGetFilesystemPlaceholderState(ctx, tempStartFullRecvFSDP)
			if phErr != nil {
				log.WithError(phErr).Error("cannot determine placeholder state of temp_recv_fs")
				return nil, err // yes, err, not phErr
			}
			if tempPH.FSExists {
				log.Error("temp_recv_fs already exists, assuming a (partial) initial recv to that filesystem has already been done")
				return nil, err
			}

			recvOpts.RollbackAndForceRecv = false
			recvOpts.SavePartialRecvState = true
			rerecvErr := zfs.ZFSRecv(ctx, tempStartFullRecvFS, to, chainedio.NewChainedReader(&peekCopy), recvOpts)
			if _, isResumable := rerecvErr.(*zfs.RecvFailedWithResumeTokenErr); rerecvErr == nil || isResumable {
				log.Error("completed re-receive into temporary filesystem temp_recv_fs, now shut down zrepl and use zfs rename to swap temp_recv_fs with local_fs")
			} else {
				log.WithError(rerecvErr).Error("failed to receive the beginning of the stream into temporary filesystem temp_recv_fs")
				log.Error("we advise you to collect the error log and current configuration, open an issue on GitHub, and revert to your previous configuration in the meantime")
			}

			log.Error(`if you would like to see improvements to this situation, please open an issue on GitHub`)
			return nil, err
		}

		log.
			WithError(err).
			WithField("opts", fmt.Sprintf("%#v", recvOpts)).
			Error("zfs receive failed")

		return nil, err
	}

	// validate that we actually received what the sender claimed
	toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString())
	if err != nil {
		msg := "receive request's `To` version does not match what we received in the stream"
		log.WithError(err).WithField("snap", snapFullPath).Error(msg)
		log.Error("aborting recv request, but keeping received snapshot for inspection")
		return nil, errors.Wrap(err, msg)
	}

	replicationGuaranteeOptions, err := replicationGuaranteeOptionsFromPDU(req.GetReplicationConfig().Protection)
	if err != nil {
		return nil, err
	}
	replicationGuaranteeStrategy := replicationGuaranteeOptions.Strategy(ph.FSExists)
	liveAbs, err := replicationGuaranteeStrategy.ReceiverPostRecv(ctx, s.conf.JobID, lp.ToString(), toRecvd)
	if err != nil {
		return nil, err
	}
	for _, a := range liveAbs {
		if a != nil {
			abstractionsCacheSingleton.Put(a)
		}
	}
	keep := func(a Abstraction) (keep bool) {
		keep = false
		for _, k := range liveAbs {
			keep = keep || AbstractionEquals(a, k)
		}
		return keep
	}
	check := func(obsoleteAbs []Abstraction) {
		for _, abs := range obsoleteAbs {
			if zfs.FilesystemVersionEqualIdentity(abs.GetFilesystemVersion(), toRecvd) {
				panic(fmt.Sprintf("would destroy endpoint abstraction around the filesystem version we just received %s", abs))
			}
		}
	}
	destroyTypes := AbstractionTypeSet{
		AbstractionLastReceivedHold: true,
	}
	abstractionsCacheSingleton.TryBatchDestroy(ctx, s.conf.JobID, lp.ToString(), destroyTypes, keep, check)

	return &pdu.ReceiveRes{}, nil
}

func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	root := s.clientRootFromCtx(ctx)
	lp, err := subroot{root}.MapToLocal(req.Filesystem)
	if err != nil {
		return nil, err
	}
	return doDestroySnapshots(ctx, lp, req.Snapshots)
}

func (p *Receiver) SendCompleted(ctx context.Context, _ *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error) {
	defer trace.WithSpanFromStackUpdateCtx(&ctx)()

	return &pdu.SendCompletedRes{}, nil
}

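// doDestroySnapshots destroys the given snapshots of lp in a single batch and
// returns a per-snapshot result. Versions that are not snapshots cause the
// whole request to fail upfront.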
func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.FilesystemVersion) (*pdu.DestroySnapshotsRes, error) {
	reqs := make([]*zfs.DestroySnapOp, len(snaps))
	ress := make([]*pdu.DestroySnapshotRes, len(snaps))
	errs := make([]error, len(snaps))
	for i, fsv := range snaps {
		if fsv.Type != pdu.FilesystemVersion_Snapshot {
			return nil, fmt.Errorf("version %q is not a snapshot", fsv.Name)
		}
		ress[i] = &pdu.DestroySnapshotRes{
			Snapshot: fsv,
			// Error set after batch operation
		}
		reqs[i] = &zfs.DestroySnapOp{
			Filesystem: lp.ToString(),
			Name:       fsv.Name,
			ErrOut:     &errs[i],
		}
	}
	zfs.ZFSDestroyFilesystemVersions(ctx, reqs)
	for i := range reqs {
		if errs[i] != nil {
			if de, ok := errs[i].(*zfs.DestroySnapshotsError); ok && len(de.Reason) == 1 {
				ress[i].Error = de.Reason[0]
			} else {
				ress[i].Error = errs[i].Error()
			}
		}
	}
	return &pdu.DestroySnapshotsRes{
		Results: ress,
	}, nil
}