mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-29 11:55:03 +01:00
c600cc1f60
We had too many spurious test failures in the past. But on a developer machine, the tests don't usually fail because the system isn't loaded as much. So, only disable test on CircleCI.
195 lines
4.9 KiB
Go
195 lines
4.9 KiB
Go
package timeoutconn
|
|
|
|
import (
|
|
"bytes"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/zrepl/zrepl/util/socketpair"
|
|
"github.com/zrepl/zrepl/util/zreplcircleci"
|
|
)
|
|
|
|
func TestReadTimeout(t *testing.T) {
|
|
|
|
a, b, err := socketpair.SocketPair()
|
|
require.NoError(t, err)
|
|
defer a.Close()
|
|
defer b.Close()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
var buf bytes.Buffer
|
|
buf.WriteString("tooktoolong")
|
|
time.Sleep(500 * time.Millisecond)
|
|
_, err := io.Copy(a, &buf)
|
|
require.NoError(t, err)
|
|
}()
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
conn := Wrap(b, 100*time.Millisecond)
|
|
buf := [4]byte{} // shorter than message put on wire
|
|
n, err := conn.Read(buf[:])
|
|
assert.Equal(t, 0, n)
|
|
assert.Error(t, err)
|
|
netErr, ok := err.(net.Error)
|
|
require.True(t, ok)
|
|
assert.True(t, netErr.Timeout())
|
|
}()
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
type writeBlockConn struct {
|
|
net.Conn
|
|
blockTime time.Duration
|
|
}
|
|
|
|
func (c writeBlockConn) Write(p []byte) (int, error) {
|
|
time.Sleep(c.blockTime)
|
|
return c.Conn.Write(p)
|
|
}
|
|
|
|
func (c writeBlockConn) CloseWrite() error {
|
|
return c.Conn.Close()
|
|
}
|
|
|
|
func TestWriteTimeout(t *testing.T) {
|
|
a, b, err := socketpair.SocketPair()
|
|
require.NoError(t, err)
|
|
defer a.Close()
|
|
defer b.Close()
|
|
var buf bytes.Buffer
|
|
buf.WriteString("message")
|
|
blockConn := writeBlockConn{a, 500 * time.Millisecond}
|
|
conn := Wrap(blockConn, 100*time.Millisecond)
|
|
n, err := conn.Write(buf.Bytes())
|
|
assert.Equal(t, 0, n)
|
|
assert.Error(t, err)
|
|
netErr, ok := err.(net.Error)
|
|
require.True(t, ok)
|
|
assert.True(t, netErr.Timeout())
|
|
}
|
|
|
|
func TestNoPartialReadsDueToDeadline(t *testing.T) {
|
|
zreplcircleci.SkipOnCircleCI(t, "needs predictable low scheduling latency")
|
|
|
|
a, b, err := socketpair.SocketPair()
|
|
require.NoError(t, err)
|
|
defer a.Close()
|
|
defer b.Close()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
a.Write([]byte{1, 2, 3, 4, 5})
|
|
// sleep to provoke a partial read in the consumer goroutine
|
|
time.Sleep(50 * time.Millisecond)
|
|
a.Write([]byte{6, 7, 8, 9, 10})
|
|
}()
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
bc := Wrap(b, 100*time.Millisecond)
|
|
var buf bytes.Buffer
|
|
beginRead := time.Now()
|
|
// io.Copy will encounter a partial read, then wait ~50ms until the other 5 bytes are written
|
|
// It is still going to fail with deadline err because it expects EOF
|
|
n, err := io.Copy(&buf, bc)
|
|
readDuration := time.Since(beginRead)
|
|
t.Logf("read duration=%s", readDuration)
|
|
t.Logf("recv done n=%v err=%v", n, err)
|
|
t.Logf("buf=%v", buf.Bytes())
|
|
neterr, ok := err.(net.Error)
|
|
require.True(t, ok)
|
|
assert.True(t, neterr.Timeout())
|
|
|
|
assert.Equal(t, int64(10), n)
|
|
assert.Equal(t, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, buf.Bytes())
|
|
// 50ms for the second read, 100ms after that one for the deadline
|
|
// allow for some jitter
|
|
assert.True(t, readDuration > 140*time.Millisecond)
|
|
assert.True(t, readDuration < 200*time.Millisecond)
|
|
}()
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
type partialWriteMockConn struct {
|
|
net.Conn // to satisfy interface
|
|
buf bytes.Buffer
|
|
writeDuration time.Duration
|
|
returnAfterBytesWritten int
|
|
}
|
|
|
|
func newPartialWriteMockConn(writeDuration time.Duration, returnAfterBytesWritten int) *partialWriteMockConn {
|
|
return &partialWriteMockConn{
|
|
writeDuration: writeDuration,
|
|
returnAfterBytesWritten: returnAfterBytesWritten,
|
|
}
|
|
}
|
|
|
|
func (c *partialWriteMockConn) Write(p []byte) (int, error) {
|
|
time.Sleep(c.writeDuration)
|
|
consumeBytes := len(p)
|
|
if consumeBytes > c.returnAfterBytesWritten {
|
|
consumeBytes = c.returnAfterBytesWritten
|
|
}
|
|
n, err := c.buf.Write(p[0:consumeBytes])
|
|
if err != nil || n != consumeBytes {
|
|
panic("bytes.Buffer behaves unexpectedly")
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
func TestPartialWriteMockConn(t *testing.T) {
|
|
mc := newPartialWriteMockConn(100*time.Millisecond, 5)
|
|
buf := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
|
|
begin := time.Now()
|
|
n, err := mc.Write(buf[:])
|
|
duration := time.Since(begin)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, 5, n)
|
|
assert.True(t, duration > 100*time.Millisecond)
|
|
assert.True(t, duration < 150*time.Millisecond)
|
|
}
|
|
|
|
func TestNoPartialWritesDueToDeadline(t *testing.T) {
|
|
a, b, err := socketpair.SocketPair()
|
|
require.NoError(t, err)
|
|
defer a.Close()
|
|
defer b.Close()
|
|
var buf bytes.Buffer
|
|
buf.WriteString("message")
|
|
blockConn := writeBlockConn{a, 150 * time.Millisecond}
|
|
conn := Wrap(blockConn, 100*time.Millisecond)
|
|
n, err := conn.Write(buf.Bytes())
|
|
assert.Equal(t, 0, n)
|
|
assert.Error(t, err)
|
|
netErr, ok := err.(net.Error)
|
|
require.True(t, ok)
|
|
assert.True(t, netErr.Timeout())
|
|
}
|
|
|
|
func TestIovecLenFieldIsMachineUint(t *testing.T) {
|
|
iov := syscall.Iovec{}
|
|
_ = iov // make linter happy (unsafe.Sizeof not recognized as usage)
|
|
size_t := unsafe.Sizeof(iov.Len)
|
|
if size_t != unsafe.Sizeof(uint(23)) {
|
|
t.Fatalf("expecting (struct iov)->Len to be sizeof(uint)")
|
|
}
|
|
// ssize_t is defined to be the signed version of size_t,
|
|
// so we know sizeof(ssize_t) == sizeof(int)
|
|
}
|