mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-03 12:58:57 +01:00
zfs: rewrite SendStream, fix bug in Close() on FreeBSD, add platformtests
This commit was motivated by https://github.com/zrepl/zrepl/issues/495
where, on FreeBSD with OpenZFS 2.0, a SendStream.Close() call might wait indefinitely for `zfs send` to exit.
The reason is that, due to the refactoring done for redacted send & recv
(30af21b025
),
the `dump_bytes` function, which writes to the pipe, executes in a separate thread (synctask taskq) iff not `HAVE_LARGE_STACKS`.
The `zfs send` process/thread waits for that taskq thread using an uninterruptible primitive.
So when we SIGKILL `zfs send`, that signal doesn't reach the right thread to interrupt the pipe write.
Theoretically this affects both Linux and FreeBSD, but most Linux users `HAVE_LARGE_STACKS` and since https://github.com/penzfs/zfs/pull/12350/files OpenZFS on FreeBSD `HAVE_LARGE_STACKS` as well.
However, at least until FreeBSD 13.1, possibly for the entire 13 lifecycle, we're going to have to live with that oddity.
Measures taken in this commit:
- Report the behavior as an upstream bug https://github.com/openzfs/zfs/issues/12500
- Change SendStream code so that it closes zrepl's read-end of the pipe (see comment in code)
- Clean up and make explicit SendStream's state handling
- Write extensive platformtests for SendStream
- They pass on my Linux install and on FreeBSD 12
- FreeBSD 13 still needs testing.
fixes https://github.com/zrepl/zrepl/issues/495
This commit is contained in:
parent
b54e477602
commit
a8e92971d0
@ -34,5 +34,10 @@ var Cases = []Case{BatchDestroy,
|
||||
SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden__EncryptionSupported_true,
|
||||
SendArgsValidationResumeTokenDifferentFilesystemForbidden,
|
||||
SendArgsValidationResumeTokenEncryptionMismatchForbidden,
|
||||
SendStreamCloseAfterBlockedOnPipeWrite,
|
||||
SendStreamCloseAfterEOFRead,
|
||||
SendStreamMultipleCloseAfterEOF,
|
||||
SendStreamMultipleCloseBeforeEOF,
|
||||
SendStreamNonEOFReadErrorHandling,
|
||||
UndestroyableSnapshotParsing,
|
||||
}
|
||||
|
186
platformtest/tests/sendStream.go
Normal file
186
platformtest/tests/sendStream.go
Normal file
@ -0,0 +1,186 @@
|
||||
package tests
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
"github.com/zrepl/zrepl/platformtest"
|
||||
"github.com/zrepl/zrepl/util/nodefault"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
func sendStreamTest(ctx *platformtest.Context) *zfs.SendStream {
|
||||
|
||||
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
||||
DESTROYROOT
|
||||
CREATEROOT
|
||||
+ "sender"
|
||||
`)
|
||||
|
||||
fs := fmt.Sprintf("%s/sender", ctx.RootDataset)
|
||||
|
||||
fsmpo, err := zfs.ZFSGetMountpoint(ctx, fs)
|
||||
require.NoError(ctx, err)
|
||||
|
||||
writeDummyData(path.Join(fsmpo.Mountpoint, "dummy.data"), 1<<26)
|
||||
mustSnapshot(ctx, fs+"@1")
|
||||
snap := fsversion(ctx, fs, "@1")
|
||||
snapSendArg := snap.ToSendArgVersion()
|
||||
|
||||
sendArgs, err := zfs.ZFSSendArgsUnvalidated{
|
||||
FS: fs,
|
||||
From: nil,
|
||||
To: &snapSendArg,
|
||||
ZFSSendFlags: zfs.ZFSSendFlags{
|
||||
Encrypted: &nodefault.Bool{B: false},
|
||||
},
|
||||
}.Validate(ctx)
|
||||
require.NoError(ctx, err)
|
||||
|
||||
sendStream, err := zfs.ZFSSend(ctx, sendArgs)
|
||||
require.NoError(ctx, err)
|
||||
|
||||
return sendStream
|
||||
|
||||
}
|
||||
|
||||
func SendStreamCloseAfterBlockedOnPipeWrite(ctx *platformtest.Context) {
|
||||
|
||||
sendStream := sendStreamTest(ctx)
|
||||
|
||||
// let the pipe buffer fill and the zfs process block uninterruptibly
|
||||
|
||||
ctx.Logf("waiting for pipe write to block")
|
||||
time.Sleep(5 * time.Second) // XXX need a platform-neutral way to detect that the pipe is full and the writer is blocked
|
||||
|
||||
ctx.Logf("closing send stream")
|
||||
err := sendStream.Close() // this is what this test case is about
|
||||
ctx.Logf("close error: %T %s", err, err)
|
||||
require.NoError(ctx, err)
|
||||
|
||||
exitErrZfsError := sendStream.TestOnly_ExitErr()
|
||||
require.Contains(ctx, exitErrZfsError.Error(), "signal")
|
||||
require.Error(ctx, exitErrZfsError)
|
||||
exitErr, ok := exitErrZfsError.WaitErr.(*exec.ExitError)
|
||||
require.True(ctx, ok)
|
||||
if exitErr.Exited() {
|
||||
// some ZFS impls (FreeBSD 12) behaves that way
|
||||
return
|
||||
}
|
||||
|
||||
// ProcessState is only available after exit
|
||||
// => use as proxy that the process was wait()ed upon and is gone
|
||||
ctx.Logf("%#v", exitErr.ProcessState)
|
||||
require.NotNil(ctx, exitErr.ProcessState)
|
||||
// and let's verify that the process got killed, so that we know it was the call to .Close() above
|
||||
waitStatus := exitErr.ProcessState.Sys().(syscall.WaitStatus)
|
||||
ctx.Logf("wait status: %#v", waitStatus)
|
||||
ctx.Logf("exit status: %v", waitStatus.ExitStatus())
|
||||
require.True(ctx, waitStatus.Signaled())
|
||||
switch waitStatus.Signal() {
|
||||
case unix.SIGKILL:
|
||||
fallthrough
|
||||
case unix.SIGPIPE:
|
||||
// ok
|
||||
|
||||
default:
|
||||
ctx.Errorf("%T %s\n%v", waitStatus.Signal(), waitStatus.Signal(), waitStatus.Signal())
|
||||
ctx.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func SendStreamCloseAfterEOFRead(ctx *platformtest.Context) {
|
||||
|
||||
sendStream := sendStreamTest(ctx)
|
||||
|
||||
_, err := io.Copy(ioutil.Discard, sendStream)
|
||||
require.NoError(ctx, err)
|
||||
|
||||
var buf [128]byte
|
||||
n, err := sendStream.Read(buf[:])
|
||||
require.Zero(ctx, n)
|
||||
require.Equal(ctx, io.EOF, err)
|
||||
|
||||
err = sendStream.Close()
|
||||
require.NoError(ctx, err)
|
||||
|
||||
n, err = sendStream.Read(buf[:])
|
||||
require.Zero(ctx, n)
|
||||
require.Equal(ctx, os.ErrClosed, err, "same read error should be returned")
|
||||
}
|
||||
|
||||
func SendStreamMultipleCloseAfterEOF(ctx *platformtest.Context) {
|
||||
|
||||
sendStream := sendStreamTest(ctx)
|
||||
|
||||
_, err := io.Copy(ioutil.Discard, sendStream)
|
||||
require.NoError(ctx, err)
|
||||
|
||||
var buf [128]byte
|
||||
n, err := sendStream.Read(buf[:])
|
||||
require.Zero(ctx, n)
|
||||
require.Equal(ctx, io.EOF, err)
|
||||
|
||||
err = sendStream.Close()
|
||||
require.NoError(ctx, err)
|
||||
|
||||
err = sendStream.Close()
|
||||
require.Equal(ctx, os.ErrClosed, err)
|
||||
}
|
||||
|
||||
func SendStreamMultipleCloseBeforeEOF(ctx *platformtest.Context) {
|
||||
|
||||
sendStream := sendStreamTest(ctx)
|
||||
|
||||
err := sendStream.Close()
|
||||
require.NoError(ctx, err)
|
||||
|
||||
err = sendStream.Close()
|
||||
require.Equal(ctx, os.ErrClosed, err)
|
||||
}
|
||||
|
||||
type failingReadCloser struct {
|
||||
err error
|
||||
}
|
||||
|
||||
var _ io.ReadCloser = &failingReadCloser{}
|
||||
|
||||
func (c *failingReadCloser) Read(p []byte) (int, error) { return 0, c.err }
|
||||
func (c *failingReadCloser) Close() error { return c.err }
|
||||
|
||||
func SendStreamNonEOFReadErrorHandling(ctx *platformtest.Context) {
|
||||
|
||||
sendStream := sendStreamTest(ctx)
|
||||
|
||||
var buf [128]byte
|
||||
n, err := sendStream.Read(buf[:])
|
||||
require.Equal(ctx, len(buf), n)
|
||||
require.NoError(ctx, err)
|
||||
|
||||
var mockError = fmt.Errorf("taeghaefow4piesahwahjocu7ul5tiachaiLipheijae8ooZ8Pies8shohGee9feeTeirai5aiFeiyaecai4kiaLoh4azeih0tea")
|
||||
mock := &failingReadCloser{err: mockError}
|
||||
orig := sendStream.TestOnly_ReplaceStdoutReader(mock)
|
||||
|
||||
n, err = sendStream.Read(buf[:])
|
||||
require.Equal(ctx, 0, n)
|
||||
require.Equal(ctx, mockError, err)
|
||||
|
||||
if sendStream.TestOnly_ReplaceStdoutReader(orig) != mock {
|
||||
panic("incorrect test impl")
|
||||
}
|
||||
|
||||
err = sendStream.Close()
|
||||
require.NoError(ctx, err) // if we can't kill the child then this will be a flaky test, but let's assume we can kill the child
|
||||
|
||||
err = sendStream.Close()
|
||||
require.Equal(ctx, os.ErrClosed, err)
|
||||
}
|
174
zfs/zfs.go
174
zfs/zfs.go
@ -14,7 +14,6 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -334,60 +333,122 @@ func pipeWithCapacityHint(capacity int) (r, w *os.File, err error) {
|
||||
return stdoutReader, stdoutWriter, nil
|
||||
}
|
||||
|
||||
type SendStream struct {
|
||||
cmd *zfscmd.Cmd
|
||||
kill context.CancelFunc
|
||||
type sendStreamState int
|
||||
|
||||
closeMtx sync.Mutex
|
||||
stdoutReader *os.File
|
||||
const (
|
||||
sendStreamOpen sendStreamState = iota
|
||||
sendStreamClosed
|
||||
)
|
||||
|
||||
type SendStream struct {
|
||||
cmd *zfscmd.Cmd
|
||||
kill context.CancelFunc
|
||||
stdoutReader io.ReadCloser // not *os.File for mocking during platformtest
|
||||
stderrBuf *circlog.CircularLog
|
||||
opErr error
|
||||
|
||||
mtx sync.Mutex
|
||||
state sendStreamState
|
||||
exitErr *ZFSError
|
||||
}
|
||||
|
||||
func (s *SendStream) Read(p []byte) (n int, err error) {
|
||||
s.closeMtx.Lock()
|
||||
opErr := s.opErr
|
||||
s.closeMtx.Unlock()
|
||||
if opErr != nil {
|
||||
return 0, opErr
|
||||
}
|
||||
func (s *SendStream) Read(p []byte) (n int, _ error) {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
n, err = s.stdoutReader.Read(p)
|
||||
if err != nil {
|
||||
debug("sendStream: read err: %T %s", err, err)
|
||||
// TODO we assume here that any read error is permanent
|
||||
// which is most likely the case for a local zfs send
|
||||
kwerr := s.killAndWait(err)
|
||||
debug("sendStream: killAndWait n=%v err= %T %s", n, kwerr, kwerr)
|
||||
// TODO we assume here that any read error is permanent
|
||||
return n, kwerr
|
||||
switch s.state {
|
||||
case sendStreamClosed:
|
||||
return 0, os.ErrClosed
|
||||
|
||||
case sendStreamOpen:
|
||||
n, readErr := s.stdoutReader.Read(p)
|
||||
if readErr != nil {
|
||||
debug("sendStream: read: readErr=%T %s", readErr, readErr)
|
||||
if readErr == io.EOF {
|
||||
// io.EOF must be bubbled up as is so that consumers can handle it properly.
|
||||
return n, readErr
|
||||
}
|
||||
// Assume that the error is not retryable.
|
||||
// Try to kill now so that we can return a nice *ZFSError with captured stderr.
|
||||
// If the kill doesn't work, it doesn't matter because the caller must by contract call Close() anyways.
|
||||
killErr := s.killAndWait()
|
||||
debug("sendStream: read: killErr=%T %s", killErr, killErr)
|
||||
if killErr == nil {
|
||||
s.state = sendStreamClosed
|
||||
return n, s.exitErr // return the nice error
|
||||
} else {
|
||||
// we remain open so that we retry
|
||||
return n, readErr // return the normal error
|
||||
}
|
||||
}
|
||||
return n, readErr
|
||||
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (s *SendStream) Close() error {
|
||||
debug("sendStream: close called")
|
||||
return s.killAndWait(nil)
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
switch s.state {
|
||||
case sendStreamOpen:
|
||||
err := s.killAndWait()
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
s.state = sendStreamClosed
|
||||
return nil
|
||||
}
|
||||
case sendStreamClosed:
|
||||
return os.ErrClosed
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SendStream) killAndWait(precedingReadErr error) error {
|
||||
// returns nil iff the child process is gone (has been successfully waited upon)
|
||||
// in that case, s.exitErr is set
|
||||
func (s *SendStream) killAndWait() error {
|
||||
|
||||
debug("sendStream: killAndWait enter")
|
||||
defer debug("sendStream: killAndWait leave")
|
||||
if precedingReadErr == io.EOF {
|
||||
// give the zfs process a little bit of time to terminate itself
|
||||
// if it holds this deadline, exitErr will be nil
|
||||
time.AfterFunc(200*time.Millisecond, s.kill)
|
||||
} else {
|
||||
s.kill()
|
||||
}
|
||||
|
||||
// allow async kills from Close(), that's why we only take the mutex here
|
||||
s.closeMtx.Lock()
|
||||
defer s.closeMtx.Unlock()
|
||||
// send SIGKILL
|
||||
s.kill()
|
||||
|
||||
if s.opErr != nil {
|
||||
return s.opErr
|
||||
// Close our read-end of the pipe.
|
||||
//
|
||||
// We must do this before .Wait() because in some (not all) versions/build configs of ZFS,
|
||||
// `zfs send` uses a separate kernel thread (taskq) to write the send stream (function `dump_bytes`).
|
||||
// The `zfs send` thread then waits uinterruptably for the taskq thread to finish the write.
|
||||
// And signalling the `zfs send` thread doesn't propagate to the taskq thread.
|
||||
// So we end up in a state where we .Wait() forever.
|
||||
// (See https://github.com/openzfs/zfs/issues/12500 and
|
||||
// https://github.com/zrepl/zrepl/issues/495#issuecomment-902530043)
|
||||
//
|
||||
// By closing our read end of the pipe before .Wait(), we unblock the taskq thread if there is any.
|
||||
// If there is no separate taskq thread, the SIGKILL to `zfs end` would suffice and be most precise,
|
||||
// but due to the circumstances above, there is no other portable & robust way.
|
||||
//
|
||||
// However, the fallout from closing the pipe is that (in non-taskq builds) `zfs sends` will get a SIGPIPE.
|
||||
// And on Linux, that SIGPIPE appears to win over the previously issued SIGKILL.
|
||||
// And thus, on Linux, the `zfs send` will be killed by the default SIGPIPE handler.
|
||||
// We can observe this in the WaitStatus below.
|
||||
// This behavior is slightly annoying because the *exec.ExitError's message ("signal: broken pipe")
|
||||
// isn't as clear as ("signal: killed").
|
||||
// However, it seems like we just have to live with that. (covered by platformtest)
|
||||
var closePipeErr error
|
||||
if s.stdoutReader != nil {
|
||||
closePipeErr = s.stdoutReader.Close()
|
||||
if closePipeErr == nil {
|
||||
// avoid double-closes in case waiting below doesn't work
|
||||
// and someone attempts Close again
|
||||
s.stdoutReader = nil
|
||||
} else {
|
||||
return closePipeErr
|
||||
}
|
||||
}
|
||||
|
||||
waitErr := s.cmd.Wait()
|
||||
@ -402,39 +463,30 @@ func (s *SendStream) killAndWait(precedingReadErr error) error {
|
||||
}
|
||||
}
|
||||
|
||||
// now, after we know the program exited do we close the pipe
|
||||
var closePipeErr error
|
||||
if s.stdoutReader != nil {
|
||||
closePipeErr = s.stdoutReader.Close()
|
||||
if closePipeErr == nil {
|
||||
// avoid double-closes in case anything below doesn't work
|
||||
// and someone calls Close again
|
||||
s.stdoutReader = nil
|
||||
} else {
|
||||
return closePipeErr
|
||||
}
|
||||
}
|
||||
// invariant: at this point, the child is gone and we cleaned up everything related to the SendStream
|
||||
|
||||
// we managed to tear things down, no let's give the user some pretty *ZFSError
|
||||
if exitErr != nil {
|
||||
s.opErr = &ZFSError{
|
||||
// zfs send exited with an error or was killed by a signal.
|
||||
s.exitErr = &ZFSError{
|
||||
Stderr: []byte(s.stderrBuf.String()),
|
||||
WaitErr: exitErr,
|
||||
}
|
||||
} else {
|
||||
s.opErr = precedingReadErr
|
||||
// zfs send exited successfully (we know that since waitErr was either nil or wasn't an *exec.ExitError)
|
||||
s.exitErr = nil
|
||||
}
|
||||
|
||||
// detect the edge where we're called from s.Read
|
||||
// after the pipe EOFed and zfs send exited without errors
|
||||
// this is actually the "hot" / nice path
|
||||
if exitErr == nil && precedingReadErr == io.EOF {
|
||||
return precedingReadErr
|
||||
}
|
||||
|
||||
return s.opErr
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SendStream) TestOnly_ReplaceStdoutReader(f io.ReadCloser) (prev io.ReadCloser) {
|
||||
prev = s.stdoutReader
|
||||
s.stdoutReader = f
|
||||
return prev
|
||||
}
|
||||
|
||||
func (s *SendStream) TestOnly_ExitErr() *ZFSError { return s.exitErr }
|
||||
|
||||
// NOTE: When updating this struct, make sure to update funcs Validate ValidateCorrespondsToResumeToken
|
||||
type ZFSSendArgVersion struct {
|
||||
RelName string
|
||||
|
@ -209,3 +209,7 @@ func (c *Cmd) Runtime() time.Duration {
|
||||
}
|
||||
return c.waitReturnedAt.Sub(c.startedAt)
|
||||
}
|
||||
|
||||
func (c *Cmd) TestOnly_ExecCmd() *exec.Cmd {
|
||||
return c.cmd
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user