mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-05 05:48:57 +01:00
a8e92971d0
This commit was motivated by https://github.com/zrepl/zrepl/issues/495
where, on FreeBSD with OpenZFS 2.0, a SendStream.Close() call might wait indefinitely for `zfs send` to exit.
The reason is that, due to the refactoring done for redacted send & recv
(30af21b025
),
the `dump_bytes` function, which writes to the pipe, executes in a separate thread (synctask taskq) iff not `HAVE_LARGE_STACKS`.
The `zfs send` process/thread waits for that taskq thread using an uninterruptible primitive.
So when we SIGKILL `zfs send`, that signal doesn't reach the right thread to interrupt the pipe write.
Theoretically this affects both Linux and FreeBSD, but most Linux users `HAVE_LARGE_STACKS` and since https://github.com/penzfs/zfs/pull/12350/files OpenZFS on FreeBSD `HAVE_LARGE_STACKS` as well.
However, at least until FreeBSD 13.1, possibly for the entire 13 lifecycle, we're going to have to live with that oddity.
Measures taken in this commit:
- Report the behavior as an upstream bug https://github.com/openzfs/zfs/issues/12500
- Change SendStream code so that it closes zrepl's read-end of the pipe (see comment in code)
- Clean up and make explicit SendStream's state handling
- Write extensive platformtests for SendStream
- They pass on my Linux install and on FreeBSD 12
- FreeBSD 13 still needs testing.
fixes https://github.com/zrepl/zrepl/issues/495
187 lines
4.7 KiB
Go
187 lines
4.7 KiB
Go
package tests
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sys/unix"
|
|
|
|
"github.com/zrepl/zrepl/platformtest"
|
|
"github.com/zrepl/zrepl/util/nodefault"
|
|
"github.com/zrepl/zrepl/zfs"
|
|
)
|
|
|
|
func sendStreamTest(ctx *platformtest.Context) *zfs.SendStream {
|
|
|
|
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
|
|
DESTROYROOT
|
|
CREATEROOT
|
|
+ "sender"
|
|
`)
|
|
|
|
fs := fmt.Sprintf("%s/sender", ctx.RootDataset)
|
|
|
|
fsmpo, err := zfs.ZFSGetMountpoint(ctx, fs)
|
|
require.NoError(ctx, err)
|
|
|
|
writeDummyData(path.Join(fsmpo.Mountpoint, "dummy.data"), 1<<26)
|
|
mustSnapshot(ctx, fs+"@1")
|
|
snap := fsversion(ctx, fs, "@1")
|
|
snapSendArg := snap.ToSendArgVersion()
|
|
|
|
sendArgs, err := zfs.ZFSSendArgsUnvalidated{
|
|
FS: fs,
|
|
From: nil,
|
|
To: &snapSendArg,
|
|
ZFSSendFlags: zfs.ZFSSendFlags{
|
|
Encrypted: &nodefault.Bool{B: false},
|
|
},
|
|
}.Validate(ctx)
|
|
require.NoError(ctx, err)
|
|
|
|
sendStream, err := zfs.ZFSSend(ctx, sendArgs)
|
|
require.NoError(ctx, err)
|
|
|
|
return sendStream
|
|
|
|
}
|
|
|
|
func SendStreamCloseAfterBlockedOnPipeWrite(ctx *platformtest.Context) {
|
|
|
|
sendStream := sendStreamTest(ctx)
|
|
|
|
// let the pipe buffer fill and the zfs process block uninterruptibly
|
|
|
|
ctx.Logf("waiting for pipe write to block")
|
|
time.Sleep(5 * time.Second) // XXX need a platform-neutral way to detect that the pipe is full and the writer is blocked
|
|
|
|
ctx.Logf("closing send stream")
|
|
err := sendStream.Close() // this is what this test case is about
|
|
ctx.Logf("close error: %T %s", err, err)
|
|
require.NoError(ctx, err)
|
|
|
|
exitErrZfsError := sendStream.TestOnly_ExitErr()
|
|
require.Contains(ctx, exitErrZfsError.Error(), "signal")
|
|
require.Error(ctx, exitErrZfsError)
|
|
exitErr, ok := exitErrZfsError.WaitErr.(*exec.ExitError)
|
|
require.True(ctx, ok)
|
|
if exitErr.Exited() {
|
|
// some ZFS impls (FreeBSD 12) behaves that way
|
|
return
|
|
}
|
|
|
|
// ProcessState is only available after exit
|
|
// => use as proxy that the process was wait()ed upon and is gone
|
|
ctx.Logf("%#v", exitErr.ProcessState)
|
|
require.NotNil(ctx, exitErr.ProcessState)
|
|
// and let's verify that the process got killed, so that we know it was the call to .Close() above
|
|
waitStatus := exitErr.ProcessState.Sys().(syscall.WaitStatus)
|
|
ctx.Logf("wait status: %#v", waitStatus)
|
|
ctx.Logf("exit status: %v", waitStatus.ExitStatus())
|
|
require.True(ctx, waitStatus.Signaled())
|
|
switch waitStatus.Signal() {
|
|
case unix.SIGKILL:
|
|
fallthrough
|
|
case unix.SIGPIPE:
|
|
// ok
|
|
|
|
default:
|
|
ctx.Errorf("%T %s\n%v", waitStatus.Signal(), waitStatus.Signal(), waitStatus.Signal())
|
|
ctx.FailNow()
|
|
}
|
|
}
|
|
|
|
func SendStreamCloseAfterEOFRead(ctx *platformtest.Context) {
|
|
|
|
sendStream := sendStreamTest(ctx)
|
|
|
|
_, err := io.Copy(ioutil.Discard, sendStream)
|
|
require.NoError(ctx, err)
|
|
|
|
var buf [128]byte
|
|
n, err := sendStream.Read(buf[:])
|
|
require.Zero(ctx, n)
|
|
require.Equal(ctx, io.EOF, err)
|
|
|
|
err = sendStream.Close()
|
|
require.NoError(ctx, err)
|
|
|
|
n, err = sendStream.Read(buf[:])
|
|
require.Zero(ctx, n)
|
|
require.Equal(ctx, os.ErrClosed, err, "same read error should be returned")
|
|
}
|
|
|
|
func SendStreamMultipleCloseAfterEOF(ctx *platformtest.Context) {
|
|
|
|
sendStream := sendStreamTest(ctx)
|
|
|
|
_, err := io.Copy(ioutil.Discard, sendStream)
|
|
require.NoError(ctx, err)
|
|
|
|
var buf [128]byte
|
|
n, err := sendStream.Read(buf[:])
|
|
require.Zero(ctx, n)
|
|
require.Equal(ctx, io.EOF, err)
|
|
|
|
err = sendStream.Close()
|
|
require.NoError(ctx, err)
|
|
|
|
err = sendStream.Close()
|
|
require.Equal(ctx, os.ErrClosed, err)
|
|
}
|
|
|
|
func SendStreamMultipleCloseBeforeEOF(ctx *platformtest.Context) {
|
|
|
|
sendStream := sendStreamTest(ctx)
|
|
|
|
err := sendStream.Close()
|
|
require.NoError(ctx, err)
|
|
|
|
err = sendStream.Close()
|
|
require.Equal(ctx, os.ErrClosed, err)
|
|
}
|
|
|
|
type failingReadCloser struct {
|
|
err error
|
|
}
|
|
|
|
var _ io.ReadCloser = &failingReadCloser{}
|
|
|
|
func (c *failingReadCloser) Read(p []byte) (int, error) { return 0, c.err }
|
|
func (c *failingReadCloser) Close() error { return c.err }
|
|
|
|
func SendStreamNonEOFReadErrorHandling(ctx *platformtest.Context) {
|
|
|
|
sendStream := sendStreamTest(ctx)
|
|
|
|
var buf [128]byte
|
|
n, err := sendStream.Read(buf[:])
|
|
require.Equal(ctx, len(buf), n)
|
|
require.NoError(ctx, err)
|
|
|
|
var mockError = fmt.Errorf("taeghaefow4piesahwahjocu7ul5tiachaiLipheijae8ooZ8Pies8shohGee9feeTeirai5aiFeiyaecai4kiaLoh4azeih0tea")
|
|
mock := &failingReadCloser{err: mockError}
|
|
orig := sendStream.TestOnly_ReplaceStdoutReader(mock)
|
|
|
|
n, err = sendStream.Read(buf[:])
|
|
require.Equal(ctx, 0, n)
|
|
require.Equal(ctx, mockError, err)
|
|
|
|
if sendStream.TestOnly_ReplaceStdoutReader(orig) != mock {
|
|
panic("incorrect test impl")
|
|
}
|
|
|
|
err = sendStream.Close()
|
|
require.NoError(ctx, err) // if we can't kill the child then this will be a flaky test, but let's assume we can kill the child
|
|
|
|
err = sendStream.Close()
|
|
require.Equal(ctx, os.ErrClosed, err)
|
|
}
|