mirror of
https://github.com/zrepl/zrepl.git
synced 2025-06-24 11:41:27 +02:00
[#277] endpoint: Receiver.Receive: error message explaining problem with placeholders and encryption
This commit is contained in:
parent
27db8c0afe
commit
f772b3d39f
@ -2,6 +2,7 @@
|
|||||||
package endpoint
|
package endpoint
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -11,6 +12,7 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/zrepl/zrepl/replication/logic/pdu"
|
"github.com/zrepl/zrepl/replication/logic/pdu"
|
||||||
|
"github.com/zrepl/zrepl/util/chainedio"
|
||||||
"github.com/zrepl/zrepl/util/chainlock"
|
"github.com/zrepl/zrepl/util/chainlock"
|
||||||
"github.com/zrepl/zrepl/util/envconst"
|
"github.com/zrepl/zrepl/util/envconst"
|
||||||
"github.com/zrepl/zrepl/util/semaphore"
|
"github.com/zrepl/zrepl/util/semaphore"
|
||||||
@ -810,27 +812,89 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
|
|||||||
}
|
}
|
||||||
defer guard.Release()
|
defer guard.Release()
|
||||||
|
|
||||||
|
var peek bytes.Buffer
|
||||||
|
var MaxPeek = envconst.Int64("ZREPL_ENDPOINT_RECV_PEEK_SIZE", 1<<20)
|
||||||
|
log.WithField("max_peek_bytes", MaxPeek).Info("peeking incoming stream")
|
||||||
|
if _, err := io.Copy(&peek, io.LimitReader(receive, MaxPeek)); err != nil {
|
||||||
|
log.WithError(err).Error("cannot read peek-buffer from send stream")
|
||||||
|
}
|
||||||
|
var peekCopy bytes.Buffer
|
||||||
|
if n, err := peekCopy.Write(peek.Bytes()); err != nil || n != peek.Len() {
|
||||||
|
panic(peek.Len())
|
||||||
|
}
|
||||||
|
|
||||||
log.WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
log.WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
||||||
|
|
||||||
snapFullPath := to.FullPath(lp.ToString())
|
snapFullPath := to.FullPath(lp.ToString())
|
||||||
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
|
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, chainedio.NewChainedReader(&peek, receive), recvOpts); err != nil {
|
||||||
log.
|
|
||||||
WithError(err).
|
|
||||||
WithField("opts", fmt.Sprintf("%#v", recvOpts)).
|
|
||||||
Error("zfs receive failed")
|
|
||||||
|
|
||||||
// best-effort rollback of placeholder state if the recv didn't start
|
// best-effort rollback of placeholder state if the recv didn't start
|
||||||
_, resumableStatePresent := err.(*zfs.RecvFailedWithResumeTokenErr)
|
_, resumableStatePresent := err.(*zfs.RecvFailedWithResumeTokenErr)
|
||||||
disablePlaceholderRestoration := envconst.Bool("ZREPL_ENDPOINT_DISABLE_PLACEHOLDER_RESTORATION", false)
|
disablePlaceholderRestoration := envconst.Bool("ZREPL_ENDPOINT_DISABLE_PLACEHOLDER_RESTORATION", false)
|
||||||
|
placeholderRestored := !ph.IsPlaceholder
|
||||||
if !disablePlaceholderRestoration && !resumableStatePresent && recvOpts.RollbackAndForceRecv && ph.FSExists && ph.IsPlaceholder && clearPlaceholderProperty {
|
if !disablePlaceholderRestoration && !resumableStatePresent && recvOpts.RollbackAndForceRecv && ph.FSExists && ph.IsPlaceholder && clearPlaceholderProperty {
|
||||||
log.Info("restoring placeholder property")
|
log.Info("restoring placeholder property")
|
||||||
if phErr := zfs.ZFSSetPlaceholder(ctx, lp, true); phErr != nil {
|
if phErr := zfs.ZFSSetPlaceholder(ctx, lp, true); phErr != nil {
|
||||||
log.WithError(phErr).Error("cannot restore placeholder property after failed receive, subsequent replications will likely fail with a different error")
|
log.WithError(phErr).Error("cannot restore placeholder property after failed receive, subsequent replications will likely fail with a different error")
|
||||||
// fallthrough
|
// fallthrough
|
||||||
|
} else {
|
||||||
|
placeholderRestored = true
|
||||||
}
|
}
|
||||||
// fallthrough
|
// fallthrough
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deal with failing initial encrypted send & recv
|
||||||
|
if _, ok := err.(*zfs.RecvDestroyOrOverwriteEncryptedErr); ok && ph.IsPlaceholder && placeholderRestored {
|
||||||
|
msg := `cannot automatically replace placeholder filesystem with incoming send stream - please see receive-side log for details`
|
||||||
|
err := errors.New(msg)
|
||||||
|
log.Error(msg)
|
||||||
|
|
||||||
|
log.Error(`zrepl creates placeholder filesystems on the receiving side of a replication to match the sending side's dataset hierarchy`)
|
||||||
|
log.Error(`zrepl uses zfs receive -F to replace those placeholders with incoming full sends`)
|
||||||
|
log.Error(`OpenZFS native encryption prohibits zfs receive -F for encrypted filesystems`)
|
||||||
|
log.Error(`the current zrepl placeholder filesystem concept is thus incompatible with OpenZFS native encryption`)
|
||||||
|
|
||||||
|
tempStartFullRecvFS := lp.Copy().ToString() + ".zrepl.initial-recv"
|
||||||
|
tempStartFullRecvFSDP, dpErr := zfs.NewDatasetPath(tempStartFullRecvFS)
|
||||||
|
if dpErr != nil {
|
||||||
|
log.WithError(dpErr).Error("cannot determine temporary filesystem name for initial encrypted recv workaround")
|
||||||
|
return nil, err // yes, err, not dpErr
|
||||||
|
}
|
||||||
|
|
||||||
|
log := log.WithField("temp_recv_fs", tempStartFullRecvFS)
|
||||||
|
log.Error(`as a workaround, zrepl will now attempt to re-receive the beginning of the stream into a temporary filesystem temp_recv_fs`)
|
||||||
|
log.Error(`if that step succeeds: shut down zrepl and use 'zfs rename' to swap temp_recv_fs with local_fs, then restart zrepl`)
|
||||||
|
log.Error(`replication will then resume using resumable send+recv`)
|
||||||
|
|
||||||
|
tempPH, phErr := zfs.ZFSGetFilesystemPlaceholderState(ctx, tempStartFullRecvFSDP)
|
||||||
|
if phErr != nil {
|
||||||
|
log.WithError(phErr).Error("cannot determine placeholder state of temp_recv_fs")
|
||||||
|
return nil, err // yes, err, not dpErr
|
||||||
|
}
|
||||||
|
if tempPH.FSExists {
|
||||||
|
log.Error("temp_recv_fs already exists, assuming a (partial) initial recv to that filesystem has already been done")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
recvOpts.RollbackAndForceRecv = false
|
||||||
|
recvOpts.SavePartialRecvState = true
|
||||||
|
rerecvErr := zfs.ZFSRecv(ctx, tempStartFullRecvFS, to, chainedio.NewChainedReader(&peekCopy), recvOpts)
|
||||||
|
if _, isResumable := rerecvErr.(*zfs.RecvFailedWithResumeTokenErr); rerecvErr == nil || isResumable {
|
||||||
|
log.Error("completed re-receive into temporary filesystem temp_recv_fs, now shut down zrepl and use zfs rename to swap temp_recv_fs with local_fs")
|
||||||
|
} else {
|
||||||
|
log.WithError(rerecvErr).Error("failed to receive the beginning of the stream into temporary filesystem temp_recv_fs")
|
||||||
|
log.Error("we advise you to collect the error log and current configuration, open an issue on GitHub, and revert to your previous configuration in the meantime")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error(`if you would like to see improvements to this situation, please open an issue on GitHub`)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.
|
||||||
|
WithError(err).
|
||||||
|
WithField("opts", fmt.Sprintf("%#v", recvOpts)).
|
||||||
|
Error("zfs receive failed")
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
43
util/chainedio/chainedio_readcloser.go
Normal file
43
util/chainedio/chainedio_readcloser.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
package chainedio
|
||||||
|
|
||||||
|
import "io"
|
||||||
|
|
||||||
|
type ChainedReadCloser struct {
|
||||||
|
readers []io.Reader
|
||||||
|
curReader int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewChainedReader(reader ...io.Reader) *ChainedReadCloser {
|
||||||
|
return &ChainedReadCloser{
|
||||||
|
readers: reader,
|
||||||
|
curReader: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ChainedReadCloser) Read(buf []byte) (n int, err error) {
|
||||||
|
|
||||||
|
n = 0
|
||||||
|
|
||||||
|
for c.curReader < len(c.readers) {
|
||||||
|
n, err = c.readers[c.curReader].Read(buf)
|
||||||
|
if err == io.EOF {
|
||||||
|
c.curReader++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if c.curReader == len(c.readers) {
|
||||||
|
err = io.EOF // actually, there was no gap
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ChainedReadCloser) Close() error {
|
||||||
|
for _, r := range c.readers {
|
||||||
|
if c, ok := r.(io.Closer); ok {
|
||||||
|
c.Close() // TODO debug log error?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -1,34 +0,0 @@
|
|||||||
package chainedio
|
|
||||||
|
|
||||||
import "io"
|
|
||||||
|
|
||||||
type ChainedReader struct {
|
|
||||||
Readers []io.Reader
|
|
||||||
curReader int
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewChainedReader(reader ...io.Reader) *ChainedReader {
|
|
||||||
return &ChainedReader{
|
|
||||||
Readers: reader,
|
|
||||||
curReader: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ChainedReader) Read(buf []byte) (n int, err error) {
|
|
||||||
|
|
||||||
n = 0
|
|
||||||
|
|
||||||
for c.curReader < len(c.Readers) {
|
|
||||||
n, err = c.Readers[c.curReader].Read(buf)
|
|
||||||
if err == io.EOF {
|
|
||||||
c.curReader++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if c.curReader == len(c.Readers) {
|
|
||||||
err = io.EOF // actually, there was no gap
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user