mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-08 23:39:04 +01:00
10a14a8c50
package trace: - introduce the concept of tasks and spans, tracked as linked list within ctx - see package-level docs for an overview of the concepts - **main feature 1**: unique stack of task and span IDs - makes it easy to follow a series of log entries in concurrent code - **main feature 2**: ability to produce a chrome://tracing-compatible trace file - either via an env variable or a `zrepl pprof` subcommand - this is not a CPU profile, we already have go pprof for that - but it is very useful to visually inspect where the replication / snapshotter / pruner spends its time ( fixes #307 ) usage in package daemon/logging: - goal: every log entry should have a trace field with the ID stack from package trace - make `logging.GetLogger(ctx, Subsys)` the authoritative `logger.Logger` factory function - the context carries a linked list of injected fields which `logging.GetLogger` adds to the logger it returns - `logging.GetLogger` also uses package `trace` to get the task-and-span-stack and injects it into the returned logger's fields
176 lines
4.0 KiB
Go
176 lines
4.0 KiB
Go
// microbenchmark to manually test rpc/dataconn performance
|
|
//
|
|
// With stdin / stdout on client and server, simulating zfs send|recv piping
|
|
//
|
|
// ./microbenchmark -appmode server | pv -r > /dev/null
|
|
// ./microbenchmark -appmode client -direction recv < /dev/zero
|
|
//
|
|
//
|
|
// Without the overhead of pipes (just protocol performance, mostly useful with perf bc no bw measurement)
|
|
//
|
|
// ./microbenchmark -appmode client -direction recv -devnoopWriter -devnoopReader
|
|
// ./microbenchmark -appmode server -devnoopReader -devnoopWriter
|
|
//
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
|
|
"github.com/pkg/profile"
|
|
|
|
"github.com/zrepl/zrepl/logger"
|
|
"github.com/zrepl/zrepl/replication/logic/pdu"
|
|
"github.com/zrepl/zrepl/rpc/dataconn"
|
|
"github.com/zrepl/zrepl/rpc/dataconn/timeoutconn"
|
|
"github.com/zrepl/zrepl/transport"
|
|
"github.com/zrepl/zrepl/util/devnoop"
|
|
)
|
|
|
|
func orDie(err error) {
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
type readerStreamCopier struct{ io.Reader }
|
|
|
|
func (readerStreamCopier) Close() error { return nil }
|
|
|
|
type devNullHandler struct{}
|
|
|
|
func (devNullHandler) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {
|
|
var res pdu.SendRes
|
|
if args.devnoopReader {
|
|
return &res, readerStreamCopier{devnoop.Get()}, nil
|
|
} else {
|
|
return &res, readerStreamCopier{os.Stdin}, nil
|
|
}
|
|
}
|
|
|
|
func (devNullHandler) Receive(ctx context.Context, r *pdu.ReceiveReq, stream io.ReadCloser) (*pdu.ReceiveRes, error) {
|
|
var out io.Writer = os.Stdout
|
|
if args.devnoopWriter {
|
|
out = devnoop.Get()
|
|
}
|
|
_, err := io.Copy(out, stream)
|
|
var res pdu.ReceiveRes
|
|
return &res, err
|
|
}
|
|
|
|
func (devNullHandler) PingDataconn(ctx context.Context, r *pdu.PingReq) (*pdu.PingRes, error) {
|
|
return &pdu.PingRes{
|
|
Echo: r.GetMessage(),
|
|
}, nil
|
|
}
|
|
|
|
type tcpConnecter struct {
|
|
addr string
|
|
}
|
|
|
|
func (c tcpConnecter) Connect(ctx context.Context) (timeoutconn.Wire, error) {
|
|
conn, err := net.Dial("tcp", c.addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return conn.(*net.TCPConn), nil
|
|
}
|
|
|
|
type tcpListener struct {
|
|
nl *net.TCPListener
|
|
clientIdent string
|
|
}
|
|
|
|
func (l tcpListener) Accept(ctx context.Context) (*transport.AuthConn, error) {
|
|
tcpconn, err := l.nl.AcceptTCP()
|
|
orDie(err)
|
|
return transport.NewAuthConn(tcpconn, l.clientIdent), nil
|
|
}
|
|
|
|
func (l tcpListener) Addr() net.Addr { return l.nl.Addr() }
|
|
|
|
func (l tcpListener) Close() error { return l.nl.Close() }
|
|
|
|
var args struct {
|
|
addr string
|
|
appmode string
|
|
direction string
|
|
profile bool
|
|
devnoopReader bool
|
|
devnoopWriter bool
|
|
}
|
|
|
|
func server() {
|
|
|
|
log := logger.NewStderrDebugLogger()
|
|
log.Debug("starting server")
|
|
nl, err := net.Listen("tcp", args.addr)
|
|
orDie(err)
|
|
l := tcpListener{nl.(*net.TCPListener), "fakeclientidentity"}
|
|
|
|
srv := dataconn.NewServer(nil, nil, logger.NewStderrDebugLogger(), devNullHandler{})
|
|
|
|
ctx := context.Background()
|
|
|
|
srv.Serve(ctx, l)
|
|
|
|
}
|
|
|
|
func main() {
|
|
|
|
flag.BoolVar(&args.profile, "profile", false, "")
|
|
flag.BoolVar(&args.devnoopReader, "devnoopReader", false, "")
|
|
flag.BoolVar(&args.devnoopWriter, "devnoopWriter", false, "")
|
|
flag.StringVar(&args.addr, "address", ":8888", "")
|
|
flag.StringVar(&args.appmode, "appmode", "client|server", "")
|
|
flag.StringVar(&args.direction, "direction", "", "send|recv")
|
|
flag.Parse()
|
|
|
|
if args.profile {
|
|
defer profile.Start(profile.CPUProfile).Stop()
|
|
}
|
|
|
|
switch args.appmode {
|
|
case "client":
|
|
client()
|
|
case "server":
|
|
server()
|
|
default:
|
|
orDie(fmt.Errorf("unknown appmode %q", args.appmode))
|
|
}
|
|
}
|
|
|
|
func client() {
|
|
|
|
logger := logger.NewStderrDebugLogger()
|
|
ctx := context.Background()
|
|
|
|
connecter := tcpConnecter{args.addr}
|
|
client := dataconn.NewClient(connecter, logger)
|
|
|
|
switch args.direction {
|
|
case "send":
|
|
req := pdu.SendReq{}
|
|
_, stream, err := client.ReqSend(ctx, &req)
|
|
orDie(err)
|
|
_, err = io.Copy(os.Stdout, stream)
|
|
orDie(err)
|
|
case "recv":
|
|
var r io.Reader = os.Stdin
|
|
if args.devnoopReader {
|
|
r = devnoop.Get()
|
|
}
|
|
s := readerStreamCopier{r}
|
|
req := pdu.ReceiveReq{}
|
|
_, err := client.ReqRecv(ctx, &req, &s)
|
|
orDie(err)
|
|
default:
|
|
orDie(fmt.Errorf("unknown direction%q", args.direction))
|
|
}
|
|
|
|
}
|