rpc/dataconn: microbenchmark

This commit is contained in:
Christian Schwarz 2018-12-23 13:45:35 +01:00
parent 796c5ad42d
commit 0230c6321f
3 changed files with 118 additions and 46 deletions

9
Gopkg.lock generated
View File

@ -177,6 +177,14 @@
revision = "645ef00459ed84a119197bfb8d8205042c6df63d"
version = "v0.8.0"
[[projects]]
digest = "1:1cbc6b98173422a756ae79e485952cb37a0a460c710541c75d3e9961c5a60782"
name = "github.com/pkg/profile"
packages = ["."]
pruneopts = ""
revision = "5b67d428864e92711fcbd2f8629456121a56d91f"
version = "v1.2.1"
[[projects]]
digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411"
name = "github.com/pmezard/go-difflib"
@ -399,6 +407,7 @@
"github.com/kr/pretty",
"github.com/mattn/go-isatty",
"github.com/pkg/errors",
"github.com/pkg/profile",
"github.com/problame/go-netssh",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promhttp",

View File

@ -1,7 +1,19 @@
// microbenchmark to manually test rpc/dataconn perforamnce
//
// With stdin / stdout on client and server, simulating zfs send|recv piping
//
// ./microbenchmark -appmode server | pv -r > /dev/null
// ./microbenchmark -appmode client -direction recv < /dev/zero
//
//
// Without the overhead of pipes (just protocol perforamnce, mostly useful with perf bc no bw measurement)
//
// ./microbenchmark -appmode client -direction recv -devnoopWriter -devnoopReader
// ./microbenchmark -appmode server -devnoopReader -devnoopWriter
//
package main
import (
"bytes"
"context"
"flag"
"fmt"
@ -10,9 +22,14 @@ import (
"os"
"github.com/pkg/profile"
"github.com/zrepl/zrepl/rpc/dataconn"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/replication/pdu"
"github.com/zrepl/zrepl/rpc/dataconn"
"github.com/zrepl/zrepl/rpc/dataconn/timeoutconn"
"github.com/zrepl/zrepl/transport"
"github.com/zrepl/zrepl/util/devnoop"
"github.com/zrepl/zrepl/zfs"
)
func orDie(err error) {
@ -21,54 +38,93 @@ func orDie(err error) {
}
}
type devNullHandler struct{}
type readerStreamCopier struct{ io.Reader }
func (devNullHandler) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
var res pdu.SendRes
return &res, os.Stdin, nil
func (readerStreamCopier) Close() error { return nil }
type readerStreamCopierErr struct {
error
}
func (devNullHandler) Receive(ctx context.Context, r *pdu.ReceiveReq, stream io.Reader) (*pdu.ReceiveRes, error) {
var buf [1<<15]byte
_, err := io.CopyBuffer(os.Stdout, stream, buf[:])
func (readerStreamCopierErr) IsReadError() bool { return false }
func (readerStreamCopierErr) IsWriteError() bool { return true }
func (c readerStreamCopier) WriteStreamTo(w io.Writer) zfs.StreamCopierError {
var buf [1 << 21]byte
_, err := io.CopyBuffer(w, c.Reader, buf[:])
// always assume write error
return readerStreamCopierErr{err}
}
type devNullHandler struct{}
func (devNullHandler) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
var res pdu.SendRes
if args.devnoopReader {
return &res, readerStreamCopier{devnoop.Get()}, nil
} else {
return &res, readerStreamCopier{os.Stdin}, nil
}
}
func (devNullHandler) Receive(ctx context.Context, r *pdu.ReceiveReq, stream zfs.StreamCopier) (*pdu.ReceiveRes, error) {
var out io.Writer = os.Stdout
if args.devnoopWriter {
out = devnoop.Get()
}
err := stream.WriteStreamTo(out)
var res pdu.ReceiveRes
return &res, err
}
type tcpConnecter struct {
net, addr string
addr string
}
func (c tcpConnecter) Connect(ctx context.Context) (net.Conn, error) {
return net.Dial(c.net, c.addr)
func (c tcpConnecter) Connect(ctx context.Context) (timeoutconn.Wire, error) {
conn, err := net.Dial("tcp", c.addr)
if err != nil {
return nil, err
}
return conn.(*net.TCPConn), nil
}
type tcpListener struct {
nl *net.TCPListener
clientIdent string
}
func (l tcpListener) Accept(ctx context.Context) (*transport.AuthConn, error) {
tcpconn, err := l.nl.AcceptTCP()
orDie(err)
return transport.NewAuthConn(tcpconn, l.clientIdent), nil
}
func (l tcpListener) Addr() net.Addr { return l.nl.Addr() }
func (l tcpListener) Close() error { return l.nl.Close() }
var args struct {
addr string
appmode string
direction string
profile bool
addr string
appmode string
direction string
profile bool
devnoopReader bool
devnoopWriter bool
}
func server() {
log := logger.NewStderrDebugLogger()
log.Debug("starting server")
l, err := net.Listen("tcp", args.addr)
nl, err := net.Listen("tcp", args.addr)
orDie(err)
l := tcpListener{nl.(*net.TCPListener), "fakeclientidentity"}
srvConfig := dataconn.ServerConfig{
Shared: dataconn.SharedConfig {
MaxProtoLen: 4096,
MaxHeaderLen: 4096,
SendChunkSize: 1 << 17,
MaxRecvChunkSize: 1 << 17,
},
}
srv := dataconn.NewServer(devNullHandler{}, srvConfig, nil)
srv := dataconn.NewServer(nil, logger.NewStderrDebugLogger(), devNullHandler{})
ctx := context.Background()
ctx = dataconn.WithLogger(ctx, log)
srv.Serve(ctx, l)
}
@ -76,6 +132,8 @@ func server() {
func main() {
flag.BoolVar(&args.profile, "profile", false, "")
flag.BoolVar(&args.devnoopReader, "devnoopReader", false, "")
flag.BoolVar(&args.devnoopWriter, "devnoopWriter", false, "")
flag.StringVar(&args.addr, "address", ":8888", "")
flag.StringVar(&args.appmode, "appmode", "client|server", "")
flag.StringVar(&args.direction, "direction", "", "send|recv")
@ -99,34 +157,25 @@ func client() {
logger := logger.NewStderrDebugLogger()
ctx := context.Background()
ctx = dataconn.WithLogger(ctx, logger)
clientConfig := dataconn.ClientConfig{
Shared: dataconn.SharedConfig {
MaxProtoLen: 4096,
MaxHeaderLen: 4096,
SendChunkSize: 1 << 17,
MaxRecvChunkSize: 1 << 17,
},
}
orDie(clientConfig.Validate())
connecter := tcpConnecter{"tcp", args.addr}
client := dataconn.NewClient(connecter, clientConfig)
connecter := tcpConnecter{args.addr}
client := dataconn.NewClient(connecter, logger)
switch args.direction {
case "send":
req := pdu.SendReq{}
_, stream, err := client.ReqSendStream(ctx, &req)
_, stream, err := client.ReqSend(ctx, &req)
orDie(err)
var buf [1<<15]byte
_, err = io.CopyBuffer(os.Stdout, stream, buf[:])
err = stream.WriteStreamTo(os.Stdout)
orDie(err)
case "recv":
var buf bytes.Buffer
buf.WriteString("teststreamtobereceived")
var r io.Reader = os.Stdin
if args.devnoopReader {
r = devnoop.Get()
}
s := readerStreamCopier{r}
req := pdu.ReceiveReq{}
_, err := client.ReqRecv(ctx, &req, os.Stdin)
_, err := client.ReqRecv(ctx, &req, &s)
orDie(err)
default:
orDie(fmt.Errorf("unknown direction%q", args.direction))

14
util/devnoop/devnoop.go Normal file
View File

@ -0,0 +1,14 @@
// package devnoop provides an io.ReadWriteCloser that never errors
// and always reports reads / writes to / from buffers as complete.
// The buffers themselves are never touched.
package devnoop
type Dev struct{}
func Get() Dev {
return Dev{}
}
func (Dev) Write(p []byte) (n int, err error) { return len(p), nil }
func (Dev) Read(p []byte) (n int, err error) { return len(p), nil }
func (Dev) Close() error { return nil }