zfs: rewrite SendStream, fix bug in Close() on FreeBSD, add platformtests

This commit was motivated by https://github.com/zrepl/zrepl/issues/495
where, on FreeBSD with OpenZFS 2.0, a SendStream.Close() call might wait indefinitely for `zfs send` to exit.
The reason is that, due to the refactoring done for redacted send & recv
(30af21b025),
the `dump_bytes` function, which writes to the pipe, executes in a separate thread (synctask taskq) iff not `HAVE_LARGE_STACKS`.
The `zfs send` process/thread waits for that taskq thread using an uninterruptible primitive.
So when we SIGKILL `zfs send`, that signal doesn't reach the right thread to interrupt the pipe write.

Theoretically this affects both Linux and FreeBSD, but most Linux users `HAVE_LARGE_STACKS` and since https://github.com/penzfs/zfs/pull/12350/files OpenZFS on FreeBSD `HAVE_LARGE_STACKS` as well.
However, at least until FreeBSD 13.1, possibly for the entire 13 lifecycle, we're going to have to live with that oddity.

Measures taken in this commit:
- Report the behavior as an upstream bug https://github.com/openzfs/zfs/issues/12500
- Change SendStream code so that it closes zrepl's read-end of the pipe (see comment in code)
- Clean up and make explicit SendStream's state handling
- Write extensive platformtests for SendStream
    - They pass on my Linux install and on FreeBSD 12
    - FreeBSD 13 still needs testing.

fixes https://github.com/zrepl/zrepl/issues/495
This commit is contained in:
Christian Schwarz 2021-08-20 17:43:28 +02:00
parent ff6ebe87f3
commit 226962b4fb
4 changed files with 308 additions and 61 deletions

View File

@ -34,5 +34,10 @@ var Cases = []Case{BatchDestroy,
SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden__EncryptionSupported_true,
SendArgsValidationResumeTokenDifferentFilesystemForbidden,
SendArgsValidationResumeTokenEncryptionMismatchForbidden,
SendStreamCloseAfterBlockedOnPipeWrite,
SendStreamCloseAfterEOFRead,
SendStreamMultipleCloseAfterEOF,
SendStreamMultipleCloseBeforeEOF,
SendStreamNonEOFReadErrorHandling,
UndestroyableSnapshotParsing,
}

View File

@ -0,0 +1,186 @@
package tests
import (
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path"
"syscall"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
"github.com/zrepl/zrepl/platformtest"
"github.com/zrepl/zrepl/util/nodefault"
"github.com/zrepl/zrepl/zfs"
)
func sendStreamTest(ctx *platformtest.Context) *zfs.SendStream {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
DESTROYROOT
CREATEROOT
+ "sender"
`)
fs := fmt.Sprintf("%s/sender", ctx.RootDataset)
fsmpo, err := zfs.ZFSGetMountpoint(ctx, fs)
require.NoError(ctx, err)
writeDummyData(path.Join(fsmpo.Mountpoint, "dummy.data"), 1<<26)
mustSnapshot(ctx, fs+"@1")
snap := fsversion(ctx, fs, "@1")
snapSendArg := snap.ToSendArgVersion()
sendArgs, err := zfs.ZFSSendArgsUnvalidated{
FS: fs,
From: nil,
To: &snapSendArg,
ZFSSendFlags: zfs.ZFSSendFlags{
Encrypted: &nodefault.Bool{B: false},
},
}.Validate(ctx)
require.NoError(ctx, err)
sendStream, err := zfs.ZFSSend(ctx, sendArgs)
require.NoError(ctx, err)
return sendStream
}
func SendStreamCloseAfterBlockedOnPipeWrite(ctx *platformtest.Context) {
sendStream := sendStreamTest(ctx)
// let the pipe buffer fill and the zfs process block uninterruptibly
ctx.Logf("waiting for pipe write to block")
time.Sleep(5 * time.Second) // XXX need a platform-neutral way to detect that the pipe is full and the writer is blocked
ctx.Logf("closing send stream")
err := sendStream.Close() // this is what this test case is about
ctx.Logf("close error: %T %s", err, err)
require.NoError(ctx, err)
exitErrZfsError := sendStream.TestOnly_ExitErr()
require.Contains(ctx, exitErrZfsError.Error(), "signal")
require.Error(ctx, exitErrZfsError)
exitErr, ok := exitErrZfsError.WaitErr.(*exec.ExitError)
require.True(ctx, ok)
if exitErr.Exited() {
// some ZFS impls (FreeBSD 12) behaves that way
return
}
// ProcessState is only available after exit
// => use as proxy that the process was wait()ed upon and is gone
ctx.Logf("%#v", exitErr.ProcessState)
require.NotNil(ctx, exitErr.ProcessState)
// and let's verify that the process got killed, so that we know it was the call to .Close() above
waitStatus := exitErr.ProcessState.Sys().(syscall.WaitStatus)
ctx.Logf("wait status: %#v", waitStatus)
ctx.Logf("exit status: %v", waitStatus.ExitStatus())
require.True(ctx, waitStatus.Signaled())
switch waitStatus.Signal() {
case unix.SIGKILL:
fallthrough
case unix.SIGPIPE:
// ok
default:
ctx.Errorf("%T %s\n%v", waitStatus.Signal(), waitStatus.Signal(), waitStatus.Signal())
ctx.FailNow()
}
}
func SendStreamCloseAfterEOFRead(ctx *platformtest.Context) {
sendStream := sendStreamTest(ctx)
_, err := io.Copy(ioutil.Discard, sendStream)
require.NoError(ctx, err)
var buf [128]byte
n, err := sendStream.Read(buf[:])
require.Zero(ctx, n)
require.Equal(ctx, io.EOF, err)
err = sendStream.Close()
require.NoError(ctx, err)
n, err = sendStream.Read(buf[:])
require.Zero(ctx, n)
require.Equal(ctx, os.ErrClosed, err, "same read error should be returned")
}
func SendStreamMultipleCloseAfterEOF(ctx *platformtest.Context) {
sendStream := sendStreamTest(ctx)
_, err := io.Copy(ioutil.Discard, sendStream)
require.NoError(ctx, err)
var buf [128]byte
n, err := sendStream.Read(buf[:])
require.Zero(ctx, n)
require.Equal(ctx, io.EOF, err)
err = sendStream.Close()
require.NoError(ctx, err)
err = sendStream.Close()
require.Equal(ctx, os.ErrClosed, err)
}
func SendStreamMultipleCloseBeforeEOF(ctx *platformtest.Context) {
sendStream := sendStreamTest(ctx)
err := sendStream.Close()
require.NoError(ctx, err)
err = sendStream.Close()
require.Equal(ctx, os.ErrClosed, err)
}
type failingReadCloser struct {
err error
}
var _ io.ReadCloser = &failingReadCloser{}
func (c *failingReadCloser) Read(p []byte) (int, error) { return 0, c.err }
func (c *failingReadCloser) Close() error { return c.err }
func SendStreamNonEOFReadErrorHandling(ctx *platformtest.Context) {
sendStream := sendStreamTest(ctx)
var buf [128]byte
n, err := sendStream.Read(buf[:])
require.Equal(ctx, len(buf), n)
require.NoError(ctx, err)
var mockError = fmt.Errorf("taeghaefow4piesahwahjocu7ul5tiachaiLipheijae8ooZ8Pies8shohGee9feeTeirai5aiFeiyaecai4kiaLoh4azeih0tea")
mock := &failingReadCloser{err: mockError}
orig := sendStream.TestOnly_ReplaceStdoutReader(mock)
n, err = sendStream.Read(buf[:])
require.Equal(ctx, 0, n)
require.Equal(ctx, mockError, err)
if sendStream.TestOnly_ReplaceStdoutReader(orig) != mock {
panic("incorrect test impl")
}
err = sendStream.Close()
require.NoError(ctx, err) // if we can't kill the child then this will be a flaky test, but let's assume we can kill the child
err = sendStream.Close()
require.Equal(ctx, os.ErrClosed, err)
}

View File

@ -14,7 +14,6 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -334,60 +333,122 @@ func pipeWithCapacityHint(capacity int) (r, w *os.File, err error) {
return stdoutReader, stdoutWriter, nil
}
type sendStreamState int
const (
sendStreamOpen sendStreamState = iota
sendStreamClosed
)
type SendStream struct {
cmd *zfscmd.Cmd
kill context.CancelFunc
closeMtx sync.Mutex
stdoutReader *os.File
stdoutReader io.ReadCloser // not *os.File for mocking during platformtest
stderrBuf *circlog.CircularLog
opErr error
mtx sync.Mutex
state sendStreamState
exitErr *ZFSError
}
func (s *SendStream) Read(p []byte) (n int, err error) {
s.closeMtx.Lock()
opErr := s.opErr
s.closeMtx.Unlock()
if opErr != nil {
return 0, opErr
}
func (s *SendStream) Read(p []byte) (n int, _ error) {
s.mtx.Lock()
defer s.mtx.Unlock()
n, err = s.stdoutReader.Read(p)
if err != nil {
debug("sendStream: read err: %T %s", err, err)
// TODO we assume here that any read error is permanent
// which is most likely the case for a local zfs send
kwerr := s.killAndWait(err)
debug("sendStream: killAndWait n=%v err= %T %s", n, kwerr, kwerr)
// TODO we assume here that any read error is permanent
return n, kwerr
switch s.state {
case sendStreamClosed:
return 0, os.ErrClosed
case sendStreamOpen:
n, readErr := s.stdoutReader.Read(p)
if readErr != nil {
debug("sendStream: read: readErr=%T %s", readErr, readErr)
if readErr == io.EOF {
// io.EOF must be bubbled up as is so that consumers can handle it properly.
return n, readErr
}
// Assume that the error is not retryable.
// Try to kill now so that we can return a nice *ZFSError with captured stderr.
// If the kill doesn't work, it doesn't matter because the caller must by contract call Close() anyways.
killErr := s.killAndWait()
debug("sendStream: read: killErr=%T %s", killErr, killErr)
if killErr == nil {
s.state = sendStreamClosed
return n, s.exitErr // return the nice error
} else {
// we remain open so that we retry
return n, readErr // return the normal error
}
}
return n, readErr
default:
panic("unreachable")
}
return n, err
}
func (s *SendStream) Close() error {
debug("sendStream: close called")
return s.killAndWait(nil)
s.mtx.Lock()
defer s.mtx.Unlock()
switch s.state {
case sendStreamOpen:
err := s.killAndWait()
if err != nil {
return err
} else {
s.state = sendStreamClosed
return nil
}
case sendStreamClosed:
return os.ErrClosed
default:
panic("unreachable")
}
}
func (s *SendStream) killAndWait(precedingReadErr error) error {
// returns nil iff the child process is gone (has been successfully waited upon)
// in that case, s.exitErr is set
func (s *SendStream) killAndWait() error {
debug("sendStream: killAndWait enter")
defer debug("sendStream: killAndWait leave")
if precedingReadErr == io.EOF {
// give the zfs process a little bit of time to terminate itself
// if it holds this deadline, exitErr will be nil
time.AfterFunc(200*time.Millisecond, s.kill)
} else {
// send SIGKILL
s.kill()
// Close our read-end of the pipe.
//
// We must do this before .Wait() because in some (not all) versions/build configs of ZFS,
// `zfs send` uses a separate kernel thread (taskq) to write the send stream (function `dump_bytes`).
// The `zfs send` thread then waits uinterruptably for the taskq thread to finish the write.
// And signalling the `zfs send` thread doesn't propagate to the taskq thread.
// So we end up in a state where we .Wait() forever.
// (See https://github.com/openzfs/zfs/issues/12500 and
// https://github.com/zrepl/zrepl/issues/495#issuecomment-902530043)
//
// By closing our read end of the pipe before .Wait(), we unblock the taskq thread if there is any.
// If there is no separate taskq thread, the SIGKILL to `zfs end` would suffice and be most precise,
// but due to the circumstances above, there is no other portable & robust way.
//
// However, the fallout from closing the pipe is that (in non-taskq builds) `zfs sends` will get a SIGPIPE.
// And on Linux, that SIGPIPE appears to win over the previously issued SIGKILL.
// And thus, on Linux, the `zfs send` will be killed by the default SIGPIPE handler.
// We can observe this in the WaitStatus below.
// This behavior is slightly annoying because the *exec.ExitError's message ("signal: broken pipe")
// isn't as clear as ("signal: killed").
// However, it seems like we just have to live with that. (covered by platformtest)
var closePipeErr error
if s.stdoutReader != nil {
closePipeErr = s.stdoutReader.Close()
if closePipeErr == nil {
// avoid double-closes in case waiting below doesn't work
// and someone attempts Close again
s.stdoutReader = nil
} else {
return closePipeErr
}
// allow async kills from Close(), that's why we only take the mutex here
s.closeMtx.Lock()
defer s.closeMtx.Unlock()
if s.opErr != nil {
return s.opErr
}
waitErr := s.cmd.Wait()
@ -402,39 +463,30 @@ func (s *SendStream) killAndWait(precedingReadErr error) error {
}
}
// now, after we know the program exited do we close the pipe
var closePipeErr error
if s.stdoutReader != nil {
closePipeErr = s.stdoutReader.Close()
if closePipeErr == nil {
// avoid double-closes in case anything below doesn't work
// and someone calls Close again
s.stdoutReader = nil
} else {
return closePipeErr
}
}
// invariant: at this point, the child is gone and we cleaned up everything related to the SendStream
// we managed to tear things down, no let's give the user some pretty *ZFSError
if exitErr != nil {
s.opErr = &ZFSError{
// zfs send exited with an error or was killed by a signal.
s.exitErr = &ZFSError{
Stderr: []byte(s.stderrBuf.String()),
WaitErr: exitErr,
}
} else {
s.opErr = precedingReadErr
// zfs send exited successfully (we know that since waitErr was either nil or wasn't an *exec.ExitError)
s.exitErr = nil
}
// detect the edge where we're called from s.Read
// after the pipe EOFed and zfs send exited without errors
// this is actually the "hot" / nice path
if exitErr == nil && precedingReadErr == io.EOF {
return precedingReadErr
}
return s.opErr
return nil
}
func (s *SendStream) TestOnly_ReplaceStdoutReader(f io.ReadCloser) (prev io.ReadCloser) {
prev = s.stdoutReader
s.stdoutReader = f
return prev
}
func (s *SendStream) TestOnly_ExitErr() *ZFSError { return s.exitErr }
// NOTE: When updating this struct, make sure to update funcs Validate ValidateCorrespondsToResumeToken
type ZFSSendArgVersion struct {
RelName string

View File

@ -209,3 +209,7 @@ func (c *Cmd) Runtime() time.Duration {
}
return c.waitReturnedAt.Sub(c.startedAt)
}
func (c *Cmd) TestOnly_ExecCmd() *exec.Cmd {
return c.cmd
}