mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-25 01:44:43 +01:00
endpoint: Receiver.Receive: save a peek of recv stream to a temporary dataset if placeholder + encryption recv -F error is detected
This commit is contained in:
parent
05a39eaddf
commit
42ffca09db
@ -2,17 +2,17 @@
|
||||
package endpoint
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/zrepl/zrepl/replication/logic/pdu"
|
||||
"github.com/zrepl/zrepl/util/chainedio"
|
||||
"github.com/zrepl/zrepl/util/chainlock"
|
||||
"github.com/zrepl/zrepl/util/envconst"
|
||||
"github.com/zrepl/zrepl/util/semaphore"
|
||||
@ -812,14 +812,21 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
|
||||
}
|
||||
defer guard.Release()
|
||||
|
||||
log.Info("peeking 1M ahead")
|
||||
var peek bytes.Buffer
|
||||
var MaxPeek = envconst.Int64("ZREPL_ENDPOINT_RECV_PEEK_SIZE", 1<<20)
|
||||
if _, err := io.Copy(&peek, io.LimitReader(receive, MaxPeek)); err != nil {
|
||||
log.WithError(err).Error("cannot read peek-buffer from send stream")
|
||||
}
|
||||
var peekCopy bytes.Buffer
|
||||
if n, err := peekCopy.Write(peek.Bytes()); err != nil || n != peek.Len() {
|
||||
panic(peek.Len())
|
||||
}
|
||||
|
||||
log.WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command")
|
||||
|
||||
snapFullPath := to.FullPath(lp.ToString())
|
||||
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil {
|
||||
log.
|
||||
WithError(err).
|
||||
WithField("opts", fmt.Sprintf("%#v", recvOpts)).
|
||||
Error("zfs receive failed")
|
||||
if err := zfs.ZFSRecv(ctx, lp.ToString(), to, chainedio.NewChainedReader(&peek, receive), recvOpts); err != nil {
|
||||
|
||||
// best-effort rollback of placeholder state if the recv didn't start
|
||||
_, resumableStatePresent := err.(*zfs.RecvFailedWithResumeTokenErr)
|
||||
@ -838,21 +845,56 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
|
||||
|
||||
// deal with failing initial encrypted send & recv
|
||||
if _, ok := err.(*zfs.RecvDestroyOrOverwriteEncryptedErr); ok && ph.IsPlaceholder && placeholderRestored {
|
||||
msg := `cannot replace placeholder filesystem with incoming send stream: OpenZFS does not support replacing encrypted filesystems using zfs receive -F`
|
||||
long := `
|
||||
It is very likely that you are encountering this error because you changed the sending job's list of filesystems.
|
||||
Replication should start working again once you change those settings back to what they were before.
|
||||
You can learn more about zrepl's placeholder filesystems at https://zrepl.github.io/configuration/overview.html#replication-placeholder-property
|
||||
If you would like to see improvements to this situation, please open an issue at https://github.com/zrepl/zrepl/issues , citing the previous error messages
|
||||
`
|
||||
msg := `cannot automatically replace placeholder filesystem with incoming send stream - please see receive-side log for details`
|
||||
err := errors.New(msg)
|
||||
log.Error(msg)
|
||||
s := bufio.NewScanner(strings.NewReader(long))
|
||||
for s.Scan() {
|
||||
log.Error(strings.TrimSpace(s.Text()))
|
||||
|
||||
log.Error(`zrepl creates placeholder filesystems on the receiving side of a replication to match the sending side's dataset hierarchy`)
|
||||
log.Error(`zrepl uses zfs receive -F to replace those placeholders with incoming full sends`)
|
||||
log.Error(`OpenZFS native encryption prohibits zfs receive -F for encrypted filesystems`)
|
||||
log.Error(`the current zrepl placeholder filesystem concept is thus incompatible with OpenZFS native encryption`)
|
||||
|
||||
tempStartFullRecvFS := lp.Copy().ToString() + ".zrepl.initial-recv"
|
||||
tempStartFullRecvFSDP, dpErr := zfs.NewDatasetPath(tempStartFullRecvFS)
|
||||
if dpErr != nil {
|
||||
log.WithError(dpErr).Error("cannot determine temporary filesystem name for initial encrypted recv workaround")
|
||||
return nil, err // yes, err, not dpErr
|
||||
}
|
||||
return nil, errors.New(msg)
|
||||
|
||||
log := log.WithField("temp_recv_fs", tempStartFullRecvFS)
|
||||
log.Error(`as a workaround, zrepl will now attempt to re-receive the beginning of the stream into a temporary filesystem temp_recv_fs`)
|
||||
log.Error(`if that step succeeds: shut down zrepl and use 'zfs rename' to swap temp_recv_fs with local_fs, then restart zrepl`)
|
||||
log.Error(`replication will then resume using resumable send+recv`)
|
||||
|
||||
tempPH, phErr := zfs.ZFSGetFilesystemPlaceholderState(ctx, tempStartFullRecvFSDP)
|
||||
if phErr != nil {
|
||||
log.WithError(phErr).Error("cannot determine placeholder state of temp_recv_fs")
|
||||
return nil, err // yes, err, not dpErr
|
||||
}
|
||||
if tempPH.FSExists {
|
||||
log.Error("temp_recv_fs already exists, assuming a (partial) initial recv to that filesystem has already been done")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
recvOpts.RollbackAndForceRecv = false
|
||||
recvOpts.SavePartialRecvState = true
|
||||
rerecvErr := zfs.ZFSRecv(ctx, tempStartFullRecvFS, to, chainedio.NewChainedReader(&peekCopy), recvOpts)
|
||||
if _, isResumable := rerecvErr.(*zfs.RecvFailedWithResumeTokenErr); rerecvErr == nil || isResumable {
|
||||
log.Error("completed re-receive into temporary filesystem temp_recv_fs, now shut down zrepl and use zfs rename to swap temp_recv_fs with local_fs")
|
||||
} else {
|
||||
log.WithError(rerecvErr).Error("failed to receive the beginning of the stream into temporary filesystem temp_recv_fs")
|
||||
log.Error("we advise you to collect the error log and current configuration, open an issue on GitHub, and revert to your previous configuration in the meantime")
|
||||
}
|
||||
|
||||
log.Error(`if you would like to see improvements to this situation, please open an issue on GitHub`)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.
|
||||
WithError(err).
|
||||
WithField("opts", fmt.Sprintf("%#v", recvOpts)).
|
||||
Error("zfs receive failed")
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
43
util/chainedio/chainedio_readcloser.go
Normal file
43
util/chainedio/chainedio_readcloser.go
Normal file
@ -0,0 +1,43 @@
|
||||
package chainedio
|
||||
|
||||
import "io"
|
||||
|
||||
type ChainedReadCloser struct {
|
||||
readers []io.Reader
|
||||
curReader int
|
||||
}
|
||||
|
||||
func NewChainedReader(reader ...io.Reader) *ChainedReadCloser {
|
||||
return &ChainedReadCloser{
|
||||
readers: reader,
|
||||
curReader: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ChainedReadCloser) Read(buf []byte) (n int, err error) {
|
||||
|
||||
n = 0
|
||||
|
||||
for c.curReader < len(c.readers) {
|
||||
n, err = c.readers[c.curReader].Read(buf)
|
||||
if err == io.EOF {
|
||||
c.curReader++
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
if c.curReader == len(c.readers) {
|
||||
err = io.EOF // actually, there was no gap
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (c *ChainedReadCloser) Close() error {
|
||||
for _, r := range c.readers {
|
||||
if c, ok := r.(io.Closer); ok {
|
||||
c.Close() // TODO debug log error?
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
package chainedio
|
||||
|
||||
import "io"
|
||||
|
||||
type ChainedReader struct {
|
||||
Readers []io.Reader
|
||||
curReader int
|
||||
}
|
||||
|
||||
func NewChainedReader(reader ...io.Reader) *ChainedReader {
|
||||
return &ChainedReader{
|
||||
Readers: reader,
|
||||
curReader: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ChainedReader) Read(buf []byte) (n int, err error) {
|
||||
|
||||
n = 0
|
||||
|
||||
for c.curReader < len(c.Readers) {
|
||||
n, err = c.Readers[c.curReader].Read(buf)
|
||||
if err == io.EOF {
|
||||
c.curReader++
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
if c.curReader == len(c.Readers) {
|
||||
err = io.EOF // actually, there was no gap
|
||||
}
|
||||
|
||||
return
|
||||
}
|
Loading…
Reference in New Issue
Block a user