From 226962b4fb1162290d7bdb18ce0716ec18744126 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 20 Aug 2021 17:43:28 +0200 Subject: [PATCH] zfs: rewrite SendStream, fix bug in Close() on FreeBSD, add platformtests This commit was motivated by https://github.com/zrepl/zrepl/issues/495 where, on FreeBSD with OpenZFS 2.0, a SendStream.Close() call might wait indefinitely for `zfs send` to exit. The reason is that, due to the refactoring done for redacted send & recv (https://github.com/openzfs/zfs/commit/30af21b02569ac192f52ce6e6511015f8a8d5729), the `dump_bytes` function, which writes to the pipe, executes in a separate thread (synctask taskq) iff not `HAVE_LARGE_STACKS`. The `zfs send` process/thread waits for that taskq thread using an uninterruptible primitive. So when we SIGKILL `zfs send`, that signal doesn't reach the right thread to interrupt the pipe write. Theoretically this affects both Linux and FreeBSD, but most Linux users `HAVE_LARGE_STACKS` and since https://github.com/penzfs/zfs/pull/12350/files OpenZFS on FreeBSD `HAVE_LARGE_STACKS` as well. However, at least until FreeBSD 13.1, possibly for the entire 13 lifecycle, we're going to have to live with that oddity. Measures taken in this commit: - Report the behavior as an upstream bug https://github.com/openzfs/zfs/issues/12500 - Change SendStream code so that it closes zrepl's read-end of the pipe (see comment in code) - Clean up and make explicit SendStream's state handling - Write extensive platformtests for SendStream - They pass on my Linux install and on FreeBSD 12 - FreeBSD 13 still needs testing. fixes https://github.com/zrepl/zrepl/issues/495 --- platformtest/tests/generated_cases.go | 5 + platformtest/tests/sendStream.go | 186 ++++++++++++++++++++++++++ zfs/zfs.go | 174 +++++++++++++++--------- zfs/zfscmd/zfscmd.go | 4 + 4 files changed, 308 insertions(+), 61 deletions(-) create mode 100644 platformtest/tests/sendStream.go diff --git a/platformtest/tests/generated_cases.go b/platformtest/tests/generated_cases.go index 9a94c15..7294402 100644 --- a/platformtest/tests/generated_cases.go +++ b/platformtest/tests/generated_cases.go @@ -34,5 +34,10 @@ var Cases = []Case{BatchDestroy, SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden__EncryptionSupported_true, SendArgsValidationResumeTokenDifferentFilesystemForbidden, SendArgsValidationResumeTokenEncryptionMismatchForbidden, + SendStreamCloseAfterBlockedOnPipeWrite, + SendStreamCloseAfterEOFRead, + SendStreamMultipleCloseAfterEOF, + SendStreamMultipleCloseBeforeEOF, + SendStreamNonEOFReadErrorHandling, UndestroyableSnapshotParsing, } diff --git a/platformtest/tests/sendStream.go b/platformtest/tests/sendStream.go new file mode 100644 index 0000000..7fbe5e7 --- /dev/null +++ b/platformtest/tests/sendStream.go @@ -0,0 +1,186 @@ +package tests + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path" + "syscall" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" + + "github.com/zrepl/zrepl/platformtest" + "github.com/zrepl/zrepl/util/nodefault" + "github.com/zrepl/zrepl/zfs" +) + +func sendStreamTest(ctx *platformtest.Context) *zfs.SendStream { + + platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` + DESTROYROOT + CREATEROOT + + "sender" + `) + + fs := fmt.Sprintf("%s/sender", ctx.RootDataset) + + fsmpo, err := zfs.ZFSGetMountpoint(ctx, fs) + require.NoError(ctx, err) + + writeDummyData(path.Join(fsmpo.Mountpoint, "dummy.data"), 1<<26) + mustSnapshot(ctx, fs+"@1") + snap := fsversion(ctx, fs, "@1") + snapSendArg := snap.ToSendArgVersion() + + sendArgs, err := zfs.ZFSSendArgsUnvalidated{ + FS: fs, + From: nil, + To: &snapSendArg, + ZFSSendFlags: zfs.ZFSSendFlags{ + Encrypted: &nodefault.Bool{B: false}, + }, + }.Validate(ctx) + require.NoError(ctx, err) + + sendStream, err := zfs.ZFSSend(ctx, sendArgs) + require.NoError(ctx, err) + + return sendStream + +} + +func SendStreamCloseAfterBlockedOnPipeWrite(ctx *platformtest.Context) { + + sendStream := sendStreamTest(ctx) + + // let the pipe buffer fill and the zfs process block uninterruptibly + + ctx.Logf("waiting for pipe write to block") + time.Sleep(5 * time.Second) // XXX need a platform-neutral way to detect that the pipe is full and the writer is blocked + + ctx.Logf("closing send stream") + err := sendStream.Close() // this is what this test case is about + ctx.Logf("close error: %T %s", err, err) + require.NoError(ctx, err) + + exitErrZfsError := sendStream.TestOnly_ExitErr() + require.Contains(ctx, exitErrZfsError.Error(), "signal") + require.Error(ctx, exitErrZfsError) + exitErr, ok := exitErrZfsError.WaitErr.(*exec.ExitError) + require.True(ctx, ok) + if exitErr.Exited() { + // some ZFS impls (FreeBSD 12) behaves that way + return + } + + // ProcessState is only available after exit + // => use as proxy that the process was wait()ed upon and is gone + ctx.Logf("%#v", exitErr.ProcessState) + require.NotNil(ctx, exitErr.ProcessState) + // and let's verify that the process got killed, so that we know it was the call to .Close() above + waitStatus := exitErr.ProcessState.Sys().(syscall.WaitStatus) + ctx.Logf("wait status: %#v", waitStatus) + ctx.Logf("exit status: %v", waitStatus.ExitStatus()) + require.True(ctx, waitStatus.Signaled()) + switch waitStatus.Signal() { + case unix.SIGKILL: + fallthrough + case unix.SIGPIPE: + // ok + + default: + ctx.Errorf("%T %s\n%v", waitStatus.Signal(), waitStatus.Signal(), waitStatus.Signal()) + ctx.FailNow() + } +} + +func SendStreamCloseAfterEOFRead(ctx *platformtest.Context) { + + sendStream := sendStreamTest(ctx) + + _, err := io.Copy(ioutil.Discard, sendStream) + require.NoError(ctx, err) + + var buf [128]byte + n, err := sendStream.Read(buf[:]) + require.Zero(ctx, n) + require.Equal(ctx, io.EOF, err) + + err = sendStream.Close() + require.NoError(ctx, err) + + n, err = sendStream.Read(buf[:]) + require.Zero(ctx, n) + require.Equal(ctx, os.ErrClosed, err, "same read error should be returned") +} + +func SendStreamMultipleCloseAfterEOF(ctx *platformtest.Context) { + + sendStream := sendStreamTest(ctx) + + _, err := io.Copy(ioutil.Discard, sendStream) + require.NoError(ctx, err) + + var buf [128]byte + n, err := sendStream.Read(buf[:]) + require.Zero(ctx, n) + require.Equal(ctx, io.EOF, err) + + err = sendStream.Close() + require.NoError(ctx, err) + + err = sendStream.Close() + require.Equal(ctx, os.ErrClosed, err) +} + +func SendStreamMultipleCloseBeforeEOF(ctx *platformtest.Context) { + + sendStream := sendStreamTest(ctx) + + err := sendStream.Close() + require.NoError(ctx, err) + + err = sendStream.Close() + require.Equal(ctx, os.ErrClosed, err) +} + +type failingReadCloser struct { + err error +} + +var _ io.ReadCloser = &failingReadCloser{} + +func (c *failingReadCloser) Read(p []byte) (int, error) { return 0, c.err } +func (c *failingReadCloser) Close() error { return c.err } + +func SendStreamNonEOFReadErrorHandling(ctx *platformtest.Context) { + + sendStream := sendStreamTest(ctx) + + var buf [128]byte + n, err := sendStream.Read(buf[:]) + require.Equal(ctx, len(buf), n) + require.NoError(ctx, err) + + var mockError = fmt.Errorf("taeghaefow4piesahwahjocu7ul5tiachaiLipheijae8ooZ8Pies8shohGee9feeTeirai5aiFeiyaecai4kiaLoh4azeih0tea") + mock := &failingReadCloser{err: mockError} + orig := sendStream.TestOnly_ReplaceStdoutReader(mock) + + n, err = sendStream.Read(buf[:]) + require.Equal(ctx, 0, n) + require.Equal(ctx, mockError, err) + + if sendStream.TestOnly_ReplaceStdoutReader(orig) != mock { + panic("incorrect test impl") + } + + err = sendStream.Close() + require.NoError(ctx, err) // if we can't kill the child then this will be a flaky test, but let's assume we can kill the child + + err = sendStream.Close() + require.Equal(ctx, os.ErrClosed, err) +} diff --git a/zfs/zfs.go b/zfs/zfs.go index fca80b4..141ea5e 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -14,7 +14,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -334,60 +333,122 @@ func pipeWithCapacityHint(capacity int) (r, w *os.File, err error) { return stdoutReader, stdoutWriter, nil } -type SendStream struct { - cmd *zfscmd.Cmd - kill context.CancelFunc +type sendStreamState int - closeMtx sync.Mutex - stdoutReader *os.File +const ( + sendStreamOpen sendStreamState = iota + sendStreamClosed +) + +type SendStream struct { + cmd *zfscmd.Cmd + kill context.CancelFunc + stdoutReader io.ReadCloser // not *os.File for mocking during platformtest stderrBuf *circlog.CircularLog - opErr error + + mtx sync.Mutex + state sendStreamState + exitErr *ZFSError } -func (s *SendStream) Read(p []byte) (n int, err error) { - s.closeMtx.Lock() - opErr := s.opErr - s.closeMtx.Unlock() - if opErr != nil { - return 0, opErr - } +func (s *SendStream) Read(p []byte) (n int, _ error) { + s.mtx.Lock() + defer s.mtx.Unlock() - n, err = s.stdoutReader.Read(p) - if err != nil { - debug("sendStream: read err: %T %s", err, err) - // TODO we assume here that any read error is permanent - // which is most likely the case for a local zfs send - kwerr := s.killAndWait(err) - debug("sendStream: killAndWait n=%v err= %T %s", n, kwerr, kwerr) - // TODO we assume here that any read error is permanent - return n, kwerr + switch s.state { + case sendStreamClosed: + return 0, os.ErrClosed + + case sendStreamOpen: + n, readErr := s.stdoutReader.Read(p) + if readErr != nil { + debug("sendStream: read: readErr=%T %s", readErr, readErr) + if readErr == io.EOF { + // io.EOF must be bubbled up as is so that consumers can handle it properly. + return n, readErr + } + // Assume that the error is not retryable. + // Try to kill now so that we can return a nice *ZFSError with captured stderr. + // If the kill doesn't work, it doesn't matter because the caller must by contract call Close() anyways. + killErr := s.killAndWait() + debug("sendStream: read: killErr=%T %s", killErr, killErr) + if killErr == nil { + s.state = sendStreamClosed + return n, s.exitErr // return the nice error + } else { + // we remain open so that we retry + return n, readErr // return the normal error + } + } + return n, readErr + + default: + panic("unreachable") } - return n, err } func (s *SendStream) Close() error { debug("sendStream: close called") - return s.killAndWait(nil) + s.mtx.Lock() + defer s.mtx.Unlock() + + switch s.state { + case sendStreamOpen: + err := s.killAndWait() + if err != nil { + return err + } else { + s.state = sendStreamClosed + return nil + } + case sendStreamClosed: + return os.ErrClosed + default: + panic("unreachable") + } } -func (s *SendStream) killAndWait(precedingReadErr error) error { +// returns nil iff the child process is gone (has been successfully waited upon) +// in that case, s.exitErr is set +func (s *SendStream) killAndWait() error { debug("sendStream: killAndWait enter") defer debug("sendStream: killAndWait leave") - if precedingReadErr == io.EOF { - // give the zfs process a little bit of time to terminate itself - // if it holds this deadline, exitErr will be nil - time.AfterFunc(200*time.Millisecond, s.kill) - } else { - s.kill() - } - // allow async kills from Close(), that's why we only take the mutex here - s.closeMtx.Lock() - defer s.closeMtx.Unlock() + // send SIGKILL + s.kill() - if s.opErr != nil { - return s.opErr + // Close our read-end of the pipe. + // + // We must do this before .Wait() because in some (not all) versions/build configs of ZFS, + // `zfs send` uses a separate kernel thread (taskq) to write the send stream (function `dump_bytes`). + // The `zfs send` thread then waits uinterruptably for the taskq thread to finish the write. + // And signalling the `zfs send` thread doesn't propagate to the taskq thread. + // So we end up in a state where we .Wait() forever. + // (See https://github.com/openzfs/zfs/issues/12500 and + // https://github.com/zrepl/zrepl/issues/495#issuecomment-902530043) + // + // By closing our read end of the pipe before .Wait(), we unblock the taskq thread if there is any. + // If there is no separate taskq thread, the SIGKILL to `zfs end` would suffice and be most precise, + // but due to the circumstances above, there is no other portable & robust way. + // + // However, the fallout from closing the pipe is that (in non-taskq builds) `zfs sends` will get a SIGPIPE. + // And on Linux, that SIGPIPE appears to win over the previously issued SIGKILL. + // And thus, on Linux, the `zfs send` will be killed by the default SIGPIPE handler. + // We can observe this in the WaitStatus below. + // This behavior is slightly annoying because the *exec.ExitError's message ("signal: broken pipe") + // isn't as clear as ("signal: killed"). + // However, it seems like we just have to live with that. (covered by platformtest) + var closePipeErr error + if s.stdoutReader != nil { + closePipeErr = s.stdoutReader.Close() + if closePipeErr == nil { + // avoid double-closes in case waiting below doesn't work + // and someone attempts Close again + s.stdoutReader = nil + } else { + return closePipeErr + } } waitErr := s.cmd.Wait() @@ -402,39 +463,30 @@ func (s *SendStream) killAndWait(precedingReadErr error) error { } } - // now, after we know the program exited do we close the pipe - var closePipeErr error - if s.stdoutReader != nil { - closePipeErr = s.stdoutReader.Close() - if closePipeErr == nil { - // avoid double-closes in case anything below doesn't work - // and someone calls Close again - s.stdoutReader = nil - } else { - return closePipeErr - } - } + // invariant: at this point, the child is gone and we cleaned up everything related to the SendStream - // we managed to tear things down, no let's give the user some pretty *ZFSError if exitErr != nil { - s.opErr = &ZFSError{ + // zfs send exited with an error or was killed by a signal. + s.exitErr = &ZFSError{ Stderr: []byte(s.stderrBuf.String()), WaitErr: exitErr, } } else { - s.opErr = precedingReadErr + // zfs send exited successfully (we know that since waitErr was either nil or wasn't an *exec.ExitError) + s.exitErr = nil } - // detect the edge where we're called from s.Read - // after the pipe EOFed and zfs send exited without errors - // this is actually the "hot" / nice path - if exitErr == nil && precedingReadErr == io.EOF { - return precedingReadErr - } - - return s.opErr + return nil } +func (s *SendStream) TestOnly_ReplaceStdoutReader(f io.ReadCloser) (prev io.ReadCloser) { + prev = s.stdoutReader + s.stdoutReader = f + return prev +} + +func (s *SendStream) TestOnly_ExitErr() *ZFSError { return s.exitErr } + // NOTE: When updating this struct, make sure to update funcs Validate ValidateCorrespondsToResumeToken type ZFSSendArgVersion struct { RelName string diff --git a/zfs/zfscmd/zfscmd.go b/zfs/zfscmd/zfscmd.go index 1cbc9ed..094f310 100644 --- a/zfs/zfscmd/zfscmd.go +++ b/zfs/zfscmd/zfscmd.go @@ -209,3 +209,7 @@ func (c *Cmd) Runtime() time.Duration { } return c.waitReturnedAt.Sub(c.startedAt) } + +func (c *Cmd) TestOnly_ExecCmd() *exec.Cmd { + return c.cmd +}