// zrepl/rpc/dataconn/stream/stream_test.go
//
// 133 lines
// 2.8 KiB
// Go
// Raw Permalink Normal View History

package stream
import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
// 2019-03-22 19:41:12 +01:00
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/rpc/dataconn/heartbeatconn"
"github.com/zrepl/zrepl/util/socketpair"
)
// TestFrameTypesOk checks that the frame types this package defines for
// stream termination (End) and for error trailers (StreamErrTrailer) are
// both accepted by heartbeatconn.IsPublicFrameType.
func TestFrameTypesOk(t *testing.T) {
	// Log the numeric value of End to ease debugging of a failure.
	t.Logf("%v", End)

	endIsPublic := heartbeatconn.IsPublicFrameType(End)
	assert.True(t, endIsPublic)

	trailerIsPublic := heartbeatconn.IsPublicFrameType(StreamErrTrailer)
	assert.True(t, trailerIsPublic)
}
func TestStreamer(t *testing.T) {
anc, bnc, err := socketpair.SocketPair()
require.NoError(t, err)
hto := 1 * time.Hour
a := heartbeatconn.Wrap(anc, hto, hto)
b := heartbeatconn.Wrap(bnc, hto, hto)
log := logger.NewStderrDebugLogger()
ctx := WithLogger(context.Background(), log)
stype := uint32(0x23)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
var buf bytes.Buffer
buf.Write(
bytes.Repeat([]byte{1, 2}, 1<<25),
)
writeStream(ctx, a, &buf, stype)
log.Debug("WriteStream returned")
a.Shutdown()
}()
go func() {
defer wg.Done()
var buf bytes.Buffer
ch := make(chan readFrameResult, 5)
wg.Add(1)
go func() {
defer wg.Done()
rpc/dataconn/stream: Conn: handle concurrent Close calls + goroutine leak fix * Add Close() in closeState to identify the first closer * Non-first closers get an error * Reads and Writes from the Conn get an error if the conn was closed during the Read / Write was running * The first closer starts _separate_ goroutine draining the c.frameReads channel * The first closer then waits for the goroutine that fills c.frameReads to exit refs 3bfe0c16d0233cac66a01a6f89959c34ef01c663 fixes #174 readFrames would block on `reads <-` but only after that would stream.Conn.readFrames close c.waitReadFramesDone which was too late because stream.Conn.Close would wait for c.waitReadFramesDone to be closed before draining the channel ^^^^^^ (not frameconn.Conn, that closed successfully) 195 @ 0x1032ae0 0x1006cab 0x1006c81 0x1006a65 0x15505be 0x155163e 0x1060bc1 0x15505bd github.com/zrepl/zrepl/rpc/dataconn/stream.readFrames+0x16d github.com/zrepl/zrepl/rpc/dataconn/stream/stream.go:220 0x155163d github.com/zrepl/zrepl/rpc/dataconn/stream.(*Conn).readFrames+0xbd github.com/zrepl/zrepl/rpc/dataconn/stream/stream_conn.go:71 195 @ 0x1032ae0 0x10078c8 0x100789e 0x100758b 0x1552678 0x1557a4b 0x1556aec 0x1060bc1 0x1552677 github.com/zrepl/zrepl/rpc/dataconn/stream.(*Conn).Close+0x77 github.com/zrepl/zrepl/rpc/dataconn/stream/stream_conn.go:191 0x1557a4a github.com/zrepl/zrepl/rpc/dataconn.(*Server).serveConn.func1+0x5a github.com/zrepl/zrepl/rpc/dataconn/dataconn_server.go:93 0x1556aeb github.com/zrepl/zrepl/rpc/dataconn.(*Server).serveConn+0x87b github.com/zrepl/zrepl/rpc/dataconn/dataconn_server.go:176
2019-09-13 14:48:18 +02:00
readFrames(ch, nil, b)
}()
err := readStream(ch, b, &buf, stype)
log.WithField("errType", fmt.Sprintf("%T %v", err, err)).Debug("ReadStream returned")
assert.Nil(t, err)
expected := bytes.Repeat([]byte{1, 2}, 1<<25)
assert.True(t, bytes.Equal(expected, buf.Bytes()))
b.Shutdown()
}()
wg.Wait()
}
// errReader is a reader stub whose Read never yields data and always fails
// with readErr.
type errReader struct {
	t       *testing.T // used only to log each Read call
	readErr error      // returned unchanged by every Read call
}

// Read logs the call on the test log, reads nothing into the supplied buffer,
// and returns 0 together with the configured readErr.
func (r errReader) Read(_ []byte) (int, error) {
	r.t.Logf("errReader.Read called")
	return 0, r.readErr
}
func TestMultiFrameStreamErrTraileror(t *testing.T) {
anc, bnc, err := socketpair.SocketPair()
require.NoError(t, err)
hto := 1 * time.Hour
a := heartbeatconn.Wrap(anc, hto, hto)
b := heartbeatconn.Wrap(bnc, hto, hto)
log := logger.NewStderrDebugLogger()
ctx := WithLogger(context.Background(), log)
longErr := fmt.Errorf("an error that definitley spans more than one frame:\n%s", strings.Repeat("a\n", 1<<4))
stype := uint32(0x23)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
r := errReader{t, longErr}
writeStream(ctx, a, &r, stype)
a.Shutdown()
}()
go func() {
defer wg.Done()
defer b.Shutdown()
var buf bytes.Buffer
ch := make(chan readFrameResult, 5)
wg.Add(1)
go func() {
defer wg.Done()
rpc/dataconn/stream: Conn: handle concurrent Close calls + goroutine leak fix * Add Close() in closeState to identify the first closer * Non-first closers get an error * Reads and Writes from the Conn get an error if the conn was closed during the Read / Write was running * The first closer starts _separate_ goroutine draining the c.frameReads channel * The first closer then waits for the goroutine that fills c.frameReads to exit refs 3bfe0c16d0233cac66a01a6f89959c34ef01c663 fixes #174 readFrames would block on `reads <-` but only after that would stream.Conn.readFrames close c.waitReadFramesDone which was too late because stream.Conn.Close would wait for c.waitReadFramesDone to be closed before draining the channel ^^^^^^ (not frameconn.Conn, that closed successfully) 195 @ 0x1032ae0 0x1006cab 0x1006c81 0x1006a65 0x15505be 0x155163e 0x1060bc1 0x15505bd github.com/zrepl/zrepl/rpc/dataconn/stream.readFrames+0x16d github.com/zrepl/zrepl/rpc/dataconn/stream/stream.go:220 0x155163d github.com/zrepl/zrepl/rpc/dataconn/stream.(*Conn).readFrames+0xbd github.com/zrepl/zrepl/rpc/dataconn/stream/stream_conn.go:71 195 @ 0x1032ae0 0x10078c8 0x100789e 0x100758b 0x1552678 0x1557a4b 0x1556aec 0x1060bc1 0x1552677 github.com/zrepl/zrepl/rpc/dataconn/stream.(*Conn).Close+0x77 github.com/zrepl/zrepl/rpc/dataconn/stream/stream_conn.go:191 0x1557a4a github.com/zrepl/zrepl/rpc/dataconn.(*Server).serveConn.func1+0x5a github.com/zrepl/zrepl/rpc/dataconn/dataconn_server.go:93 0x1556aeb github.com/zrepl/zrepl/rpc/dataconn.(*Server).serveConn+0x87b github.com/zrepl/zrepl/rpc/dataconn/dataconn_server.go:176
2019-09-13 14:48:18 +02:00
readFrames(ch, nil, b)
}()
err := readStream(ch, b, &buf, stype)
t.Logf("%s", err)
require.NotNil(t, err)
assert.True(t, buf.Len() == 0)
assert.Equal(t, err.Kind, ReadStreamErrorKindSource)
receivedErr := err.Err.Error()
expectedErr := longErr.Error()
assert.True(t, receivedErr == expectedErr) // builtin Equals is too slow
if receivedErr != expectedErr {
t.Logf("lengths: %v %v", len(receivedErr), len(expectedErr))
}
}()
wg.Wait()
}