diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index 4282116..f16d271 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -2,17 +2,17 @@ package endpoint import ( - "bufio" + "bytes" "context" "fmt" "io" "path" - "strings" "sync" "github.com/pkg/errors" "github.com/zrepl/zrepl/replication/logic/pdu" + "github.com/zrepl/zrepl/util/chainedio" "github.com/zrepl/zrepl/util/chainlock" "github.com/zrepl/zrepl/util/envconst" "github.com/zrepl/zrepl/util/semaphore" @@ -812,14 +812,21 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io. } defer guard.Release() + log.Info("peeking 1M ahead") + var peek bytes.Buffer + var MaxPeek = envconst.Int64("ZREPL_ENDPOINT_RECV_PEEK_SIZE", 1<<20) + if _, err := io.Copy(&peek, io.LimitReader(receive, MaxPeek)); err != nil { + log.WithError(err).Error("cannot read peek-buffer from send stream") + } + var peekCopy bytes.Buffer + if n, err := peekCopy.Write(peek.Bytes()); err != nil || n != peek.Len() { + panic(peek.Len()) + } + log.WithField("opts", fmt.Sprintf("%#v", recvOpts)).Debug("start receive command") snapFullPath := to.FullPath(lp.ToString()) - if err := zfs.ZFSRecv(ctx, lp.ToString(), to, receive, recvOpts); err != nil { - log. - WithError(err). - WithField("opts", fmt.Sprintf("%#v", recvOpts)). - Error("zfs receive failed") + if err := zfs.ZFSRecv(ctx, lp.ToString(), to, chainedio.NewChainedReader(&peek, receive), recvOpts); err != nil { // best-effort rollback of placeholder state if the recv didn't start _, resumableStatePresent := err.(*zfs.RecvFailedWithResumeTokenErr) @@ -838,21 +845,56 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io. // deal with failing initial encrypted send & recv if _, ok := err.(*zfs.RecvDestroyOrOverwriteEncryptedErr); ok && ph.IsPlaceholder && placeholderRestored { - msg := `cannot replace placeholder filesystem with incoming send stream: OpenZFS does not support replacing encrypted filesystems using zfs receive -F` - long := ` - It is very likely that you are encountering this error because you changed the sending job's list of filesystems. - Replication should start working again once you change those settings back to what they were before. - You can learn more about zrepl's placeholder filesystems at https://zrepl.github.io/configuration/overview.html#replication-placeholder-property - If you would like to see improvements to this situation, please open an issue at https://github.com/zrepl/zrepl/issues , citing the previous error messages - ` + msg := `cannot automatically replace placeholder filesystem with incoming send stream - please see receive-side log for details` + err := errors.New(msg) log.Error(msg) - s := bufio.NewScanner(strings.NewReader(long)) - for s.Scan() { - log.Error(strings.TrimSpace(s.Text())) + + log.Error(`zrepl creates placeholder filesystems on the receiving side of a replication to match the sending side's dataset hierarchy`) + log.Error(`zrepl uses zfs receive -F to replace those placeholders with incoming full sends`) + log.Error(`OpenZFS native encryption prohibits zfs receive -F for encrypted filesystems`) + log.Error(`the current zrepl placeholder filesystem concept is thus incompatible with OpenZFS native encryption`) + + tempStartFullRecvFS := lp.Copy().ToString() + ".zrepl.initial-recv" + tempStartFullRecvFSDP, dpErr := zfs.NewDatasetPath(tempStartFullRecvFS) + if dpErr != nil { + log.WithError(dpErr).Error("cannot determine temporary filesystem name for initial encrypted recv workaround") + return nil, err // yes, err, not dpErr } - return nil, errors.New(msg) + + log := log.WithField("temp_recv_fs", tempStartFullRecvFS) + log.Error(`as a workaround, zrepl will now attempt to re-receive the beginning of the stream into a temporary filesystem temp_recv_fs`) + log.Error(`if that step succeeds: shut down zrepl and use 'zfs rename' to swap temp_recv_fs with local_fs, then restart zrepl`) + log.Error(`replication will then resume using resumable send+recv`) + + tempPH, phErr := zfs.ZFSGetFilesystemPlaceholderState(ctx, tempStartFullRecvFSDP) + if phErr != nil { + log.WithError(phErr).Error("cannot determine placeholder state of temp_recv_fs") + return nil, err // yes, err, not dpErr + } + if tempPH.FSExists { + log.Error("temp_recv_fs already exists, assuming a (partial) initial recv to that filesystem has already been done") + return nil, err + } + + recvOpts.RollbackAndForceRecv = false + recvOpts.SavePartialRecvState = true + rerecvErr := zfs.ZFSRecv(ctx, tempStartFullRecvFS, to, chainedio.NewChainedReader(&peekCopy), recvOpts) + if _, isResumable := rerecvErr.(*zfs.RecvFailedWithResumeTokenErr); rerecvErr == nil || isResumable { + log.Error("completed re-receive into temporary filesystem temp_recv_fs, now shut down zrepl and use zfs rename to swap temp_recv_fs with local_fs") + } else { + log.WithError(rerecvErr).Error("failed to receive the beginning of the stream into temporary filesystem temp_recv_fs") + log.Error("we advise you to collect the error log and current configuration, open an issue on GitHub, and revert to your previous configuration in the meantime") + } + + log.Error(`if you would like to see improvements to this situation, please open an issue on GitHub`) + return nil, err } + log. + WithError(err). + WithField("opts", fmt.Sprintf("%#v", recvOpts)). + Error("zfs receive failed") + return nil, err } diff --git a/util/chainedio/chainedio_readcloser.go b/util/chainedio/chainedio_readcloser.go new file mode 100644 index 0000000..1fe040c --- /dev/null +++ b/util/chainedio/chainedio_readcloser.go @@ -0,0 +1,43 @@ +package chainedio + +import "io" + +type ChainedReadCloser struct { + readers []io.Reader + curReader int +} + +func NewChainedReader(reader ...io.Reader) *ChainedReadCloser { + return &ChainedReadCloser{ + readers: reader, + curReader: 0, + } +} + +func (c *ChainedReadCloser) Read(buf []byte) (n int, err error) { + + n = 0 + + for c.curReader < len(c.readers) { + n, err = c.readers[c.curReader].Read(buf) + if err == io.EOF { + c.curReader++ + continue + } + break + } + if c.curReader == len(c.readers) { + err = io.EOF // actually, there was no gap + } + + return +} + +func (c *ChainedReadCloser) Close() error { + for _, r := range c.readers { + if c, ok := r.(io.Closer); ok { + c.Close() // TODO debug log error? + } + } + return nil +} diff --git a/util/chainedio/chainedio_reader.go b/util/chainedio/chainedio_reader.go deleted file mode 100644 index a7bdea6..0000000 --- a/util/chainedio/chainedio_reader.go +++ /dev/null @@ -1,34 +0,0 @@ -package chainedio - -import "io" - -type ChainedReader struct { - Readers []io.Reader - curReader int -} - -func NewChainedReader(reader ...io.Reader) *ChainedReader { - return &ChainedReader{ - Readers: reader, - curReader: 0, - } -} - -func (c *ChainedReader) Read(buf []byte) (n int, err error) { - - n = 0 - - for c.curReader < len(c.Readers) { - n, err = c.Readers[c.curReader].Read(buf) - if err == io.EOF { - c.curReader++ - continue - } - break - } - if c.curReader == len(c.Readers) { - err = io.EOF // actually, there was no gap - } - - return -}